import asyncio
import aiotieba
from aioflask import render_template, request, escape
from urllib.parse import quote_plus
from datetime import datetime
from aiotieba.api.get_posts._classdef import *
from aiotieba.api._classdef.contents import *
from shared import *
from extra import *
######################################################################
# Drop a trailing line break from the text, then append the new content.
def append_with_leading_clean(orig, content, suffix='<br>'):
    """Append *content* to *orig*, first removing one trailing *suffix*.

    Used so that inline markup (emoji, links, mentions) continues on the same
    line as the preceding text instead of starting after a line break.

    NOTE(review): the suffix literal was split across two lines in the file as
    received; '<br>' is reconstructed from the 4-character slice the original
    code removed -- confirm against the upstream source.

    Args:
        orig: The text accumulated so far.
        content: The text to append.
        suffix: The trailing marker to strip (at most once) before appending.

    Returns:
        The combined string.
    """
    # An empty suffix matches every string; guard it so nothing is stripped.
    if suffix and orig.endswith(suffix):
        return orig[:len(orig) - len(suffix)] + content
    return orig + content
# Make sure the cache holds an entry for the given user id.
async def cache_name_from_id(c, i):
    """Fill the shared cache entry for *i* unless a truthy one already exists.

    Args:
        c: Client object exposing an awaitable ``get_user_info``.
        i: The user id used both as lookup argument and as cache key.

    Returns:
        None. The result of ``get_user_info`` is stored via ``cache.set``.
    """
    if cache.get(i):
        # Already cached; nothing to fetch.
        return
    fetched = await c.get_user_info(i, require=aiotieba.enums.ReqUInfo.USER_NAME)
    cache.set(i, fetched)
######################################################################
# Convert a timestamp to its simplest readable date format.
@app.template_filter('simpledate')
def _jinja2_filter_simpledate(ts):
    """Render a timestamp in the shortest form that is still unambiguous.

    Args:
        ts: A value accepted by ``datetime.fromtimestamp``.

    Returns:
        'HH:MM' when the timestamp falls on today's date, 'mm-dd' when it is
        on another day of the current year, and 'YYYY-mm-dd' otherwise.
    """
    moment = datetime.fromtimestamp(ts)
    today = datetime.now()
    if moment.date() == today.date():
        # %M is the minute; %m would print the month number instead.
        return moment.strftime('%H:%M')
    if moment.year == today.year:
        return moment.strftime('%m-%d')
    return moment.strftime('%Y-%m-%d')
# Convert a timestamp to a human-readable date format.
@app.template_filter('date')
def _jinja2_filter_datetime(ts, fmt='%Y年%m月%d日 %H点%M分'):
    """Format a timestamp with ``strftime``.

    Args:
        ts: A value accepted by ``datetime.fromtimestamp``.
        fmt: The ``strftime`` pattern. The default prints year, month, day,
            hour and minute; the minute field uses %M (the previous default
            used %m there, which repeated the month).

    Returns:
        The formatted date string.
    """
    return datetime.fromtimestamp(ts).strftime(fmt)
# Convert an integer to its comma-grouped form, like 1,000,000.
@app.template_filter('intsep')
def _jinja2_filter_intsep(i):
    """Return ``int(i)`` rendered with ',' as the thousands separator."""
    number = int(i)
    return format(number, ',')
# Shorten the text to a preview.
@app.template_filter('trim')
def _jinja2_filter_trim(text):
    """Cut *text* to its first 78 characters, marking the cut with '……'.

    Text of 78 characters or fewer is returned unchanged.
    """
    limit = 78
    if len(text) <= limit:
        return text
    return text[:limit] + '……'
# Format fragments to their equivalent HTML.
@app.template_filter('translate')
async def _jinja2_filter_translate(frags, reply_id=0):
    """Build one output string from a sequence of content fragments.

    Each fragment is dispatched on its class (FragText, FragImage_p,
    FragEmoji_p, FragLink, FragAt). Any other class is printed to stdout as
    unhandled and contributes nothing to the result.

    Args:
        frags: An indexable, sized sequence of fragment objects.
        reply_id: When truthy, the result starts with '@', the value of
            ``cache.get(reply_id)`` and a space.

    Returns:
        The accumulated string.

    NOTE(review): several literals below are empty or split across two lines
    in this copy of the file, which looks like HTML markup was lost in
    transit. Presumably they held a line-break tag and image/link markup --
    confirm against the upstream source before relying on the output.
    """
    htmlfmt = ''
    if reply_id:
        # The cached value is interpolated as-is, without escaping.
        htmlfmt += f'@{ cache.get(reply_id) } '
    for i in range(len(frags)):
        frag = frags[i]
        if isinstance(frag, FragText):
            # Escape every line of the text separately; each one is followed
            # by the (two-line) literal below.
            subfrags = frag.text.split('\n')
            for subfrag in subfrags:
                htmlfmt += str(escape(subfrag)) + '
'
        elif isinstance(frag, FragImage_p):
            # NOTE(review): both f-strings are empty here, so an image adds
            # nothing; presumably image markup is missing -- confirm.
            htmlfmt += \
                f'' \
                f''
        elif isinstance(frag, FragEmoji_p):
            # The helper strips a trailing break marker from htmlfmt before
            # appending, so the emoji stays on the preceding line.
            # NOTE(review): the appended f-string is empty in this copy.
            htmlfmt = append_with_leading_clean(htmlfmt,
                f'')
            # Look ahead: an emoji directly followed by an image gets the
            # literal below appended after it.
            if i+1 < len(frags) and isinstance(frags[i+1], FragImage_p):
                htmlfmt += '
'
        elif isinstance(frag, FragLink):
            # NOTE(review): this literal has no f prefix, so the text
            # '{ frag.title }' is emitted verbatim rather than interpolated;
            # that looks unintended -- confirm.
            markup = '{ frag.title }'
            htmlfmt = append_with_leading_clean(htmlfmt, markup)
        elif isinstance(frag, FragAt):
            # frag.text is inserted without escaping.
            htmlfmt = append_with_leading_clean(htmlfmt,
                f'{ frag.text }')
        else:
            # Unknown fragment class: report it on stdout and skip it.
            print('Unhandled: ', type(frag))
            print(frag)
    return htmlfmt
######################################################################
# Show one page of a thread.
@app.route('/p/<tid>')
async def thread_view(tid):
    """Render 'thread.html' for one page of the thread *tid*.

    Query parameters:
        pn: Page number; defaults to 1 when absent or empty.
        ao: Passed to ``get_posts`` as ``only_thread_author``; defaults to 0.

    Before rendering, the author name of every fetched comment is written to
    the shared cache, and names for reply targets that are not among those
    authors are fetched concurrently through ``cache_name_from_id``.

    NOTE(review): the route variable ``<tid>`` was missing from the rule in
    the file as received; it is required because the view takes ``tid``.

    Args:
        tid: The thread id from the URL; converted with ``int``.

    Returns:
        The rendered template.
    """
    tid = int(tid)
    pn = int(request.args.get('pn') or 1)
    ao = int(request.args.get('ao') or 0)

    async with aiotieba.Client() as tieba:
        # Default to 15 posts per page, conforming to tieba.baidu.com
        thread_info = await tieba.get_posts(tid, rn=15, pn=pn,
                                            with_comments=should_fetch_comments,
                                            only_thread_author=ao)

        comments = [comment for floor in thread_info for comment in floor.comments]

        # Authors of the fetched comments: their names come with the data.
        known_ids = set()
        for comment in comments:
            known_ids.add(comment.author_id)
            cache.set(comment.author_id, comment.user.user_name)

        # Reply targets whose names were not delivered with the comments.
        # A dict keeps them unique while preserving first-seen order.
        pending_ids = {}
        for comment in comments:
            target = comment.reply_to_id
            if target and target not in known_ids:
                pending_ids[target] = None

        await asyncio.gather(*(cache_name_from_id(tieba, uid) for uid in pending_ids))

        return await render_template('thread.html', info=thread_info, ao=ao)
# Show one page of a forum's thread list.
@app.route('/f')
async def forum_view():
    """Render 'bar.html' for the forum named by the ``kw`` query parameter.

    Query parameters:
        kw: Forum name (required); one trailing '吧' is removed.
        pn: Page number; defaults to 1 when absent or empty.
        sort: Passed to ``get_threads``; defaults to 0.

    Forum metadata comes from ``get_forum_detail`` when
    ``only_use_native_api`` is truthy, otherwise from ``find_tieba_info``.
    Both lookups run concurrently with the thread-list request.

    Returns:
        The rendered forum page, or 'error.html' when the requested page is
        out of range.
    """
    kw = request.args['kw']
    # endswith() also copes with an empty kw, where indexing [-1] would raise.
    fname = kw[:-1] if kw.endswith('吧') else kw
    pn = int(request.args.get('pn') or 1)
    sort = int(request.args.get('sort') or 0)

    async with aiotieba.Client() as tieba:
        if only_use_native_api:
            detail, threads = await asyncio.gather(
                tieba.get_forum_detail(fname),
                tieba.get_threads(fname, pn=pn, sort=sort))

            # Without a slogan attribute, fall back to a fixed avatar and a
            # placeholder description.
            if hasattr(detail, 'slogan'):
                avatar = extract_image_name(detail.origin_avatar)
                desc = detail.slogan
            else:
                avatar = 'a6efce1b9d16fdfa6291460ab98f8c5495ee7b51.jpg'
                desc = '贴吧描述暂不可用'

            # NOTE(review): 'topic' and 'thread' are both filled from
            # post_num, as in the original; confirm whether a separate
            # thread count should be used.
            forum_info = {
                'avatar': avatar,
                'topic': detail.post_num,
                'thread': detail.post_num,
                'member': detail.member_num,
                'desc': desc,
                'name': detail.fname,
            }
        else:
            forum_info, threads = await asyncio.gather(
                awaitify(find_tieba_info)(fname),
                tieba.get_threads(fname, pn=pn, sort=sort))

        page = threads.page
        if page.current_page > page.total_page or pn < 1:
            return await render_template('error.html', msg = \
                f'请求越界,本贴吧共有 { page.total_page } 页'
                f'而您查询了第 { page.current_page} 页')

        # With sort == 0 the page count handed to the template is capped at 115.
        tp = min(page.total_page, 115) if sort == 0 else page.total_page
        return await render_template('bar.html', info=forum_info, threads=threads,
                                     sort=sort, tp=tp)
# Show a user's home page.
@app.route('/home/main')
async def user_view():
    """Render 'user.html' for the user given by the ``id`` query parameter.

    Query parameters:
        id: User identifier. It is converted to ``int`` when possible and is
            otherwise passed on unchanged.
        pn: Page number; defaults to 1 when absent or empty.

    Returns:
        The rendered user page, or 'error.html' when ``get_homepage`` raises
        ValueError or a page after the first has no entries.
    """
    pn = int(request.args.get('pn') or 1)
    user = request.args.get('id')
    try:
        # Prefer the numeric user id; keep the raw value otherwise.
        user = int(user)
    except (TypeError, ValueError):
        # Not a number (or missing): deliberately fall through unchanged.
        pass

    async with aiotieba.Client() as tieba:
        try:
            hp = await tieba.get_homepage(user, pn)
        except ValueError:
            return await render_template('error.html', msg='您已超过最后页')

        if len(hp[1]) == 0 and pn > 1:
            return await render_template('error.html', msg='您已超过最后页')

        return await render_template('user.html', hp=hp, pn=pn)
# Landing page.
@app.route('/')
async def main_view():
    """Render and return the 'index.html' template."""
    page = await render_template('index.html')
    return page
######################################################################
# Error page for RuntimeError.
@app.errorhandler(RuntimeError)
async def runtime_error_view(e):
    """Render 'error.html' with the error's ``msg`` attribute.

    When the exception has no ``msg`` attribute, a fixed placeholder text is
    shown instead.
    """
    message = getattr(e, 'msg', '错误信息不可用')
    return await render_template('error.html', msg=message)
# Fallback error page for any other exception.
@app.errorhandler(Exception)
async def general_error_view(e):
    """Render 'error.html', handing the exception object itself to ``msg``."""
    page = await render_template('error.html', msg=e)
    return page
# Start the app, with debug enabled, only when this file is run as a script
# (not when it is imported).
if __name__ == '__main__':
    app.run(debug=True)