B站订阅
1. 插件简介
插件名称 | 父类 | 触发关键词 | 触发权限 | 内容 |
---|---|---|---|---|
BilibiliSubscribeHelper | StandardPlugin | 'B站订阅帮助' | None | 获取B站订阅帮助 |
BilibiliSubscribe | StandardPlugin | '订阅up' / '取消订阅up' / '已订阅up' | None | 订阅B站up主等 |
2. 示范样例
111> B站订阅帮助
bot> 【订阅帮助内容】
111> 已订阅up
bot> 【回复上文】本群还没有订阅up哦~
222> 订阅up 316568752
bot> 订阅成功!
name: 马督工
uid: 316568752
111> 已订阅up
bot> 本群订阅的up有:
name: 马督工
uid: 316568752
<马督工更新视频后几分钟>
bot> 本群订阅UP主 【马督工】 更新视频啦!
视频标题:[...]
链接:[...]
3. 代码分析
代码位于 plugins/bilibiliSubscribe.py
def bvToUrl(bvid:str):
    """Return the bilibili video page URL for the given BV id.

    The BV id is appended verbatim to the fixed video-page prefix.
    """
    videoPagePrefix = 'https://www.bilibili.com/video/'
    return videoPagePrefix + bvid
def createBilibiliTable()->None:
    """Create the MySQL tables used by the bilibili subscription plugin.

    Two tables are created in the `BOT_DATA` schema if they do not exist yet:

    * `bilibiliUp` -- one row per uploader uid, holding the upload time and
      bvid that were last recorded for that uploader.
    * `bilibiliSubscribe` -- (group_id, uid) pairs, one per subscription.

    The connection and cursor are always closed, even when a statement fails.
    """
    mydb = mysql.connector.connect(**sqlConfig)
    try:
        mydb.autocommit = True
        mycursor = mydb.cursor()
        try:
            mycursor.execute("""
            create table if not exists `BOT_DATA`.`bilibiliUp` (
                `uid` bigint unsigned not null,
                `uploadTime` timestamp not null,
                `bvid` char(20) not null,
                primary key (`uid`)
            )""")
            mycursor.execute("""
            create table if not exists `BOT_DATA`.`bilibiliSubscribe` (
                `group_id` bigint unsigned not null,
                `uid` bigint unsigned not null,
                primary key(`group_id`, `uid`)
            )""")
        finally:
            mycursor.close()
    finally:
        # Previously the connection was left for the garbage collector.
        mydb.close()
class BilibiliSubscribeHelper(StandardPlugin):
    """Plugin that answers the help keyword with the subscription command list."""

    def judgeTrigger(self, msg: str, data: Any) -> bool:
        """Return True only for the exact help keyword sent in a group message."""
        if msg != "B站订阅帮助":
            return False
        return data['message_type']=='group'

    def executeEvent(self, msg: str, data: Any) -> Union[None, str]:
        """Send the usage text to the group the message came from and return "OK"."""
        targetGroup = data['group_id']
        helpText = (
            '订阅帮助: B站订阅帮助\n'
            '订阅up: 订阅up <uid>\n'
            '取消订阅up: 取消订阅up<uid>\n'
            '获取已订阅: 已订阅up\n'
            '注意: 关闭本插件会自动取消订阅所有已订阅的up'
        )
        send(targetGroup, helpText)
        return "OK"

    def getPluginInfo(self) -> dict:
        """Return the static metadata dictionary describing this plugin."""
        info = {
            'name': 'BilibiliSubscribeHelper',
            'description': 'B站订阅帮助',
            'commandDescription': 'B站订阅帮助',
            'usePlace': ['group', ],
            'showInHelp': True,
            'pluginConfigTableNames': [],
            'version': '1.0.0',
            'author': 'Unicorn',
        }
        return info
class BilibiliUpSearcher(StandardPlugin):
    """Plugin skeleton for looking up a bilibili uploader by name.

    Only the trigger is implemented: the event handler performs no search
    and just returns "OK".
    """

    def __init__(self) -> None:
        # Keyword, optional whitespace, then one whitespace-free token.
        self.pattern = re.compile(r'^搜索up\s*(\S+)$')

    def judgeTrigger(self, msg: str, data: Any) -> bool:
        """Return True when the message matches the search command pattern."""
        matched = self.pattern.match(msg)
        return matched is not None

    def executeEvent(self, msg: str, data: Any) -> Union[None, str]:
        """Placeholder handler: does nothing and returns "OK"."""
        return "OK"

    def getPluginInfo(self) -> dict:
        """Return the static metadata dictionary describing this plugin."""
        info = {
            'name': 'BilibiliUpSearcher',
            'description': '搜索B站用户uid',
            'commandDescription': '搜索up <name>',
            'usePlace': ['group', ],
            'showInHelp': True,
            'pluginConfigTableNames': [],
            'version': '1.0.0',
            'author': 'Unicorn',
        }
        return info
class BilibiliSubscribe(StandardPlugin):
    """Group plugin handling '订阅up <uid>', '取消订阅up <uid>' and '已订阅up'.

    Subscriptions are persisted in `BOT_DATA`.`bilibiliSubscribe` and mirrored
    in memory:

    * self.bUps:     uid -> BilibiliMonitor watching that uploader
    * self.groupUps: group_id -> set of subscribed uids
    """

    def __init__(self) -> None:
        """Compile the command patterns and restore subscriptions from MySQL."""
        self.pattern1 = re.compile(r'^订阅up\s*(\d+)$')
        self.pattern2 = re.compile(r'^取消订阅up\s*(\d+)$')
        self.pattern3 = re.compile(r'^已订阅up$')
        self.bUps:Dict[int, BilibiliMonitor] = {}
        self.groupUps:Dict[int, Set[int]] = {}
        self._loadFromSql()

    @staticmethod
    def _runSql(sql: str, params=None, fetch: bool = False) -> list:
        """Run one statement on a fresh autocommit connection and always close it.

        @sql:    statement text using `%s` placeholders
        @params: tuple of values bound to the placeholders, or None
        @fetch:  when True, return all result rows; otherwise return []
        """
        mydb = mysql.connector.connect(**sqlConfig)
        try:
            mydb.autocommit = True
            mycursor = mydb.cursor()
            try:
                mycursor.execute(sql, params)
                return mycursor.fetchall() if fetch else []
            finally:
                mycursor.close()
        finally:
            mydb.close()

    @staticmethod
    async def _fetchUserInfos(ups) -> list:
        """Fetch get_user_info() for every user concurrently, preserving order."""
        return await asyncio.gather(*(up.get_user_info() for up in ups))

    def _loadFromSql(self)->None:
        """Rebuild self.groupUps / self.bUps from the persisted subscription rows."""
        rows = self._runSql("""
            select group_id, uid from `BOT_DATA`.`bilibiliSubscribe`
        """, fetch=True)
        for group_id, uid in rows:
            if uid not in self.bUps:
                self.bUps[uid] = BilibiliMonitor(uid)
            self.groupUps.setdefault(group_id, set()).add(uid)
            self.bUps[uid].addGroup(group_id)

    def judgeTrigger(self, msg: str, data: Any) -> bool:
        """Return True when the message matches any of the three commands."""
        return self.pattern1.match(msg) is not None or\
               self.pattern2.match(msg) is not None or\
               self.pattern3.match(msg) is not None

    def subscribeBilibili(self, group_id:int, bilibili_uid:int)->None:
        """Subscribe a group to an uploader and make sure a monitor is running.

        The row is written to MySQL before the in-memory set is updated, so a
        failed insert does not leave a subscription that exists only in memory
        (which would make every later retry a no-op).
        """
        if bilibili_uid not in self.groupUps.get(group_id, ()):
            self._runSql("""
                insert ignore into BOT_DATA.bilibiliSubscribe set
                group_id = %s,
                uid = %s
            """, (group_id, bilibili_uid))
            self.groupUps.setdefault(group_id, set()).add(bilibili_uid)
        if bilibili_uid not in self.bUps:
            self.bUps[bilibili_uid] = BilibiliMonitor(bilibili_uid)
        self.bUps[bilibili_uid].addGroup(group_id)

    def unsubscribeBilibili(self, group_id:int, bilibili_uid:int)->None:
        """Remove a group's subscription to an uploader (no-op when absent).

        The row is deleted from MySQL before the in-memory set is updated so
        both stay consistent when the delete fails.
        """
        if bilibili_uid in self.groupUps.get(group_id, ()):
            self._runSql("""
                delete from BOT_DATA.bilibiliSubscribe where
                group_id = %s and
                uid = %s
            """, (group_id, bilibili_uid))
            self.groupUps[group_id].discard(bilibili_uid)
        if bilibili_uid in self.bUps:
            self.bUps[bilibili_uid].delGroup(group_id)

    def executeEvent(self, msg: str, data: Any) -> Union[None, str]:
        """Dispatch the matched command and reply in the originating group.

        @return: always "OK"
        """
        group_id = data['group_id']
        subscribeMatch = self.pattern1.match(msg)
        unsubscribeMatch = self.pattern2.match(msg)
        if subscribeMatch is not None:
            uid = int(subscribeMatch.group(1))
            try:
                # Looking the user up first verifies that the uid exists.
                u = User(uid)
                userInfo = asyncio.run(u.get_user_info())
                self.subscribeBilibili(group_id, uid)
                name = gocqQuote(userInfo['name'])
                send(group_id, f'订阅成功!\nname: {name}\nuid: {uid}')
            except ResponseCodeException as e:
                send(group_id, f'好像没找到这个UP:\n{e}')
            except KeyError as e:
                warning('bilibili api get_user_info error: {}'.format(e))
        elif unsubscribeMatch is not None:
            uid = int(unsubscribeMatch.group(1))
            self.unsubscribeBilibili(group_id, uid)
            send(group_id, '[CQ:reply,id=%d]OK'%data['message_id'])
        elif self.pattern3.match(msg) is not None:
            ups = self.subscribeList(group_id)
            if len(ups) == 0:
                send(group_id, '[CQ:reply,id=%d]本群还没有订阅up哦~'%data['message_id'])
            else:
                try:
                    # asyncio.wait() no longer accepts bare coroutines on
                    # Python 3.11+ and returns unordered sets; gather() keeps
                    # the results in subscription order.
                    metas = asyncio.run(self._fetchUserInfos(ups))
                    # Names are quoted like in the other messages this plugin
                    # sends (presumably to escape CQ codes -- see gocqQuote).
                    metas = [f"name: {gocqQuote(m['name'])}\nuid: {m['mid']}" for m in metas]
                    send(group_id, '本群订阅的up有:\n\n'+'\n----------\n'.join(metas))
                except Exception as e:
                    send(group_id, 'bilibili api error')
                    warning('bilibili get_user_info error: {}'.format(e))
        return "OK"

    def onStateChange(self, nextState: bool, data: Any) -> None:
        """Drop every subscription of the group when the plugin is switched off."""
        group_id = data['group_id']
        if nextState or group_id not in self.groupUps: return
        # Iterate over a snapshot: unsubscribeBilibili mutates the set.
        for uid in tuple(self.groupUps[group_id]):
            self.unsubscribeBilibili(group_id, uid)

    def subscribeList(self, group_id:int)->List[User]:
        """Return the User objects of every uploader the group subscribes to."""
        uids = self.groupUps.get(group_id, set())
        return [self.bUps[uid].bUser for uid in uids]

    def getPluginInfo(self) -> dict:
        """Return the static metadata dictionary describing this plugin."""
        return {
            'name': 'BilibiliSubscribe',
            'description': '订阅B站up',
            'commandDescription': '订阅up <uid>/取消订阅up <uid>/已订阅up',
            'usePlace': ['group', ],
            'showInHelp': True,
            'pluginConfigTableNames': [],
            'version': '1.0.0',
            'author': 'Unicorn',
        }
class BilibiliMonitor(CronStandardPlugin):
    """Periodic watcher for a single bilibili uploader.

    self.uid:       uid of the monitored uploader
    self.bUser:     User object used for API calls
    self.groupList: set of group ids to notify on a new upload
    self.job:       scheduler job handle returned by self.start(), or None
    """

    def __init__(self, uid:int) -> None:
        self.uid:int = uid
        self.bUser = User(uid=uid)
        self.groupList = set()
        self.job: Optional[Job] = None
        self.cumulativeNetworkErrCount = 0
        # Cached (uploadTime, bvid) of the last recorded upload; None = not loaded.
        self._prevMeta:Optional[Tuple[int, str]] = None

    @staticmethod
    def _runSql(sql: str, params=None, fetch: bool = False) -> list:
        """Run one statement on a fresh autocommit connection and always close it.

        @sql:    statement text using `%s` placeholders
        @params: tuple of values bound to the placeholders, or None
        @fetch:  when True, return all result rows; otherwise return []
        """
        mydb = mysql.connector.connect(**sqlConfig)
        try:
            mydb.autocommit = True
            mycursor = mydb.cursor()
            try:
                mycursor.execute(sql, params)
                return mycursor.fetchall() if fetch else []
            finally:
                mycursor.close()
        finally:
            mydb.close()

    def addGroup(self, group_id:int):
        """Register a group and make sure the polling job is running."""
        self.groupList.add(group_id)
        if self.job is None:
            # First subscriber (or the job was cancelled): start polling
            # with the arguments (0, 5 * 60).
            self.job = self.start(0, 5 * 60)
        else:
            self.resume()

    def delGroup(self, group_id:int):
        """Unregister a group; pause polling when nobody is left."""
        self.groupList.discard(group_id)
        if len(self.groupList) == 0:
            self.pause()

    def tick(self) -> None:
        """Poll the uploader's video list and notify groups about a new upload.

        The API call is tried up to three times per tick. After three
        consecutive failed ticks the job is cancelled. A changed
        (uploadTime, bvid) pair compared with the recorded one counts as an
        update.
        """
        videos = None
        attempts = 0
        while videos is None and attempts < 3:
            attempts += 1
            try:
                videos = asyncio.run(self.bUser.get_videos())
            except Exception:
                # Exception rather than BaseException, so KeyboardInterrupt and
                # SystemExit are not swallowed by the retry loop.
                videos = None
        if videos is None:
            self.cumulativeNetworkErrCount += 1
            if self.cumulativeNetworkErrCount >= 3:
                warning('bilibili subscribe api failed!')
                self.cumulativeNetworkErrCount = 0
                self.cancel()
            return
        self.cumulativeNetworkErrCount = 0
        try:
            videos = videos['list']['vlist']
            if len(videos) == 0: return
            latestVideo = max(videos, key=lambda x:x['created'])
            uploadTime = latestVideo['created']
            bvid = latestVideo['bvid']
            prevMeta = self.getPrevMeta()
            # NOTE(review): any change of the newest video counts as an update,
            # so deleting the newest video would re-announce an older one --
            # confirm whether this is intended.
            if prevMeta is None or prevMeta != (uploadTime, bvid):
                self.writeMeta(uploadTime, bvid)
                title = gocqQuote(latestVideo['title'])
                author = gocqQuote(latestVideo['author'])
                # Snapshot: groupList may be modified by (un)subscribe commands.
                for group in tuple(self.groupList):
                    send(group, f'本群订阅UP主 【{author}】 更新视频啦!\n视频标题: {title}\n链接:{bvToUrl(bvid)}')
        except KeyError as e:
            warning('bilibili api has changed!')
            self.cancel()
            return
        except Exception as e:
            warning('base excption in BilibiliMonitor: {}'.format(e))
            self.cancel()

    def getPrevMeta(self)->Optional[Tuple[int, str]]:
        """Return the previously recorded upload of this uploader.

        @return: Optional[(
            [0]: int: uploadTime as a unix timestamp
            [1]: str: bvid
        )] -- None when nothing has been recorded yet
        """
        if self._prevMeta is None:
            meta = self._runSql("""
                select unix_timestamp(uploadTime), bvid from BOT_DATA.bilibiliUp where
                uid = %s
            """, (self.uid,), fetch=True)
            if len(meta) != 0:
                self._prevMeta = meta[0]
        return self._prevMeta

    def writeMeta(self, uploadTime:int, bvid:str)->None:
        """Record the uploader's latest upload in MySQL and in the cache."""
        meta = (uploadTime, bvid)
        if self._prevMeta == meta: return
        self._runSql("""
            insert into BOT_DATA.bilibiliUp set
            uploadTime = from_unixtime(%s),
            bvid = %s,
            uid = %s
            on duplicate key update
            uploadTime = from_unixtime(%s),
            bvid = %s
        """, (uploadTime, bvid, self.uid, uploadTime, bvid))
        # Update the cache only after the row was written successfully.
        self._prevMeta = meta

    def cancel(self,) -> None:
        """Remove the polling job and forget its handle.

        Clearing self.job lets a later addGroup() start a fresh job instead of
        resuming a removed one, and makes repeated cancel() calls harmless.
        NOTE(review): assumes self.start() may be called again after a cancel
        -- confirm against CronStandardPlugin.
        """
        if self.job is not None:
            job, self.job = self.job, None
            job.remove()

    def pause(self) -> None:
        """Pause the polling job if one exists."""
        if self.job is not None:
            self.job.pause()

    def resume(self) -> None:
        """Resume the polling job if one exists."""
        if self.job is not None:
            self.job.resume()

    def __del__(self,):
        # __init__ may have failed before self.job was assigned, and a
        # finalizer should never raise.
        try:
            if getattr(self, 'job', None) is not None:
                self.cancel()
        except Exception:
            pass