Source code for src.notification.sender_serverchan
from src.notification.notification_base import NotificationBase
from src.core.checker import Recipient
from typing import Dict
import httpx
import logging
logger = logging.getLogger(__name__)
[docs]
class ServerChanSender(NotificationBase):
[docs]
def __init__(self, sckey: str):
self.sckey = sckey
[docs]
def render_content(self, name: str, template_file: str, extra_info: Dict) -> str:
# 只渲染纯文本内容
lines = [f"亲爱的{name}:"]
if extra_info.get("days_until", 0) == 0:
if extra_info.get("solar_match") and extra_info.get("lunar_match"):
lines.append("今天是您的阳历和农历生日,祝您生日快乐!🎉")
elif extra_info.get("solar_match"):
lines.append("今天是您的阳历生日,祝您生日快乐!🎉")
else:
lines.append("今天是您的农历生日,祝您生日快乐!🎉")
else:
if extra_info.get("solar_match") and extra_info.get("lunar_match"):
lines.append(f"{extra_info['days_until']}天后是您的阳历和农历生日!")
elif extra_info.get("solar_match"):
lines.append(f"{extra_info['days_until']}天后是您的阳历生日!")
else:
lines.append(f"{extra_info['days_until']}天后是您的农历生日!")
# 追加命理和节日信息
lines.append(f"生肖:{extra_info.get('zodiac', '')}")
lines.append(f"星座:{extra_info.get('constellation', '')}")
if extra_info.get("solar_term"):
lines.append(f"节气:{extra_info['solar_term']}")
if extra_info.get("lunar_festival"):
lines.append(f"农历节日:{extra_info['lunar_festival']}")
if extra_info.get("solar_festival"):
lines.append(f"阳历节日:{extra_info['solar_festival']}")
return "\n".join(lines)
[docs]
async def send(self, recipient: Recipient, content: str, days_until: int, age: int):
# Server酱推送API
url = f"https://sctapi.ftqq.com/{self.sckey}.send"
title = f"生日提醒- {recipient.name} - {age}岁 - {days_until}天后"
data = {"title": title, "desp": content}
async with httpx.AsyncClient() as client:
resp = await client.post(url, data=data)
if resp.status_code == 200 and resp.json().get("code") == 0:
logger.info(f"Server酱推送成功: {recipient.name}")
else:
logger.error(f"Server酱推送失败: {recipient.name}, 响应: {resp.text}")
raise Exception(f"Server酱推送失败: {resp.text}")