Skip to content

B站订阅

1. 插件简介

插件名称 父类 触发关键词 触发权限 内容
BilibiliSubscribeHelper StandardPlugin 'B站订阅帮助' None 获取B站订阅帮助
BilibiliSubscribe StandardPlugin '订阅up' / '取消订阅' / '已订阅up' None 订阅B站up主等

2. 示范样例

111> B站订阅帮助
bot> 【订阅帮助内容】
111> 已订阅up
bot> 【回复上文】本群还没有订阅up哦~
222> 订阅up 316568752
bot> 订阅成功!
     name: 马督工
     uid: 316568752
111> 已订阅up
bot> 本群订阅的up有:

     name: 马督工
     uid: 316568752


<马逆更新视频后几分钟>
bot> 本群订阅UP主 【马督工】 更新视频啦!
     视频标题:[...]
     链接:[...]

3. 代码分析

代码位于 plugins/bilibiliSubscribe.py

def bvToUrl(bvid:str):
    return 'https://www.bilibili.com/video/' + bvid

def createBilibiliTable()->None:
    mydb = mysql.connector.connect(**sqlConfig)
    mycursor = mydb.cursor()
    mydb.autocommit = True
    mycursor.execute("""
    create table if not exists `BOT_DATA`.`bilibiliUp` (
       `uid` bigint unsigned not null,
       `uploadTime` timestamp not null,
       `bvid` char(20) not null,
       primary key (`uid`)
    )""")
    mycursor.execute("""
    create table if not exists `BOT_DATA`.`bilibiliSubscribe` (
        `group_id` bigint unsigned not null,
        `uid` bigint unsigned not null,
        primary key(`group_id`, `uid`)
    )""")


class BilibiliSubscribeHelper(StandardPlugin):
    def judgeTrigger(self, msg: str, data: Any) -> bool:
        return msg == "B站订阅帮助" and data['message_type']=='group'
    def executeEvent(self, msg: str, data: Any) -> Union[None, str]:
        group_id = data['group_id']
        send(group_id,'订阅帮助: B站订阅帮助\n' 
                    '订阅up:    订阅up <uid>\n'
                    '取消订阅up: 取消订阅up<uid>\n'
                    '获取已订阅: 已订阅up\n'
                    '注意:  关闭本插件会自动取消订阅所有已订阅的up')
        return "OK"
    def getPluginInfo(self) -> dict:
        return {
            'name': 'BilibiliSubscribeHelper',
            'description': 'B站订阅帮助',
            'commandDescription': 'B站订阅帮助',
            'usePlace': ['group', ],
            'showInHelp': True,
            'pluginConfigTableNames': [],
            'version': '1.0.0',
            'author': 'Unicorn',
        }
class BilibiliUpSearcher(StandardPlugin):
    def __init__(self) -> None:
        self.pattern = re.compile(r'^搜索up\s*(\S+)$')
    def judgeTrigger(self, msg: str, data: Any) -> bool:
        return self.pattern.match(msg) != None
    def executeEvent(self, msg: str, data: Any) -> Union[None, str]:
        return "OK"
    def getPluginInfo(self) -> dict:
        return {
            'name': 'BilibiliUpSearcher',
            'description': '搜索B站用户uid',
            'commandDescription': '搜索up <name>',
            'usePlace': ['group', ],
            'showInHelp': True,
            'pluginConfigTableNames': [],
            'version': '1.0.0',
            'author': 'Unicorn',
        }
class BilibiliSubscribe(StandardPlugin):
    def __init__(self) -> None:
        """
        self.bUps: uid -> User
        self.groupUps: group_id -> Set[uid: int]
        """
        self.pattern1 = re.compile(r'^订阅up\s*(\d+)$')
        self.pattern2 = re.compile(r'^取消订阅up\s*(\d+)$')
        self.pattern3 = re.compile(r'^已订阅up$')
        self.bUps:Dict[int, BilibiliMonitor] = {}
        self.groupUps:Dict[int, Set[int]] = {}
        self._loadFromSql()
    def _loadFromSql(self)->None:
        mydb = mysql.connector.connect(**sqlConfig)
        mycursor = mydb.cursor()
        mycursor.execute("""
        select group_id, uid from `BOT_DATA`.`bilibiliSubscribe`
        """)
        for group_id, uid in list(mycursor):
            if group_id not in self.groupUps.keys():
                self.groupUps[group_id] = set()
            if uid not in self.bUps.keys():
                self.bUps[uid] = BilibiliMonitor(uid)
            self.groupUps[group_id].add(uid)
            self.bUps[uid].addGroup(group_id)
    def judgeTrigger(self, msg: str, data: Any) -> bool:
        return self.pattern1.match(msg) != None or\
               self.pattern2.match(msg) != None or\
               self.pattern3.match(msg) != None

    def subscribeBilibili(self, group_id:int, bilibili_uid:int)->None:
        if group_id not in self.groupUps.keys():
            self.groupUps[group_id] = set()
        if bilibili_uid not in self.groupUps[group_id]:
            self.groupUps[group_id].add(bilibili_uid)
            mydb = mysql.connector.connect(**sqlConfig)
            mycursor = mydb.cursor()
            mydb.autocommit = True
            mycursor.execute("""
            insert ignore into BOT_DATA.bilibiliSubscribe set
            group_id = %d,
            uid = %d
            """%(group_id, bilibili_uid))
        if bilibili_uid not in self.bUps.keys():
            self.bUps[bilibili_uid] = BilibiliMonitor(bilibili_uid)
        self.bUps[bilibili_uid].addGroup(group_id)
    def unsubscribeBilibili(self, group_id:int, bilibili_uid:int)->None:
        if group_id in self.groupUps.keys() and bilibili_uid in self.groupUps[group_id]:
            self.groupUps[group_id].discard(bilibili_uid)
            mydb = mysql.connector.connect(**sqlConfig)
            mycursor = mydb.cursor()
            mydb.autocommit = True
            mycursor.execute("""
            delete from BOT_DATA.bilibiliSubscribe where
            group_id = %d and
            uid = %d
            """%(group_id, bilibili_uid))
        if bilibili_uid in self.bUps.keys():
            self.bUps[bilibili_uid].delGroup(group_id)

    def executeEvent(self, msg: str, data: Any) -> Union[None, str]:
        group_id = data['group_id']
        if self.pattern1.match(msg) != None:
            uid = self.pattern1.findall(msg)[0]
            uid = int(uid)
            try:
                u = User(uid)
                userInfo = asyncio.run(u.get_user_info())
                self.subscribeBilibili(group_id, uid)
                name = gocqQuote(userInfo['name'])
                send(group_id, f'订阅成功!\nname: {name}\nuid: {uid}')
            except ResponseCodeException as e:
                send(group_id, f'好像没找到这个UP:\n{e}')
            except KeyError as e:
                warning('bilibili api get_user_info error: {}'.format(e))
        elif self.pattern2.match(msg) != None:
            uid = self.pattern2.findall(msg)[0]
            uid = int(uid)
            self.unsubscribeBilibili(group_id, uid)
            send(group_id, '[CQ:reply,id=%d]OK'%data['message_id'])
        elif self.pattern3.match(msg) != None:
            ups = self.subscribeList(group_id)
            if len(ups) == 0:
                send(group_id, '[CQ:reply,id=%d]本群还没有订阅up哦~'%data['message_id'])
            else:
                try:
                    metas = asyncio.run(asyncio.wait([up.get_user_info() for up in ups]))
                    metas = [m.result() for m in metas[0]]
                    metas = [f"name: {m['name']}\nuid: {m['mid']}" for m in metas]
                    send(group_id,f'本群订阅的up有:\n\n'+'\n----------\n'.join(metas))
                except BaseException as e:
                    send(group_id, 'bilibili api error')
                    warning('bilibili get_user_info error: {}'.format(e))
        return "OK"
    def onStateChange(self, nextState: bool, data: Any) -> None:
        group_id = data['group_id']
        if nextState or group_id not in self.groupUps.keys(): return
        for uid in copy.deepcopy(self.groupUps[group_id]):
            self.unsubscribeBilibili(group_id, uid)

    def subscribeList(self, group_id:int)->List[User]:
        uids = self.groupUps.get(group_id, set())
        return [self.bUps[uid].bUser for uid in uids]

    def getPluginInfo(self) -> dict:
        return {
            'name': 'BilibiliSubscribe',
            'description': '订阅B站up',
            'commandDescription': '订阅up <uid>/取消订阅up <uid>/已订阅up',
            'usePlace': ['group', ],
            'showInHelp': True,
            'pluginConfigTableNames': [],
            'version': '1.0.0',
            'author': 'Unicorn',
        }
class BilibiliMonitor(CronStandardPlugin):
    """bilibili up主监控类
    self.uid: 被监控的up主uid
    self.groupList
    """
    def __init__(self, uid:int) -> None:
        self.uid:int = uid
        self.bUser = User(uid=uid)
        self.groupList = set()
        self.job: Optional[Job] = None

        self.cumulativeNetworkErrCount = 0
        self._prevMeta:Optional[Tuple[int, str]] = None
    def addGroup(self, group_id:int):
        self.groupList.add(group_id)
        if self.job == None:
            self.job = self.start(0, 5 * 60)
        else:
            self.resume()
    def delGroup(self, group_id:int):
        self.groupList.discard(group_id)
        if len(self.groupList) == 0:
            self.pause()

    def tick(self) -> None:
        videos = None
        attempts = 0
        while videos == None and attempts < 3:
            attempts += 1
            try:
                videos = asyncio.run(self.bUser.get_videos())
            except BaseException as e:
                videos = None
        if videos == None: 
            self.cumulativeNetworkErrCount += 1
            if self.cumulativeNetworkErrCount >= 3:
                warning('bilibili subscribe api failed!')
                self.cumulativeNetworkErrCount = 0
                self.cancel()
            return
        else:
            self.cumulativeNetworkErrCount = 0

        try:
            videos = videos['list']['vlist']
            if len(videos) == 0: return
            latestVideo = max(videos, key=lambda x:x['created'])
            uploadTime = latestVideo['created']
            bvid = latestVideo['bvid']
            prevMeta = self.getPrevMeta()
            if prevMeta == None or prevMeta != (uploadTime, bvid):
                self.writeMeta(uploadTime, bvid)
                title = gocqQuote(latestVideo['title'])
                author = gocqQuote(latestVideo['author'])
                for group in self.groupList:
                    send(group, f'本群订阅UP主 【{author}】 更新视频啦!\n视频标题: {title}\n链接:{bvToUrl(bvid)}')
        except KeyError as e:
            warning('bilibili api has changed!')
            self.cancel()
            return
        except BaseException as e:
            warning('base excption in BilibiliMonitor: {}'.format(e))
            self.cancel()
    def getPrevMeta(self)->Optional[Tuple[int, str]]:
        """获取该up主记录在册的前一次上传数据
        @return: Optional[(
            [0]: int: uploadTime unix时间戳
            [1]: str: bvid
        )]
        """
        if self._prevMeta == None:
            mydb = mysql.connector.connect(**sqlConfig)
            mycursor = mydb.cursor()
            mycursor.execute("""
            select unix_timestamp(uploadTime), bvid from BOT_DATA.bilibiliUp where
            uid = %d
            """%self.uid)
            meta = list(mycursor)
            if len(meta) != 0:
                self._prevMeta = meta[0]
        return self._prevMeta

    def writeMeta(self, uploadTime:int, bvid:str)->None:
        """写入up主本次上传数据"""
        meta = (uploadTime, bvid)
        if self._prevMeta == meta: return

        self._prevMeta = meta
        mydb = mysql.connector.connect(**sqlConfig)
        mycursor = mydb.cursor()
        mydb.autocommit = True
        mycursor.execute("""
        insert into BOT_DATA.bilibiliUp set
        uploadTime = from_unixtime(%s),
        bvid = %s,
        uid = %s
        on duplicate key update
        uploadTime = from_unixtime(%s),
        bvid = %s
        """, (uploadTime, bvid, self.uid, uploadTime, bvid))
    def cancel(self,) -> None:
        if self.job != None: 
            self.job.remove()
    def pause(self) -> None:
        if self.job != None:
            self.job.pause()
    def resume(self) -> None:
        if self.job != None:
            self.job.resume()
    def __del__(self,):
        self.cancel()