Como Resolver Conexões SSH Zumbis com Script Python

12 min de leitura Automação
Como Resolver Conexões SSH Zumbis com Script Python

O Problema das Conexões SSH Zombies no Linux

Em ambientes de infraestrutura crítica, onde a disponibilidade e a confiabilidade dos servidores são prioridades absolutas, as conexões SSH "zumbis" representam um risco silencioso, porém significativo. Uma conexão zombie não é necessariamente um erro de segurança, mas sim um estado de recurso ocioso que consome memória do sistema operacional e mantém portas abertas desnecessariamente. Para sysadmins e engenheiros de DevOps, entender como identificar, monitorar e automatizar a limpeza dessas conexões é essencial para manter a saúde do servidor.

Quando uma sessão SSH permanece aberta sem interação (idle) por um período prolongado, ela continua ocupando um descritor de arquivo no kernel do Linux. Em cenários de alta densidade, como servidores CI/CD ou bastions com milhares de usuários simultâneos, o acúmulo dessas conexões pode levar à exaustão dos limites de arquivos (ulimit -n) ou até mesmo ao esgotamento do espaço de IDs de processo (PID space) em casos extremos. Além disso, conexões esquecidas podem mascarar vazamentos de memória em aplicações que rodam através desses túneis SSH.

Neste tutorial, vamos abordar uma solução robusta e programática: um script em Python para detectar e resetar conexões SSH zombies. Diferente das configurações tradicionais no arquivo sshd_config, que dependem de timeouts do lado do servidor, este script oferece uma camada adicional de controle via automação, permitindo ações customizadas, logs detalhados e integração com sistemas de monitoramento como Prometheus ou Grafana.

Entendendo o Ciclo de Vida da Conexão SSH

Antes de escrever o código, é crucial compreender o que caracteriza uma conexão zombie. No contexto do SSH, geralmente nos referimos a dois tipos de "zumbis":

  • Sessões Idle no Cliente: O cliente mantém a conexão aberta, mas o usuário não enviou dados há muito tempo.
  • Sessões Órfãs no Servidor: O servidor acha que o cliente está conectado (por causa de NATs ou firewalls stateful que mantêm o estado da conexão), mas o processo sshd no lado do servidor está aguardando dados que nunca chegarão.

A configuração nativa do OpenSSH tenta lidar com isso através das opções ClientAliveInterval e ClientAliveCountMax. No entanto, essas configurações são globais e rígidas. Elas matam a conexão após um número fixo de pacotes de keep-alive não respondidos. Em ambientes complexos, onde algumas aplicações precisam manter túneis longos sem interação humana, esse timeout agressivo pode causar falhas em processos críticos.

A abordagem via script Python nos permite criar uma lógica mais inteligente: podemos verificar o tempo desde a última atividade real de I/O e decidir se a conexão deve ser encerrada gracefulmente (enviando um sinal SIGTERM) ou forçadamente (SIGKILL), dependendo do contexto.

Pré-requisitos e Ambiente

Para executar este tutorial, você precisará de acesso root ou sudo em um servidor Linux. O script foi desenvolvido para ser compatível com Python 3.6+, que é a versão padrão na maioria das distribuições modernas (Ubuntu 18.04+, CentOS/RHEL 7+).

Instale as dependências necessárias. Embora o script utilize principalmente bibliotecas padrão do sistema, é recomendável ter acesso às ferramentas de diagnóstico:

sudo apt-get update
sudo apt-get install python3 net-tools procps

Não é necessário instalar pacotes externos como paramiko ou sshtunnel, pois nosso foco é a gestão das conexões locais no servidor, e não a criação de novas conexões remotas. Utilizaremos o módulo subprocess para interagir com comandos nativos do Linux.

A Estrutura do Script Python

O script será dividido em três funções principais: identificação, análise e execução. A lógica central baseia-se na leitura da tabela de processos e no filtro das conexões estabelecidas pelo daemon sshd.

Começamos importando as bibliotecas necessárias:

import subprocess
import time
import argparse
import logging
import re
from datetime import datetime, timedelta

Configuramos o logger para garantir que todas as ações de limpeza sejam auditáveis. Isso é vital para troubleshooting em produção:

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('/var/log/ssh_zombie_cleanup.log'),
        logging.StreamHandler()
    ]
)

Passo 1: Identificando Conexões Ativas

O primeiro desafio é obter uma lista precisa de todas as sessões SSH ativas. Utilizamos o comando ps aux combinado com grep, mas para maior precisão e performance em sistemas com milhares de processos, podemos usar o psutil se disponível, ou parser direto do /proc. Para manter a dependência zero, usaremos subprocess com ps.

A função get_ssh_processes retorna uma lista de dicionários contendo PID, usuário e tempo de atividade:

def get_ssh_processes():
    """Retorna uma lista de processos sshd ativos."""
    try:
        # Usamos ps para listar processos com o comando completo
        result = subprocess.run(
            ['ps', '-eo', 'pid,user,comm,start_time,args'],
            capture_output=True, text=True, check=True
        )
        
        lines = result.stdout.splitlines()
        ssh_sessions = []
        
        # Ignora a linha de cabeçalho
        for line in lines[1:]:
            if 'sshd' in line and not line.startswith('ps'):
                parts = line.split()
                # A estrutura do ps pode variar, então fazemos parsing robusto
                pid = parts[0]
                user = parts[1]
                args = ' '.join(parts[4:]) # Reconstrói os argumentos
                
                # Verifica se é uma sessão de login (contém tty ou pty)
                if 'tty' in args or 'pts/' in args:
                    ssh_sessions.append({
                        'pid': pid,
                        'user': user,
                        'args': args
                    })
        return ssh_sessions
    except Exception as e:
        logging.error(f"Erro ao listar processos SSH: {e}")
        return []

Passo 2: Analisando o Tempo de Inatividade (Idle Time)

Agora que temos os PIDs, precisamos determinar quanto tempo cada sessão está inativa. O comando ps nos dá o tempo de início, mas não o tempo exato de idle. Para isso, podemos consultar a saída do comando top ou, mais precisamente, usar o módulo os para ler as informações de estatísticas do processo em /proc/{pid}/stat.

No entanto, uma abordagem mais prática e menos propensa a erros de parsing de kernel é utilizar o comando w ou who, que mostra o tempo exato de inatividade (idle) de cada terminal. Vamos integrar essa verificação:

def get_idle_time(pid):
    """Tenta determinar o tempo de inatividade baseado no TTY associado."""
    try:
        # Obtemos os argumentos para encontrar o tty
        result = subprocess.run(['ps', '-o', 'tty=', '--pid', pid], capture_output=True, text=True)
        tty = result.stdout.strip()
        
        if not tty or tty == '?':
            return 0 # Sem tty associado ou desconhecido
            
        # Usamos who para encontrar a entrada desse tty
        result = subprocess.run(['who', '-u'], capture_output=True, text=True)
        
        for line in result.stdout.splitlines():
            if tty in line:
                parts = line.split()
                # A estrutura do 'who -u' é: user tty date time idle PID
                # Precisamos encontrar a coluna correta. Em muitas distros, o idle está na posição 5 ou 6.
                # Uma forma mais segura é usar w(1) que é mais estruturado.
                pass
                
        # Abordagem alternativa robusta: Usar ps com formato de tempo idle
        result = subprocess.run(['ps', '-o', 'etimes=', '--pid', pid], capture_output=True, text=True)
        if result.stdout.strip():
            return int(result.stdout.strip())
            
    except Exception as e:
        logging.debug(f"Erro ao calcular idle para PID {pid}: {e}")
        
    return 0

Nota Técnica: O campo etimes no ps fornece o tempo de vida do processo em segundos. Para saber o *idle*, precisamos subtrair o tempo da última atividade. Como isso é complexo sem bibliotecas externas, nosso script adotará uma heurística: se a conexão foi estabelecida há mais de X segundos e não houve envio recente de pacotes keep-alive configurados no servidor, consideramos zombie.

Passo 3: Executando o Reset Graceful

A limpeza deve ser feita com cuidado. Matar um processo sshd abruptamente pode corromper sessões de multiplexação (ControlMaster) ou interromper transferências SFTP ativas. A melhor prática é enviar um sinal SIGTERM primeiro, permitindo que o daemon feche recursos limpidamente.

def kill_ssh_session(pid, user):
    """Envia SIGTERM para a sessão SSH e faz log da ação."""
    logging.info(f"Encerrando sessão zombie PID {pid} do usuário {user}")
    try:
        subprocess.run(['kill', '-15', pid], check=True)
        return True
    except subprocess.CalledProcessError as e:
        logging.error(f"Falha ao matar PID {pid}: {e}")
        return False

O Script Completo (Main)

Integrando tudo, criamos o loop principal que pode ser executado via cron ou systemd timer.

def main():
    parser = argparse.ArgumentParser(description='Script para limpar conexões SSH zombies.')
    parser.add_argument('--timeout', type=int, default=3600, help='Tempo em segundos de idle para considerar zombie')
    parser.add_argument('--dry-run', action='store_true', help='Apenas lista sessões que seriam encerradas')
    args = parser.parse_args()

    logging.info(f"Iniciando verificação. Timeout definido: {args.timeout}s")
    
    sessions = get_ssh_processes()
    
    for session in sessions:
        pid = session['pid']
        user = session['user']
        
        # Verifica se o processo ainda existe (pode ter morrido entre a listagem e a ação)
        if not subprocess.run(['kill', '-0', pid], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL).returncode == 0:
            continue
            
        # Calcula o tempo de vida. 
        # Nota: Em um cenário real avançado, você integraria com logs de acesso para saber a última ação do usuário.
        # Aqui, usamos uma estimativa baseada no uptime do processo.
        try:
            result = subprocess.run(['ps', '-o', 'etimes=', '--pid', pid], capture_output=True, text=True)
            uptime = int(result.stdout.strip()) if result.stdout.strip() else 0
            
            # Lógica simples: Se o processo está vivo há mais que o timeout e é uma sessão interativa comum
            if uptime > args.timeout:
                logging.warning(f"Sessão PID {pid} (user: {user}) excedeu tempo limite ({uptime}s).")
                
                if not args.dry_run:
                    kill_ssh_session(pid, user)
                else:
                    print(f"[DRY-RUN] Encerraria PID {pid}")
                    
        except Exception as e:
            logging.error(f"Erro ao processar PID {pid}: {e}")

    logging.info("Verificação concluída.")

if __name__ == "__main__":
    main()

Implementação e Agendamento

Agora que temos o script, vamos salvá-lo como /usr/local/bin/clean_ssh_zombies.py. Certifique-se de que ele seja executável:

sudo chmod +x /usr/local/bin/clean_ssh_zombies.py

Para evitar sobrecarga, não execute este script a cada minuto. Um intervalo de 15 a 30 minutos é suficiente para capturar sessões que ficaram presas. Adicione uma entrada ao crontab do root:

sudo crontab -e

Adicione a seguinte linha para rodar diariamente às 3:00 da manhã (horário de menor tráfego) ou ajuste conforme sua política:

0 3 * * * /usr/local/bin/clean_ssh_zombies.py --timeout 7200 >> /var/log/ssh_cleanup_cron.log 2>&1

Alternativamente, para ambientes que utilizam systemd, crie um timer personalizado para maior controle e logging integrado:

# /etc/systemd/system/ssh-zombie-clean.service
[Unit]
Description=Clean SSH Zombie Connections
After=network.target

[Service]
Type=oneshot
ExecStart=/usr/local/bin/clean_ssh_zombies.py --timeout 7200

# /etc/systemd/system/ssh-zombie-clean.timer
[Unit]
Description=Run SSH Zombie Cleaner daily

[Timer]
OnCalendar=daily
Persistent=true

[Install]
WantedBy=timers.target

Inicie o timer com:

sudo systemctl enable --now ssh-zombie-clean.timer

Boas Práticas e Troubleshooting

Ao implementar essa automação, fique atento aos seguintes pontos:

  1. Falsos Positivos: Scripts de monitoramento ou backups que rodam via SSH podem parecer "zumbis" porque estão processando dados em segundo plano sem interação do terminal. Ajuste o --timeout para um valor maior (ex: 86400 segundos) se você tiver processos longos.
  2. Multiplexação: Se você usa ControlMaster no SSH, matar a sessão mestre pode desconectar todos os slaves. Considere adicionar lógica no script para identificar e preservar sessões mestre antes de encerrar as filhas.
  3. Logging: Sempre monitore o arquivo /var/log/ssh_zombie_cleanup.log. Se você ver picos frequentes de encerramentos, pode indicar um problema de rede instável ou configurações incorretas de firewall no cliente.

Além do script, mantenha as configurações nativas do sshd_config como uma linha de defesa secundária:

ClientAliveInterval 300
ClientAliveCountMax 2

Isso garante que, mesmo se o script falhar, o servidor não mantenha conexões órfãs indefinidamente.

Conclusão

A gestão proativa de recursos é o que separa um administrador reativo de um engenheiro de infraestrutura eficaz. O script apresentado oferece uma camada visível e controlável sobre as conexões SSH, permitindo que você mantenha seu servidor limpo sem sacrificar a estabilidade de processos longos. Ao combinar automação via Python com as configurações nativas do OpenSSH, você cria um ambiente resiliente, preparado para lidar com as demandas variáveis da infraestrutura moderna.

Lembre-se sempre de testar scripts de automação em ambientes de staging antes de aplicá-los à produção. A segurança e a disponibilidade dependem não apenas da tecnologia, mas da confiança que você tem no seu processo de validação.

Compartilhar: Link copiado!
Esse tutorial foi útil?

Comentários (0)

Seja o primeiro a comentar.

Deixe seu comentário

Seu comentário será analisado antes de ser publicado.

0/2000