Tornado 异步源码解析

前言

tornado在我工作前说实话我还没有听说过,今年回来后接触的非常多。

关于想要学习异步的内容,起因是如下场景:
tornado拿到请求后,对他的处理时间非常的长,一般大概有10s,晨旭让我把他写成异步的,如果只是对请求异步的是相当好写的。就是十分传统地在收到请求后立即返回,然后进行处理,处理了之后再返回给给定的callback_url
但我突然想到,能否对这整个处理进行异步,将其放到后台运行,然后继续接收请求不至于在请求上堵塞。

最后是没实现出来……坤爷让我去花时间了解一下tornado所著名的异步。于是我才发现,我这想法在tornado的异步中是不可行的。(而且错误的地方还蛮多的……

异步使用方式

from tornado import gen

@gen.coroutine
def fetch_coroutine(url):
    http_client = AsyncHTTPClient()
    response = yield http_client.fetch(url)
    raise gen.Return(response.body)

这是最常用也是tornado推荐的方式。不过这样写的前提是py2,py3有些新特性,写的方式就不太一样了。
正如代码中展示的,你需要给这个函数加个gen.coroutine装饰器,然后在你想要异步处理的函数加个yield

说实话,这已经有点跟我想的不太一样了,callback_function呢?没了?emmmmm
好吧,这样写的确是没有的。

关于协程

简单介绍以下协程,协程是线程通过自己来调度任务,保存恢复上下文,而这样做的好处就是能减少切换线程或者进程的损耗,如果线程数量极多,但是做的事情不多,简单说运行类型是io密集型的话,可以考虑使用协程。
因为如果是cpu密集型的话,毕竟本质上还是一个线程,所以会堵塞到其他的协程,这与我们的高效目的是相违背的。

另外正式开始讲之前,我们首先需要明确的是一个函数如果带有yeild,那么这个函数实质上就是一个生成器。我们直接对其调用它返回的也是一个生成器。所有会有这样一句话:

所有生成器都是异步的

实际上也确实如此,生成器立即返回,我们想要执行器内容的话,可以自行调用next或者send(非第一次)

于是python的yield对于协程有了天然的支持

源码解析

注:本文章分析的源码是 2018.12.24 的 stable 版本

coroutine

协程,也是 tornado.gen 中的一个装饰器

这个装饰器其内容只有一个 return。而在 tornado.gen 中其实还有个装饰器engine,它的replace_callback 的 默认值为False

return _make_coroutine_wrapper(func, replace_callback=True)

以下是_make_coroutine_wrapper 的源码,源码十分紧凑,考虑的很全面,先说下大多数情况

def _make_coroutine_wrapper(func, replace_callback):
    wrapped = func
    if hasattr(types, 'coroutine'):
        func = types.coroutine(func)

    @functools.wraps(wrapped)
    def wrapper(*args, **kwargs):
        future = _create_future()

        if replace_callback and 'callback' in kwargs:
            warnings.warn("callback arguments are deprecated, use the returned Future instead",
                          DeprecationWarning, stacklevel=2)
            callback = kwargs.pop('callback')
            IOLoop.current().add_future(
                future, lambda future: callback(future.result()))

        try:
            result = func(*args, **kwargs)
        except (Return, StopIteration) as e:
            result = _value_from_stopiteration(e)
        except Exception:
            future_set_exc_info(future, sys.exc_info())
            try:
                return future
            finally:
                # Avoid circular references
                future = None
        else:
            if isinstance(result, GeneratorType):
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)
                    if stack_context._state.contexts is not orig_stack_contexts:
                        yielded = _create_future()
                        yielded.set_exception(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    future_set_result_unless_cancelled(future, _value_from_stopiteration(e))
                except Exception:
                    future_set_exc_info(future, sys.exc_info())
                else:
                    runner = Runner(result, future, yielded)
                    # 这一行不太理解,注释是说让它保持存在
                    future.add_done_callback(lambda _: runner)
                yielded = None
                try:
                    return future
                finally:
                    future = None
        future_set_result_unless_cancelled(future, result)
        return future

    wrapper.__wrapped__ = wrapped
    wrapper.__tornado_coroutine__ = True
    return wrapper

coroutine 装饰的函数,首先会生成一个 generator, 并对其第一次 next 调用。
这里我们可以注意到,这个yield 后的函数无论如何它都是会被立刻调用的。
所以 yield 后的函数必须也是异步或者耗时不长的,才能达到预期的异步效果,否则该阻塞的还是会阻塞。

next调用后,有个异常捕获,在代码 36L,这个异常捕获犹为重要,因为我们知道 yield 等式,比如var = yield func()var不会被赋值成func的返回。tornado提供了一个异常类Return 作为返回,通过在调用出捕获Return异常,取出其返回值得以实现。

在第一次的next后,如果没有其他异常,就会创建一个Runner类,这个Runner类的作用就是,把其他的代码通过yield不断暂停恢复,放在ioloop里运行。

Future

Future可以说是个中介,它是用来沟通coroutineioloop的,coroutine返回的都是Future

但其实最重要的还是管理协程的暂停与恢复,一个ioloop中保存着多个后端运行类Runner类的runner方法,在ioloop中不断暂停恢复,而每一个runner又都会绑定一个future,只有futureset_done了,才表示上一阶段已经完成并暂停了,可以继续恢复运行。

class Future(object):
    def done(self):# 协程执行完毕并暂停,可对其恢复
        return self._done

    def result(self, timeout=None):
        self._check_done() # 如果没有 done 抛出异常 
        return self._result

    def add_done_callback(self, fn): # 添加回调函数
        if self._done:
            from tornado.ioloop import IOLoop
            IOLoop.current().add_callback(fn, self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result): # 设置result & done
        self._result = result
        self._set_done()
    
    def _set_done(self): # 将所有回调函数放到 ioloop中
        self._done = True
        if self._callbacks:
            from tornado.ioloop import IOLoop
            loop = IOLoop.current()
            for cb in self._callbacks:
                loop.add_callback(cb, self)
            self._callbacks = None

IOLoop

IOLoop是在整个tornado的主事件循环。按我理解主要做了两件事

  • 执行异步的callback
  • io复用

并且这两件事它是写死的,它是顺序执行的,这就直接反驳了我最开始的想法:让两者并行执行
io复用根据系统会自动调整的,具体的我也不再细说。

以下是精简之后的源码

class PollIOLoop(IOLoop):
    def add_future(self, future, callback):
        assert is_future(future)
        callback = stack_context.wrap(callback)
        future_add_done_callback(
            future, lambda future: self.add_callback(callback, future))

    def add_callback(self, callback, *args, **kwargs):
        if self._closing:
            return
        self._callbacks.append(functools.partial(
            stack_context.wrap(callback), *args, **kwargs)) # 将其以偏函数形式保存起来
        if thread.get_ident() != self._thread_ident:
            self._waker.wake()
        else:
            pass
    
    def start(self):
        # 这里有一堆初始化操作
        while True:
            ncallbacks = len(self._callbacks)
            due_timeouts = []
            if self._timeouts:
            	now = self.time()
                while self._timeouts:
                    if self._timeouts[0].callback is None:
                    	heapq.heappop(self._timeouts)
                        self._cancellations -= 1
                    elif self._timeouts[0].deadline <= now:
                        due_timeouts.append(heapq.heappop(self._timeouts))
                    else:
                        break
                if (self._cancellations > 512 and
                        self._cancellations > (len(self._timeouts) >> 1)):
                    self._cancellations = 0
                    self._timeouts = [x for x in self._timeouts
                                      if x.callback is not None]
                    heapq.heapify(self._timeouts)
            
            for i in range(ncallbacks):
            	self._run_callback(self._callbacks.popleft())
            for timeout in due_timeouts:
                if timeout.callback is not None:
                    self._run_callback(timeout.callback)
            if not self._running:
                break
            # 这里有设置poll_timeout
            event_pairs = self._impl.poll(poll_timeout)
            self._events.update(event_pairs)
            while self._events:
                fd, events = self._events.popitem()
                try:
                    fd_obj, handler_func = self._handlers[fd]
                    handler_func(fd_obj, events)
                except (OSError, IOError) as e:
                    if errno_from_exception(e) == errno.EPIPE:
                        pass
                    else:
                        self.handle_callback_exception(self._handlers.get(fd))
                except Exception:
                    self.handle_callback_exception(self._handlers.get(fd))
            fd_obj = handler_func = None
            # 这里有一些后置处理

Runner

Runner.run是后置处理的主函数,我在之前也有提到过,它通过获取futureresult,再将其send以恢复协程继续执行。

如果不能捕获到任何异常,就说明有新的coroutine,新的coroutine都是通过handle_yield将其放进ioloop

class Runner(object):
    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen
        self.result_future = result_future
        self.future = _null_future
        self.yield_point = None
        self.pending_callbacks = None
        self.results = None
        self.running = False
        self.finished = False
        self.had_exception = False
        self.io_loop = IOLoop.current()
        self.stack_context_deactivate = None
        # 上面一堆不需要看的初始化
        if self.handle_yield(first_yielded):
            gen = result_future = first_yielded = None
            self.run()
     
    
	def handle_yield(self, yielded):

        self.future = convert_yielded(yielded)

        if self.future is moment:
            self.io_loop.add_callback(self.run)
            return False
        elif not self.future.done():
            def inner(f):
                # Break a reference cycle to speed GC.
                f = None
                self.run()
            self.io_loop.add_future(
                self.future, inner)
            return False
        return True
    
    def run(self):
        if self.running or self.finished:
            return
        try:
            self.running = True
            while True:
                future = self.future
                if not future.done():
                    return
                self.future = None
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    exc_info = None

                    try:
                        value = future.result()
                    except Exception:
                        self.had_exception = True
                        exc_info = sys.exc_info()
                    future = None
  
                    yielded = self.gen.send(value)

                except (StopIteration, Return) as e:
                    self.finished = True
                    self.future = _null_future
                    if self.pending_callbacks and not self.had_exception:
                        raise LeakedCallbackError(
                            "finished without waiting for callbacks %r" %
                            self.pending_callbacks)
                    future_set_result_unless_cancelled(self.result_future,
_value_from_stopiteration(e))
                    self.result_future = None
                    self._deactivate_stack_context()
                    return
                except Exception:
                    # 一些结束操作
                    return
                if not self.handle_yield(yielded):
                    return
                yielded = None
        finally:
            self.running = False