Files
emailsystemv2/app/utils.py

309 lines
12 KiB
Python
Raw Normal View History

2025-02-26 18:29:10 +08:00
import smtplib
import os
import json
import logging
2025-02-26 18:48:31 +08:00
import sys
2025-02-26 18:29:10 +08:00
from email.parser import BytesParser
from email.policy import default
from datetime import datetime
import redis
import smtpd
import asyncore
import base64
from .config import Config
2025-02-26 18:48:31 +08:00
# Route all log records to the console (stdout).
_LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
_console_handler = logging.StreamHandler(sys.stdout)
logging.basicConfig(
    level=logging.DEBUG,  # DEBUG level: the most verbose output
    format=_LOG_FORMAT,
    handlers=[_console_handler],
)
logger = logging.getLogger('smtp_server')

# Shared Redis client, created from the configured connection URL.
redis_client = redis.from_url(Config.REDIS_URL)
class CustomSMTPServer(smtpd.SMTPServer):
    """SMTP server that filters inbound mail by recipient domain and stores it in Redis.

    For every delivered message the recipients are checked against the allowed
    domains returned by ``get_allowed_domains()``. Accepted messages are parsed,
    their plain-text body and attachments are extracted, and the result is
    written to Redis by ``_store_email``.
    """

    def __init__(self, localaddr, remoteaddr):
        """Log the listening address and delegate to ``smtpd.SMTPServer``."""
        logger.info(f"Initializing SMTP server on {localaddr}")
        super().__init__(localaddr, remoteaddr)

    def process_message(self, peer, mailfrom, rcpttos, data, **kwargs):
        """Handle one message delivered by an SMTP client.

        Args:
            peer: Address of the connecting client as passed in by ``smtpd``.
            mailfrom: Envelope sender address.
            rcpttos: List of envelope recipient addresses.
            data: Raw message; parsed with ``BytesParser.parsebytes``.
            **kwargs: Extra keyword arguments supplied by ``smtpd`` (only logged).

        Returns:
            ``None`` when the message was stored (``smtpd`` then answers
            ``250 OK``), or a ``550`` response string when none of the
            recipients belongs to an allowed domain.

        Raises:
            Exception: Any error raised while parsing or storing is logged with
                its traceback and re-raised.
        """
        try:
            logger.debug(f"Connection from peer: {peer}")
            logger.debug(f"Mail from: {mailfrom}")
            logger.debug(f"Recipients: {rcpttos}")
            logger.debug(f"Raw data length: {len(data)} bytes")
            logger.debug(f"Additional kwargs: {kwargs}")

            # Record the basic facts about the received mail.
            logger.info(f"Received mail from {mailfrom} to {rcpttos}")

            # Allowed domains come from Redis. Domain names are
            # case-insensitive, so both sides are compared in lower case.
            allowed_suffixes = tuple(
                f'@{domain.lower()}' for domain in get_allowed_domains()
            )

            valid_recipients = []
            for rcpt in rcpttos:
                # str.endswith accepts a tuple; an empty tuple never matches.
                if rcpt.lower().endswith(allowed_suffixes):
                    valid_recipients.append(rcpt)
                else:
                    logger.warning(f"Rejected mail to {rcpt}: invalid domain")

            if not valid_recipients:
                logger.error("No valid recipients found")
                # Tell the client the message was refused instead of
                # answering 250 OK for mail that is being discarded.
                return '550 5.7.1 Recipient domain not allowed'

            # Parse the message.
            logger.debug("Parsing email data...")
            message = BytesParser(policy=default).parsebytes(data)

            # Extract the plain-text body.
            body = self._get_email_body(message)
            logger.debug(f"Email body length: {len(body) if body else 0}")

            # Extract the attachments.
            attachments = self._process_attachments(message)
            logger.debug(f"Found {len(attachments)} attachments")

            # Build the record to store. Non-string values are JSON-encoded
            # so that every hash field is a flat string.
            timestamp = datetime.now().isoformat()
            message_id = message.get('Message-ID', f"<{timestamp}@nosqli.com>")
            email_data = {
                'message_id': message_id,
                'subject': message.get('subject', ''),
                'sender': mailfrom,
                'recipients': json.dumps(valid_recipients),
                'body': body,
                'timestamp': timestamp,
                'attachments': json.dumps(attachments),
                'headers': json.dumps(dict(message.items())),
                'peer': json.dumps(peer),
            }

            # Persist the message.
            self._store_email(email_data)
            logger.info(f"Successfully processed mail: {message_id}")
        except Exception as e:
            logger.error(f"Error processing email: {str(e)}", exc_info=True)
            raise

    @staticmethod
    def _decode_text_part(part):
        """Decode the payload of a text part using its declared charset.

        Falls back to UTF-8 when no charset is declared or the declared one is
        unknown; undecodable bytes are replaced rather than raising.
        """
        payload = part.get_payload(decode=True)
        if payload is None:
            return ""
        charset = part.get_content_charset() or 'utf-8'
        try:
            return payload.decode(charset, errors='replace')
        except LookupError:
            # The message declared a charset Python does not know.
            return payload.decode('utf-8', errors='replace')

    def _get_email_body(self, email):
        """Extract the plain-text body of a parsed message.

        For multipart messages the first ``text/plain`` part that is not an
        attachment is used; otherwise the single payload is decoded.

        Returns:
            The decoded text, or ``""`` when no text is found or extraction fails.
        """
        try:
            logger.debug("Extracting email body...")
            if email.is_multipart():
                for part in email.walk():
                    if part.get_content_type() != "text/plain":
                        continue
                    # A text file sent as an attachment is not the body.
                    if part.get_content_disposition() == 'attachment':
                        continue
                    content = self._decode_text_part(part)
                    logger.debug(f"Found text/plain content: {len(content)} chars")
                    return content
            else:
                content = self._decode_text_part(email)
                logger.debug(f"Found single part content: {len(content)} chars")
                return content

            logger.warning("No text content found in email")
            return ""
        except Exception as e:
            logger.error(f"Error extracting email body: {str(e)}")
            return ""

    def _process_attachments(self, email):
        """Collect the attachments of a parsed multipart message.

        Returns:
            A list of dicts with the keys ``filename``, ``content``
            (base64 text), ``content_type`` and ``size`` (bytes). Errors are
            logged and whatever was collected so far is returned.
        """
        attachments = []
        try:
            logger.debug("Processing attachments...")
            if email.is_multipart():
                for part in email.walk():
                    if part.get_content_maintype() == 'multipart':
                        continue
                    if part.get('Content-Disposition') is None:
                        continue

                    filename = part.get_filename()
                    if not filename:
                        continue

                    logger.debug(f"Processing attachment: {filename}")
                    attachment_data = part.get_payload(decode=True)
                    if attachment_data is None:
                        # No decodable payload; skip this part instead of
                        # aborting the remaining attachments.
                        logger.warning(f"Skipping attachment without payload: {filename}")
                        continue

                    attachments.append({
                        'filename': filename,
                        'content': base64.b64encode(attachment_data).decode(),
                        'content_type': part.get_content_type(),
                        'size': len(attachment_data),
                    })
                    logger.debug(f"Attachment processed: {filename} ({len(attachment_data)} bytes)")
        except Exception as e:
            logger.error(f"Error processing attachments: {str(e)}")
        return attachments

    def _store_email(self, email_data):
        """Store one email record in Redis and index it.

        Writes the hash ``email:<message_id>`` with a 10-minute TTL, pushes the
        hash key onto ``recipient:<address>`` for every recipient, and sets
        ``time:<timestamp>`` to the hash key.

        Raises:
            Exception: Any Redis or decoding error is logged and re-raised.
        """
        try:
            logger.debug("Storing email in Redis...")

            # The message id is the primary key.
            email_key = f"email:{email_data['message_id']}"
            # hset(mapping=...) replaces the deprecated hmset().
            redis_client.hset(email_key, mapping=email_data)
            logger.debug(f"Stored email with key: {email_key}")

            # Apply the 10-minute TTL right away so the hash cannot be left
            # without an expiry if one of the index writes below fails.
            redis_client.expire(email_key, 10 * 60)
            logger.debug("Set expiration time: 10 minutes")

            # Index the email under each recipient.
            # NOTE(review): the recipient lists and the time key get no TTL
            # here, so they can outlive the email hash they point to.
            recipients = json.loads(email_data['recipients'])
            for recipient in recipients:
                recipient_key = f"recipient:{recipient}"
                redis_client.lpush(recipient_key, email_key)
                logger.debug(f"Created recipient index: {recipient_key}")

            # Index the email by timestamp.
            time_key = f"time:{email_data['timestamp']}"
            redis_client.set(time_key, email_key)
            logger.debug(f"Created time index: {time_key}")
        except Exception as e:
            logger.error(f"Error storing email: {str(e)}")
            raise
def start_smtp_server(host='0.0.0.0', port=25):
    """Create a ``CustomSMTPServer`` on ``(host, port)`` and run ``asyncore.loop()``.

    Args:
        host: Interface to bind to.
        port: TCP port to listen on.

    Raises:
        Exception: Any error during start-up or inside the loop is logged
            and re-raised.
    """
    bind_address = (host, port)
    try:
        logger.info(f"Starting SMTP server on {host}:{port}")
        # The instance is kept in a local only for the lifetime of the loop;
        # no remote address is configured (second argument is None).
        smtp_server = CustomSMTPServer(bind_address, None)
        logger.info("SMTP server initialized, entering main loop...")
        asyncore.loop()
    except Exception as exc:
        logger.error(f"Error starting SMTP server: {str(exc)}")
        raise
def get_emails_by_recipient(recipient, limit=10):
    """Return the newest stored emails for ``recipient``.

    This is kept as an alias of ``get_latest_emails`` so both entry points
    share one implementation and report errors through the module logger
    instead of ``print``.

    Args:
        recipient: Recipient address used to build the ``recipient:<address>`` key.
        limit: Maximum number of emails to return.

    Returns:
        A list of dicts (field name -> string value), or ``[]`` on error.
    """
    return get_latest_emails(recipient, limit)
def get_attachment(email_key, attachment_index):
    """Return one attachment of a stored email.

    Args:
        email_key: Redis key of the email hash.
        attachment_index: Zero-based position in the stored attachment list.

    Returns:
        The attachment dict decoded from the hash's ``attachments`` field, or
        ``None`` when the email or field is missing, the index is out of
        range, or an error occurs (errors are logged).
    """
    try:
        # Fetch only the field that is needed rather than the whole hash,
        # which also carries the body and headers.
        raw_attachments = redis_client.hget(email_key, 'attachments')
        if raw_attachments is None:
            return None

        attachments = json.loads(raw_attachments.decode())
        if 0 <= attachment_index < len(attachments):
            return attachments[attachment_index]
        return None
    except Exception as e:
        logger.error(f'Error fetching attachment: {e}')
        return None
def get_latest_emails(recipient, limit=10):
    """Return the newest stored emails for ``recipient``.

    Args:
        recipient: Recipient address used to build the ``recipient:<address>`` key.
        limit: Maximum number of emails to return; values <= 0 yield ``[]``.

    Returns:
        A list of dicts (field name -> string value), in list order of the
        recipient index. Index entries whose hash is missing or empty are
        skipped. Returns ``[]`` on error (the error is logged).
    """
    try:
        # Without this guard limit=0 would become LRANGE 0 -1, which returns
        # the entire list instead of nothing.
        if limit <= 0:
            return []

        recipient_key = f'recipient:{recipient}'
        email_keys = redis_client.lrange(recipient_key, 0, limit - 1)

        emails = []
        for key in email_keys:
            stored = redis_client.hgetall(key.decode())
            if stored:
                # Redis returns bytes; convert both keys and values to str.
                emails.append({k.decode(): v.decode() for k, v in stored.items()})
        return emails
    except Exception as e:
        logger.error(f'Error fetching emails: {e}')
        return []
def get_latest_email_with_code(recipient):
    """Return the newest email for ``recipient`` with an extracted ``code`` field.

    Args:
        recipient: Recipient address used to build the ``recipient:<address>`` key.

    Returns:
        The email as a dict of strings with an extra ``'code'`` entry holding
        the result of ``extract_code_from_body`` on the body, or ``None`` when
        there is no email, the hash is empty, or an error occurs.
    """
    try:
        # Index 0 of the recipient list is the key of the newest email.
        newest_key = redis_client.lindex(f'recipient:{recipient}', 0)
        if not newest_key:
            return None

        raw_fields = redis_client.hgetall(newest_key.decode())
        if not raw_fields:
            return None

        result = {name.decode(): value.decode() for name, value in raw_fields.items()}
        # Attach the verification code found in the body (may be None).
        result['code'] = extract_code_from_body(result.get('body', ''))
        return result
    except Exception as e:
        logger.error(f'Error fetching latest email with code: {e}')
        return None
def extract_code_from_body(body):
    """Extract a six-digit verification code from an email body.

    The code must be exactly six consecutive digits, i.e. not part of a
    longer digit run. Digit look-arounds are used instead of ``\\b`` because
    ``\\b`` treats CJK characters as word characters, so a code written
    directly after Chinese text (e.g. "验证码是123456") would not match.

    Args:
        body: Email body text; ``None`` or ``""`` is allowed.

    Returns:
        The first six-digit code as a string, or ``None`` if there is none.
    """
    import re

    if not body:
        return None
    match = re.search(r'(?<!\d)(\d{6})(?!\d)', body)
    return match.group(1) if match else None
def add_allowed_domain(domain):
    """Add ``domain`` to the allowed list and record when it was added.

    The domain is appended to ``allowed_domains_list`` only if it is not
    already there, so repeated calls do not create duplicate entries. The
    timestamp fields ``domain:<domain>.created_at`` and
    ``domain_time:<domain>.timestamp`` are written on every call.

    Args:
        domain: Domain name to allow.

    Errors are logged and not raised.
    """
    try:
        timestamp = datetime.now().isoformat()

        # Redis returns list members as bytes.
        existing = redis_client.lrange('allowed_domains_list', 0, -1)
        if domain.encode() not in existing:
            redis_client.rpush('allowed_domains_list', domain)  # append to the list

        redis_client.hset(f'domain:{domain}', 'created_at', timestamp)  # creation time
        redis_client.hset(f'domain_time:{domain}', 'timestamp', timestamp)  # time record
        logger.info(f'Added allowed domain: {domain} with timestamp: {timestamp}')
    except Exception as e:
        logger.error(f'Error adding allowed domain: {e}')
def remove_allowed_domain(domain):
    """Remove ``domain`` from the allowed list and drop its timestamps.

    Domains are stored in the list ``allowed_domains_list`` (see
    ``add_allowed_domain``), so the entry is removed from that list; the
    previous code only touched the set ``allowed_domains``, which left the
    domain allowed.

    Args:
        domain: Domain name to remove.

    Errors are logged and not raised.
    """
    try:
        # count=0 removes every occurrence of the value from the list.
        redis_client.lrem('allowed_domains_list', 0, domain)
        # Drop only the field written by add_allowed_domain; other fields
        # of the hash, if any, are left alone.
        redis_client.hdel(f'domain:{domain}', 'created_at')
        redis_client.delete(f'domain_time:{domain}')  # remove the time record
        # Kept from the previous implementation: clean up the old set key.
        redis_client.srem('allowed_domains', domain)
        logger.info(f'Removed allowed domain: {domain}')
    except Exception as e:
        logger.error(f'Error removing allowed domain: {e}')
def get_allowed_domains():
    """Return the currently allowed domains.

    Returns:
        The members of the Redis list ``allowed_domains_list`` decoded to
        ``str``, in list order, or ``[]`` on error (the error is logged).
    """
    try:
        raw_entries = redis_client.lrange('allowed_domains_list', 0, -1)
        return [entry.decode() for entry in raw_entries]
    except Exception as e:
        logger.error(f'Error fetching allowed domains: {e}')
        return []
def get_allowed_domains_for_time():
    """Return the allowed domains with their timestamps, ordered by time.

    Returns:
        A dict mapping each domain to the ``timestamp`` field of
        ``domain_time:<domain>`` (``None`` when missing). Entries are inserted
        in ascending timestamp order, with domains lacking a timestamp last.
        Returns ``{}`` on error (the error is logged).
    """
    try:
        entries = []
        for raw_domain in redis_client.lrange('allowed_domains_list', 0, -1):
            domain = raw_domain.decode()
            created_at = redis_client.hget(f'domain_time:{domain}', 'timestamp')
            entries.append((domain, created_at.decode() if created_at else None))

        # ISO-8601 strings sort chronologically as text; the boolean puts
        # missing timestamps after all dated entries. The sort is stable, so
        # ties keep their list order.
        entries.sort(key=lambda item: (item[1] is None, item[1] or ''))
        return dict(entries)
    except Exception as e:
        logger.error(f'Error fetching allowed domains: {e}')
        return {}