Skip to content

SJMC 直播间状态

1. 插件简介

插件名称 父类 触发关键词 触发权限 内容
SjmcLiveStatus StandardPlugin '-mclive' / '-sjmclive' None 获取交大mc社B站直播状态,监测sjmc直播间信息,并在特定群广播开播消息
FduMcLiveStatus StandardPlugin '-fdmclive' None 获取复旦基岩社B站直播状态,监测基岩社直播间信息,并在特定群广播开播消息

2. 示范样例

MC社B站未开播
111> -sjmclive
bot> 当前时段未开播哦

MC社B站开播
bot> 检测到MC社B站开播,SJMC社直播地址: https://live.bilibili.com/25567444
bot> 【sjmc直播间信息图片】
333> -sjmc
bot> 【sjmc直播间信息图片】

sjmc直播间信息图片:

3. 代码分析

代码位于 plugins/sjmcLive.py

class SjmcLiveStatus(StandardPlugin):
    monitorSemaphore = Semaphore()
    @staticmethod
    def dumpSjmcStatus(status: bool):
        exactPath = 'data/sjmcLive.json'
        with open(exactPath, 'w') as f:
            f.write('1' if status else '0')
    @staticmethod
    def loadSjmcStatus()->bool:
        exactPath = 'data/sjmcLive.json'
        with open(exactPath, 'r') as f:
            return f.read().startswith('1')
    def __init__(self) -> None:
        self.liveId = 25567444
        self.liveRoom = LiveRoom(self.liveId)
        self.timer = Timer(5, self.sjmcMonitor)
        if SjmcLiveStatus.monitorSemaphore.acquire(blocking=False):
            self.timer.start()
        self.exactPath = 'data/sjmcLive.json'
        self.prevStatus = False # false: 未开播, true: 开播
        self.sjmcQqGroup = 712514518
        if not os.path.isfile(self.exactPath):
            SjmcLiveStatus.dumpSjmcStatus(False)
        else:
            self.prevStatus = SjmcLiveStatus.loadSjmcStatus()
    def sjmcMonitor(self):
        # print('mctick')
        self.timer.cancel()
        self.timer = Timer(60,self.sjmcMonitor)
        self.timer.start()
        prevStatus = SjmcLiveStatus.loadSjmcStatus()
        roomInfo = asyncio.run(self.liveRoom.get_room_info())['room_info']
        currentStatus = roomInfo['live_status'] == 1
        if currentStatus != prevStatus:
            SjmcLiveStatus.dumpSjmcStatus(currentStatus)
            if currentStatus and self.sjmcQqGroup in getPluginEnabledGroups('sjmc'):
                send(self.sjmcQqGroup, '检测到MC社B站开播,SJMC社直播地址: https://live.bilibili.com/%d'%self.liveId)
                savePath = os.path.join(ROOT_PATH, SAVE_TMP_PATH, 'sjmcLive.png')
                genLivePic(roomInfo, 'sjmc直播间状态', savePath)
                send(self.sjmcQqGroup, f'[CQ:image,file=files://{savePath},id=40000]')

    def judgeTrigger(self, msg: str, data: Any) -> bool:
        return msg in ['-mclive', '-sjmclive']
    def executeEvent(self, msg: str, data: Any) -> Union[None, str]:
        target = data['group_id'] if data['message_type']=='group' else data['user_id']
        try:
            roomInfo = asyncio.run(self.liveRoom.get_room_info())['room_info']
        except LiveException as e:
            warning("sjmc bilibili api exception: {}".format(e))
            return
        except ApiException as e:
            warning('bilibili api exception: {}'.format(e))
            return 
        except BaseException as e:
            warning('base exception in sjmclive: {}'.format(e))
            return
        if roomInfo['live_status'] == 1:
            savePath = os.path.join(ROOT_PATH, SAVE_TMP_PATH, 'sjmcLive-%d.png'%target)
            genLivePic(roomInfo, 'sjmc直播间状态', savePath)
            send(target, f'[CQ:image,file=files://{savePath},id=40000]', data['message_type'])
        else:
            send(target, '当前时段未开播哦', data['message_type'])
        return "OK"
    def getPluginInfo(self) -> dict:
        return {
            'name': 'sjmclive',
            'description': '交大MC社B站直播间状态',
            'commandDescription': '-mclive/-sjmclive',
            'usePlace': ['group', 'private', ],
            'showInHelp': True,
            'pluginConfigTableNames': [],
            'version': '1.0.3',
            'author': 'Unicorn',
        }
def genLivePic(roomInfo, title, savePath)->str:
    img = ResponseImage(
        theme = 'unicorn',
        title = title, 
        titleColor = PALETTE_SJTU_GREEN, 
        primaryColor = PALETTE_SJTU_RED, 
        footer = datetime.now().strftime('当前时间 %Y-%m-%d %H:%M:%S'),
        layout = 'normal'
    )
    img.addCard(
        ResponseImage.NoticeCard(
            title = roomInfo['title'],
            subtitle = datetime.fromtimestamp(roomInfo['live_start_time']).strftime(
                                                "开播时间  %Y-%m-%d %H:%M:%S"),
            keyword = '直播分区: '+roomInfo['area_name'],
            body = roomInfo['description'],
            illustration = roomInfo['keyframe'],
        )
    )
    img.generateImage(savePath)
    return savePath

为了确保实例化多个 SjmcLiveStatus 插件时只有一个监测广播mc社直播间状态,因此作者选用threading.Semaphore来保护监测线程入口