Skip to content
KyKyIIIKuH
KyKyIIIKuH

Двухфакторная авторизация через Telegram для SSH

KyKyIIIKuH, 31.08.202431.08.2024
pip3 install python-telegram-bot aiofiles requests --break-system-packages

Создадим сам python-скрипт, которые реализуют логику двухфакторной аутентификации.
Не забудьте изменить TOKEN и CHAT_ID на ваши.

cat > telegram_auth.py <<EOF
import telegram
from telegram import InlineKeyboardButton, InlineKeyboardMarkup
import sys
import os
import asyncio
from datetime import datetime
import requests  # Для получения информации о городе и провайдере
import subprocess  # Для выполнения системных команд

# Конфигурация
TOKEN = ''  # Токен вашего Telegram-бота
CHAT_ID = ''  # ID чата в Telegram, куда будут отправляться сообщения
IP_INFO_URL = 'http://ipinfo.io/{}/json'  # URL для получения информации о IP-адресе (город, провайдер и т.д.)

# Создаем объект бота с использованием токена
bot = telegram.Bot(token=TOKEN)

# Словарь для хранения запросов на подтверждение
requests = {}

def get_local_ip():
    """
    Функция для получения локального IP-адреса машины, на которой выполняется скрипт.
    Использует команду 'hostname -I' для получения IP-адресов и возвращает первый из них.
    """
    try:
        result = subprocess.run(['hostname', '-I'], capture_output=True, text=True)
        return result.stdout.strip().split()[0]  # Возвращаем первый IP из списка
    except Exception:
        return 'Неизвестен'  # Возвращаем 'Неизвестен' в случае ошибки

def get_hostname():
    """
    Функция для получения имени хоста машины, на которой выполняется скрипт.
    Использует команду 'hostname' для получения имени хоста.
    """
    try:
        result = subprocess.run(['hostname'], capture_output=True, text=True)
        return result.stdout.strip()  # Возвращаем имя хоста
    except Exception:
        return 'Неизвестен'  # Возвращаем 'Неизвестен' в случае ошибки

async def send_telegram_message(username, remote_ip, request_id):
    """
    Асинхронная функция для отправки сообщения в Telegram с информацией о попытке входа.
    Сообщение включает время входа, IP-адрес, информацию о городе и провайдере,
    локальный IP и имя хоста.
    """
    # Получаем информацию о IP
    ip_info = {}
    try:
        response = requests.get(IP_INFO_URL.format(remote_ip))
        ip_info = response.json()  # Преобразуем ответ в формат JSON
    except Exception:
        ip_info = {}  # Если возникла ошибка, оставляем словарь пустым

    # Извлекаем информацию из ответа
    city = ip_info.get('city', 'Неизвестно')
    provider = ip_info.get('org', 'Неизвестно')
    login_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')  # Текущее время в нужном формате
    local_ip = get_local_ip()  # Получаем локальный IP
    hostname = get_hostname()  # Получаем имя хоста

    # Формируем текст сообщения
    message = (f"? Login Time: {login_time}\n"
               f"? Remote IP: {remote_ip}\n"
               f"? System IP: {local_ip}\n"
               f"?️ City: {city}\n"
               f"? Provider: {provider}\n"
               f"? Hostname: {hostname}\n"
               f"? Username: {username}")
    
    # Создаем кнопки для ответа (разрешить или запретить вход)
    reply_markup = InlineKeyboardMarkup([
        [InlineKeyboardButton("Разрешить", callback_data=f"allow_{request_id}"),
         InlineKeyboardButton("Запретить", callback_data=f"deny_{request_id}")]
    ])
    try:
        await bot.send_message(chat_id=CHAT_ID, text=message, reply_markup=reply_markup)
    except Exception:
        pass  # Игнорируем ошибки при отправке сообщения

async def main():
    """
    Основная асинхронная функция, которая запускает процесс обработки входящих запросов.
    """
    global requests
    username = os.getenv('PAM_USER')  # Получаем имя пользователя из переменной окружения PAM_USER
    remote_ip = os.getenv('PAM_RHOST')  # Получаем IP-адрес удаленного хоста из переменной окружения PAM_RHOST

    if not username or not remote_ip:
        sys.exit(1)  # Если данные отсутствуют, завершаем выполнение с кодом 1

    # Создаем уникальный идентификатор запроса на основе текущего времени
    request_id = str(int(datetime.now().timestamp()))
    # Сохраняем информацию о запросе в словаре
    requests[request_id] = {'username': username, 'remote_ip': remote_ip, 'timestamp': datetime.now().isoformat()}

    # Отправляем сообщение в Telegram с запросом на подтверждение входа
    await send_telegram_message(username, remote_ip, request_id)

    update_id = None  # ID последнего обновления для бота
    start_time = datetime.now()  # Время начала обработки запросов

    while True:
        try:
            # Проверяем, прошло ли более 60 секунд с начала обработки запросов
            if (datetime.now() - start_time).total_seconds() > 60:
                sys.exit(1)  # Завершаем выполнение, если прошло больше 60 секунд

            # Получаем обновления от бота
            updates = await bot.get_updates(offset=update_id, timeout=10)
            for update in updates:
                update_id = update.update_id + 1  # Обновляем ID последнего обновления
                if update.callback_query:  # Проверяем, есть ли обратный вызов с кнопки
                    callback_data = update.callback_query.data  # Извлекаем данные из обратного вызова
                    if callback_data.startswith('allow_') or callback_data.startswith('deny_'):
                        req_id = callback_data.split('_')[1]  # Извлекаем ID запроса из данных обратного вызова
                        if req_id in requests:
                            if callback_data.startswith('allow_'):
                                del requests[req_id]  # Удаляем обработанный запрос из словаря
                                sys.exit(0)  # Разрешаем вход
                            elif callback_data.startswith('deny_'):
                                del requests[req_id]  # Удаляем обработанный запрос из словаря
                                sys.exit(1)  # Запрещаем вход
        except Exception:
            pass  # Игнорируем ошибки в процессе обработки
        await asyncio.sleep(1)  # Ожидаем перед следующим запросом

if __name__ == "__main__":
    asyncio.run(main())  # Запускаем основную асинхронную функцию
EOF

Добавляем конфигурацию PAM для аутентификации через Telegram (не забываем изменить путь к файлу telegram_auth.py):

cat > /etc/pam.d/telegram-auth <<EOF
auth requisite pam_exec.so stdout /usr/bin/python3 /root/telegram_auth.py
EOF

Включаем аутентификацию через Telegram в SSH:

sed -i '/^auth\s.*pam_exec.so/d' /etc/pam.d/sshd && \
echo "auth include telegram-auth" >> /etc/pam.d/sshd

Перезапускаем SSH для применения изменений:

systemctl restart ssh

Без рубрики

Навигация по записям

Previous post
Next post

Свежие записи

  • Установка и настройка виртуального роутера OpenWRT в Proxmox
  • pfSense Настройка LAN портов (bridge)
  • Двухфакторная авторизация через Telegram для SSH
  • pfSense и два провайдера
  • Генератор Штрих-Кодов code128

Свежие комментарии

Нет комментариев для просмотра.

Архивы

  • Февраль 2025
  • Январь 2025
  • Август 2024
  • Июль 2024

Рубрики

  • pfSense
  • ProxMox
  • Без рубрики
©2025 KyKyIIIKuH | WordPress Theme by SuperbThemes