add timeout to the proxy to avoid blocking

This commit is contained in:
John Xina 2023-07-21 11:02:50 +08:00
parent ea7409312a
commit cb187f3b56

View File

@ -1,55 +1,88 @@
# Asgiproxify - An ASGI middleware for dynamic reverse proxy
# Copyright (C) 2023 William Goodspeed (龚志乐)
#
# This library is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this library. If not, see <https://www.gnu.org/licenses/>.
"""An ASGI middleware for dynamic reverse proxy."""
import asyncio import asyncio
import aiohttp import aiohttp
class AsgiproxifyHandler(): class AsgiproxifyHandler():
"""Handle the proxy request from the AsgiProxify middleware."""
def __init__(self, scope): def __init__(self, scope):
self.scope = scope self.scope = scope
def make_request_url(self): def make_request_url(self):
"""Specify which URL will be requested from aiohttp client."""
return 'http://example.org/' return 'http://example.org/'
def make_request_cookies(self): def make_request_cookies(self):
"""Specify which cookies will be used from aiohttp client."""
return {} return {}
def make_request_headers(self): def make_request_headers(self):
req_headers = {k.decode(): v.decode() for k, v in self.scope['headers']} """Specify which headers will be used from aiohttp client."""
req_headers = {k.decode(): v.decode()
for k, v in self.scope['headers']}
req_headers.pop('host', None) req_headers.pop('host', None)
return req_headers return req_headers
def make_response_headers(self, upstream_headers): def make_response_headers(self, upstream_headers):
"""Specify which headers will be returned to the actual client."""
headers = dict(upstream_headers) headers = dict(upstream_headers)
headers.pop('Server', None) headers.pop('Server', None)
headers.pop('Date', None) headers.pop('Date', None)
resp_headers = [(k, v) for k, v in headers.items()] resp_headers = list(headers.items())
return resp_headers return resp_headers
def make_request(self, session): def make_request(self, session):
"""Generate a aiohttp request for streaming contents."""
return session.request('GET', self.make_request_url(), return session.request('GET', self.make_request_url(),
cookies=self.make_request_cookies(), cookies=self.make_request_cookies(),
headers=self.make_request_headers(),) headers=self.make_request_headers(),)
class Asgiproxify(): class Asgiproxify():
"""An ASGI middleware for dynamic reverse proxy."""
app = None app = None
reg = {} reg = {}
def __init__(self, app=None): def __init__(self, app=None):
"""Initialize an Asgiproxify app with optional fallback app."""
self.to(app) self.to(app)
def to(self, app): def to(self, app):
"""Set the ASGI app which will be forwarded to if no proxy."""
self.app = app self.app = app
def install(self, leading_path, handler): def install(self, leading_path, handler):
"""Install a proxy handler for handling `leading_path'."""
self.reg[leading_path] = handler self.reg[leading_path] = handler
def register(self, leading_path): def register(self, leading_path):
"""Register the current class as a porxy handler for `leading_path'."""
def decorator(c): def decorator(c):
self.install(leading_path, c) self.install(leading_path, c)
return decorator return decorator
async def handle_proxy(self, scope, receive, send, handler): async def _handle_proxy(self, scope, receive, send, handler):
handler_i = handler(scope) handler_i = handler(scope)
request = await receive() await receive()
async def reverse_proxy_task(): async def reverse_proxy_task():
async with aiohttp.ClientSession(auto_decompress=False) as session: async with aiohttp.ClientSession(auto_decompress=False) as session:
@ -57,18 +90,23 @@ class Asgiproxify():
await send({ await send({
'type': 'http.response.start', 'type': 'http.response.start',
'status': resp.status, 'status': resp.status,
'headers': handler_i.make_response_headers(resp.headers), 'headers': handler_i
.make_response_headers(resp.headers),
}) })
async for chunk, end_of_resp in resp.content.iter_chunks(): async for chunk, _ in resp.content.iter_chunks():
try:
async with asyncio.timeout(10):
await send({ await send({
'type': 'http.response.body', 'type': 'http.response.body',
'body': chunk, 'body': chunk,
'more_body': True, 'more_body': True,
}) })
except TimeoutError:
return
await send({'type': 'http.response.body'}) await send({'type': 'http.response.body'})
task = asyncio.create_task(reverse_proxy_task()) task = asyncio.create_task(reverse_proxy_task())
while True:
ev = await receive() ev = await receive()
if ev['type'] == 'http.disconnect': if ev['type'] == 'http.disconnect':
task.cancel() task.cancel()
@ -86,5 +124,4 @@ class Asgiproxify():
if not handler: if not handler:
return await self.app(scope, receive, send) return await self.app(scope, receive, send)
else: return await self._handle_proxy(scope, receive, send, handler)
return await self.handle_proxy(scope, receive, send, handler)