Skip to content

SJTU 麦当劳

1. 插件简介

插件名称 父类 触发关键词 触发权限 内容
GetMddStatus StandardPlugin '-mdd' None 获取交大闵行麦当劳状态
MonitorMddStatus StandardPlugin
CronStandardPlugin
None None 广播交大闵行麦当劳状态改变信息

2. 插件作者简介

Teruteru: SJTU 电院 21级

麦当劳小程序逆向破解

3. 示范样例

麦当劳开门
bot> 📣交大闵行麦当劳 已▶️开放营业
111> -mdd
bot> 交大闵行麦当劳当前状态:
     ▶️营业中
     2022-11-21 07:31:00

麦当劳关门
bot> 📣交大闵行麦当劳 已⏸️暂停营业
111> -mdd
bot> 交大闵行麦当劳当前状态:
     ⏸️暂停营业
     2022-11-21 10:01:00

4. 代码分析

from utils.basicConfigs import ROOT_PATH
from utils.responseImage import *
from utils.basicEvent import send, warning
from typing import Union, Tuple, Any, List
from utils.standardPlugin import StandardPlugin, PluginGroupManager
from utils.basicEvent import getPluginEnabledGroups
from threading import Timer, Semaphore
from resources.api.mddApi import mddUrl, mddHeaders
from datetime import datetime
import os.path

class GetMddStatus(StandardPlugin):
    def judgeTrigger(self, msg: str, data: Any) -> bool:
        return msg == '-mdd'
    def executeEvent(self, msg: str, data: Any) -> Union[None, str]:
        target = data['group_id'] if data['message_type']=='group' else data['user_id']
        req = getMddStatus()
        if req == None:
            send(target, '获取交大闵行麦当劳状态失败!', data['message_type'])
        try:
            currentStatus = req["data"]["onlineBusinessStatus"]
        except KeyError as e:
            warning("mdd api failed: {}".format(e))
            send(target, '获取交大闵行麦当劳状态失败!', data['message_type'])
            return
        if currentStatus :
            send(target, '交大闵行麦当劳当前状态:\n▶️营业中\n\n%s'%datetime.now().strftime("%Y-%m-%d %H:%M:%S"), data['message_type'])
        else:
            send(target, '交大闵行麦当劳当前状态:\n⏸️暂停营业\n\n%s'%datetime.now().strftime("%Y-%m-%d %H:%M:%S"), data['message_type'])
        return "OK"

    def getPluginInfo(self) -> dict:
        return {
            'name': 'GetMddStatus',
            'description': '麦当劳查询',
            'commandDescription': '-mdd',
            'usePlace': ['group', 'private', ],
            'showInHelp': True,
            'pluginConfigTableNames': [],
            'version': '1.0.3',
            'author': 'Teruteru',
        }
class MonitorMddStatus(StandardPlugin):
    monitorSemaphore = Semaphore()
    @staticmethod
    def dumpMddStatus(status: bool):
        exactPath = 'data/mdd.json'
        with open(exactPath, 'w') as f:
            f.write('1' if status else '0')
    @staticmethod
    def loadMddStatus()->bool:
        exactPath = 'data/mdd.json'
        with open(exactPath, 'r') as f:
            return f.read().startswith('1')
    def __init__(self) -> None:
        self.timer = Timer(5, self.mddMonitor)
        if MonitorMddStatus.monitorSemaphore.acquire(blocking=False):
            self.timer.start()
        self.exactPath = 'data/mdd.json'
        self.prevStatus = False # false: 暂停营业, true: 营业
        if not os.path.isfile(self.exactPath):
            MonitorMddStatus.dumpMddStatus(False)
        else:
            self.prevStatus = MonitorMddStatus.loadMddStatus()
    def mddMonitor(self):
        self.timer.cancel()
        self.timer = Timer(60,self.mddMonitor)
        self.timer.start()
        prevStatus = MonitorMddStatus.loadMddStatus()
        req = getMddStatus()
        if req == None: return
        try:
            currentStatus = req["data"]["onlineBusinessStatus"]
        except KeyError as e:
            warning('mdd api failed: {}'.format(e))
            return
        if currentStatus != prevStatus:
            MonitorMddStatus.dumpMddStatus(currentStatus)
            if currentStatus :
                for group in getPluginEnabledGroups('mddmonitor'):
                    send(group, '📣交大闵行麦当劳 已▶️开放营业')
            else:
                for group in getPluginEnabledGroups('mddmonitor'):
                    send(group, '📣交大闵行麦当劳 已⏸️暂停营业')
    def judgeTrigger(self, msg: str, data: Any) -> bool:
        return False
    def executeEvent(self, msg: str, data: Any) -> Union[None, str]:
        return "OK"
    def getPluginInfo(self) -> dict:
        return {
            'name': 'MonitorMddStatus',
            'description': '麦当劳状态监控',
            'commandDescription': 'None',
            'usePlace': ['group', ],
            'showInHelp': True,
            'pluginConfigTableNames': [],
            'version': '1.0.3',
            'author': 'Teruteru',
        }
def getMddStatus()->Union[None, dict]:
    req = requests.get(mddUrl, headers=mddHeaders)
    if req.status_code != requests.codes.ok:
        warning('mdd api failed!')
        return None
    else:
        return req.json()