前言
tornado在我工作前说实话我还没有听说过,今年回来后接触的非常多。
关于想要学习异步的内容,起因是如下场景:
tornado拿到请求后,对他的处理时间非常的长,一般大概有10s,晨旭让我把他写成异步的,如果只是对请求异步的是相当好写的。就是十分传统地在收到请求后立即返回,然后进行处理,处理了之后再返回给给定的callback_url
。
但我突然想到,能否对这整个处理进行异步,将其放到后台运行,然后继续接收请求不至于在请求上堵塞。
最后是没实现出来……坤爷让我去花时间了解一下tornado所著名的异步。于是我才发现,我这想法在tornado的异步中是不可行的。(而且错误的地方还蛮多的……
异步使用方式
from tornado import gen
@gen.coroutine
def fetch_coroutine(url):
http_client = AsyncHTTPClient()
response = yield http_client.fetch(url)
raise gen.Return(response.body)
这是最常用也是tornado推荐的方式。不过这样写的前提是py2,py3有些新特性,写的方式就不太一样了。
正如代码中展示的,你需要给这个函数加个gen.coroutine
装饰器,然后在你想要异步处理的函数加个yield
。
说实话,这已经有点跟我想的不太一样了,callback_function
呢?没了?emmmmm
好吧,这样写的确是没有的。
关于协程
简单介绍以下协程,协程是线程通过自己来调度任务,保存恢复上下文,而这样做的好处就是能减少切换线程或者进程的损耗,如果线程数量极多,但是做的事情不多,简单说运行类型是io密集型的话,可以考虑使用协程。
因为如果是cpu密集型的话,毕竟本质上还是一个线程,所以会堵塞到其他的协程,这与我们的高效目的是相违背的。
另外正式开始讲之前,我们首先需要明确的是一个函数如果带有yeild
,那么这个函数实质上就是一个生成器。我们直接对其调用它返回的也是一个生成器。所有会有这样一句话:
所有生成器都是异步的
实际上也确实如此,生成器立即返回,我们想要执行器内容的话,可以自行调用next
或者send
(非第一次)
于是python的yield对于协程有了天然的支持
源码解析
注:本文章分析的源码是 2018.12.24 的 stable 版本
coroutine
协程,也是 tornado.gen
中的一个装饰器
这个装饰器其内容只有一个 return。而在 tornado.gen
中其实还有个装饰器engine
,它的replace_callback
的 默认值为False
return _make_coroutine_wrapper(func, replace_callback=True)
以下是_make_coroutine_wrapper
的源码,源码十分紧凑,考虑的很全面,先说下大多数情况
def _make_coroutine_wrapper(func, replace_callback):
wrapped = func
if hasattr(types, 'coroutine'):
func = types.coroutine(func)
@functools.wraps(wrapped)
def wrapper(*args, **kwargs):
future = _create_future()
if replace_callback and 'callback' in kwargs:
warnings.warn("callback arguments are deprecated, use the returned Future instead",
DeprecationWarning, stacklevel=2)
callback = kwargs.pop('callback')
IOLoop.current().add_future(
future, lambda future: callback(future.result()))
try:
result = func(*args, **kwargs)
except (Return, StopIteration) as e:
result = _value_from_stopiteration(e)
except Exception:
future_set_exc_info(future, sys.exc_info())
try:
return future
finally:
# Avoid circular references
future = None
else:
if isinstance(result, GeneratorType):
try:
orig_stack_contexts = stack_context._state.contexts
yielded = next(result)
if stack_context._state.contexts is not orig_stack_contexts:
yielded = _create_future()
yielded.set_exception(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
future_set_result_unless_cancelled(future, _value_from_stopiteration(e))
except Exception:
future_set_exc_info(future, sys.exc_info())
else:
runner = Runner(result, future, yielded)
# 这一行不太理解,注释是说让它保持存在
future.add_done_callback(lambda _: runner)
yielded = None
try:
return future
finally:
future = None
future_set_result_unless_cancelled(future, result)
return future
wrapper.__wrapped__ = wrapped
wrapper.__tornado_coroutine__ = True
return wrapper
coroutine 装饰的函数,首先会生成一个 generator, 并对其第一次 next 调用。
这里我们可以注意到,这个yield
后的函数无论如何它都是会被立刻调用的。
所以 yield 后的函数必须也是异步或者耗时不长的,才能达到预期的异步效果,否则该阻塞的还是会阻塞。
在 next
调用后,有个异常捕获,在代码 36L,这个异常捕获犹为重要,因为我们知道 yield
等式,比如var = yield func()
,var
不会被赋值成func
的返回。tornado
提供了一个异常类Return
作为返回,通过在调用出捕获Return
异常,取出其返回值得以实现。
在第一次的next
后,如果没有其他异常,就会创建一个Runner
类,这个Runner
类的作用就是,把其他的代码通过yield
不断暂停恢复,放在ioloop里运行。
Future
Future
可以说是个中介,它是用来沟通coroutine
和ioloop
的,coroutine
返回的都是Future
但其实最重要的还是管理协程的暂停与恢复,一个ioloop中保存着多个后端运行类Runner
类的runner
方法,在ioloop中不断暂停恢复,而每一个runner
又都会绑定一个future
,只有future
被set_done
了,才表示上一阶段已经完成并暂停了,可以继续恢复运行。
class Future(object):
def done(self):# 协程执行完毕并暂停,可对其恢复
return self._done
def result(self, timeout=None):
self._check_done() # 如果没有 done 抛出异常
return self._result
def add_done_callback(self, fn): # 添加回调函数
if self._done:
from tornado.ioloop import IOLoop
IOLoop.current().add_callback(fn, self)
else:
self._callbacks.append(fn)
def set_result(self, result): # 设置result & done
self._result = result
self._set_done()
def _set_done(self): # 将所有回调函数放到 ioloop中
self._done = True
if self._callbacks:
from tornado.ioloop import IOLoop
loop = IOLoop.current()
for cb in self._callbacks:
loop.add_callback(cb, self)
self._callbacks = None
IOLoop
IOLoop是在整个tornado的主事件循环。按我理解主要做了两件事
- 执行异步的callback
- io复用
并且这两件事它是写死的,它是顺序执行的,这就直接反驳了我最开始的想法:让两者并行执行
io复用根据系统会自动调整的,具体的我也不再细说。
以下是精简之后的源码
class PollIOLoop(IOLoop):
def add_future(self, future, callback):
assert is_future(future)
callback = stack_context.wrap(callback)
future_add_done_callback(
future, lambda future: self.add_callback(callback, future))
def add_callback(self, callback, *args, **kwargs):
if self._closing:
return
self._callbacks.append(functools.partial(
stack_context.wrap(callback), *args, **kwargs)) # 将其以偏函数形式保存起来
if thread.get_ident() != self._thread_ident:
self._waker.wake()
else:
pass
def start(self):
# 这里有一堆初始化操作
while True:
ncallbacks = len(self._callbacks)
due_timeouts = []
if self._timeouts:
now = self.time()
while self._timeouts:
if self._timeouts[0].callback is None:
heapq.heappop(self._timeouts)
self._cancellations -= 1
elif self._timeouts[0].deadline <= now:
due_timeouts.append(heapq.heappop(self._timeouts))
else:
break
if (self._cancellations > 512 and
self._cancellations > (len(self._timeouts) >> 1)):
self._cancellations = 0
self._timeouts = [x for x in self._timeouts
if x.callback is not None]
heapq.heapify(self._timeouts)
for i in range(ncallbacks):
self._run_callback(self._callbacks.popleft())
for timeout in due_timeouts:
if timeout.callback is not None:
self._run_callback(timeout.callback)
if not self._running:
break
# 这里有设置poll_timeout
event_pairs = self._impl.poll(poll_timeout)
self._events.update(event_pairs)
while self._events:
fd, events = self._events.popitem()
try:
fd_obj, handler_func = self._handlers[fd]
handler_func(fd_obj, events)
except (OSError, IOError) as e:
if errno_from_exception(e) == errno.EPIPE:
pass
else:
self.handle_callback_exception(self._handlers.get(fd))
except Exception:
self.handle_callback_exception(self._handlers.get(fd))
fd_obj = handler_func = None
# 这里有一些后置处理
Runner
Runner.run
是后置处理的主函数,我在之前也有提到过,它通过获取future
的result
,再将其send
以恢复协程继续执行。
如果不能捕获到任何异常,就说明有新的coroutine
,新的coroutine
都是通过handle_yield
将其放进ioloop
class Runner(object):
def __init__(self, gen, result_future, first_yielded):
self.gen = gen
self.result_future = result_future
self.future = _null_future
self.yield_point = None
self.pending_callbacks = None
self.results = None
self.running = False
self.finished = False
self.had_exception = False
self.io_loop = IOLoop.current()
self.stack_context_deactivate = None
# 上面一堆不需要看的初始化
if self.handle_yield(first_yielded):
gen = result_future = first_yielded = None
self.run()
def handle_yield(self, yielded):
self.future = convert_yielded(yielded)
if self.future is moment:
self.io_loop.add_callback(self.run)
return False
elif not self.future.done():
def inner(f):
# Break a reference cycle to speed GC.
f = None
self.run()
self.io_loop.add_future(
self.future, inner)
return False
return True
def run(self):
if self.running or self.finished:
return
try:
self.running = True
while True:
future = self.future
if not future.done():
return
self.future = None
try:
orig_stack_contexts = stack_context._state.contexts
exc_info = None
try:
value = future.result()
except Exception:
self.had_exception = True
exc_info = sys.exc_info()
future = None
yielded = self.gen.send(value)
except (StopIteration, Return) as e:
self.finished = True
self.future = _null_future
if self.pending_callbacks and not self.had_exception:
raise LeakedCallbackError(
"finished without waiting for callbacks %r" %
self.pending_callbacks)
future_set_result_unless_cancelled(self.result_future,
_value_from_stopiteration(e))
self.result_future = None
self._deactivate_stack_context()
return
except Exception:
# 一些结束操作
return
if not self.handle_yield(yielded):
return
yielded = None
finally:
self.running = False