本文共 4743 字,大约阅读时间需要 15 分钟。
Tornado 是由 Facebook 开源的一个服务器“套装”,适合用来开发 Python 的 Web 应用,也可以使用其本身提供的可扩展功能。它实现了(不完整的)WSGI 协议,可用于快速的 Web 开发,并且封装了 epoll,性能较好。本文主要分析 Tornado 的网络部分,即异步事件处理,以及上层 IOStream 类提供的异步 IO;其他模块(如 Web 相关的 tornado.web)留待以后慢慢分析。
源码组织:
|---__init__.py
|---auth.py
|---......
|---epoll.c
|---ioloop.py
|---iostream.py
|---...
tornado网络部分最核心的两个模块就是ioloop.py与iostream.py,我们主要分析的就是这两个部分。
ioloop.py 主要是将底层的 epoll(或其他 IO 多路复用机制)封装成异步事件来处理。
iostream.py 主要是对下层异步事件的进一步封装,在其之上提供了更高一层的带缓冲(buffer)的 IO 事件。
这段时间一直在学习 tornado 的异步处理,主要是用 ioloop 实现多路复用。
下面是个例子,有 tornado 基础的朋友一看就懂~
"""Run child processes without blocking a tornado IOLoop.

``GenericSubprocess`` spawns a command with ``subprocess.Popen``, switches its
stdout/stderr pipes to non-blocking mode and registers them with the IOLoop so
that output is collected as it arrives.  ``Subprocess`` adds a completion
callback on top of that.
"""

import subprocess
import time
import fcntl
import functools
import os


class GenericSubprocess(object):
    """Base class for a child process driven by the tornado IOLoop.

    Subclasses must override ``on_finish``, which is scheduled on the IOLoop
    once the child process has exited.
    """

    def __init__(self, timeout=-1, **popen_args):
        """Prepare (but do not start) the task.

        timeout    -- seconds after which the child is killed; values <= 0
                      disable the timeout.
        popen_args -- keyword arguments forwarded to ``subprocess.Popen``
                      (e.g. ``args=[...]``).  They override the defaults
                      set here (piped stdout/stderr, ``close_fds=True``).
        """
        self.args = {
            "stdout": subprocess.PIPE,
            "stderr": subprocess.PIPE,
            "close_fds": True,
        }
        self.args.update(popen_args)
        self.ioloop = None
        self.expiration = None      # handle returned by IOLoop.add_timeout
        self.pipe = None            # the Popen object once started
        self.timeout = timeout
        self.streams = []           # [(fd, [chunk, ...]), ...]: stdout, stderr
        self.has_timed_out = False

    def start(self):
        """Spawn the task.

        Raises RuntimeError if the task was already started.  Whatever
        ``subprocess.Popen`` raises (e.g. OSError for a missing executable)
        propagates to the caller.
        """
        if self.pipe is not None:
            raise RuntimeError("Cannot start task twice")

        # Imported here rather than at module level so that this module can
        # be imported without tornado; it is only needed once a task starts.
        import tornado.ioloop

        self.ioloop = tornado.ioloop.IOLoop.instance()

        # Spawn first: if Popen raises, no timeout has been registered yet,
        # so nothing is left behind on the IOLoop for a task that never ran.
        self.pipe = subprocess.Popen(**self.args)
        self.streams = [
            (self.pipe.stdout.fileno(), []),
            (self.pipe.stderr.fileno(), []),
        ]

        if self.timeout > 0:
            self.expiration = self.ioloop.add_timeout(
                time.time() + self.timeout, self.on_timeout)

        for fd, _ in self.streams:
            # Make the pipe non-blocking so that draining it in stat() never
            # stalls the IOLoop.
            flags = fcntl.fcntl(fd, fcntl.F_GETFL) | os.O_NDELAY
            fcntl.fcntl(fd, fcntl.F_SETFL, flags)
            self.ioloop.add_handler(
                fd, self.stat, self.ioloop.READ | self.ioloop.ERROR)

    def on_timeout(self):
        """Timeout callback: record the timeout and kill the child."""
        self.has_timed_out = True
        self.cancel()

    def cancel(self):
        """Cancel task execution.

        Sends SIGKILL to the child process.  Does nothing if the task was
        never started; a failure to deliver the signal is ignored.
        """
        if self.pipe is None:
            return
        try:
            self.pipe.kill()
        except OSError:
            # Best effort: the kill could not be delivered (e.g. the child
            # is already gone).
            pass

    def stat(self, *args):
        """Check process completion and consume pending I/O data.

        Registered as the IOLoop handler for both pipes; the handler
        arguments are ignored.
        """
        self.pipe.poll()
        if self.pipe.returncode is not None:
            # Clean up handlers and timeouts.
            if self.expiration is not None:
                self.ioloop.remove_timeout(self.expiration)
            for fd, _ in self.streams:
                self.ioloop.remove_handler(fd)
            # Schedule the completion callback.  It runs on a later IOLoop
            # iteration, i.e. after the pending data has been read below.
            self.ioloop.add_callback(self.on_finish)

        for fd, dest in self.streams:
            self._drain(fd, dest)

    @staticmethod
    def _drain(fd, dest):
        """Append to ``dest`` every chunk currently readable from ``fd``."""
        while True:
            try:
                data = os.read(fd, 4096)
            except OSError:
                # Nothing more to read right now (or the fd is unusable).
                break
            if not data:
                break  # end of file
            dest.append(data)

    @property
    def stdout(self):
        """Everything read from the child's stdout so far."""
        return self.get_output(0)

    @property
    def stderr(self):
        """Everything read from the child's stderr so far."""
        return self.get_output(1)

    @property
    def status(self):
        """The child's return code (None while it has not been reaped)."""
        return self.pipe.returncode

    def get_output(self, index):
        """Return the data collected for stream ``index`` (0 stdout, 1 stderr).

        The chunks come from ``os.read`` and are joined as bytes (``str`` on
        Python 2, ``bytes`` on Python 3).
        """
        return b"".join(self.streams[index][1])

    def on_finish(self):
        """Called on the IOLoop after the child exited; must be overridden."""
        raise NotImplementedError()


class Subprocess(GenericSubprocess):
    """Child process that reports completion through a callback.

    The callback is invoked once as
    ``callback(status, stdout, stderr, has_timed_out)``.
    """

    def __init__(self, callback, *args, **kwargs):
        """Store ``callback``; other arguments go to GenericSubprocess."""
        self.callback = callback
        self.done_callback = False
        GenericSubprocess.__init__(self, *args, **kwargs)

    def on_finish(self):
        """Schedule the user callback on the IOLoop, at most once."""
        if self.done_callback:
            return
        # Prevent calling the callback twice.
        self.done_callback = True
        self.ioloop.add_callback(functools.partial(
            self.callback, self.status, self.stdout, self.stderr,
            self.has_timed_out))


def main():
    """Self-test: run a few commands and check how each one is reported."""
    import tornado.ioloop

    ioloop = tornado.ioloop.IOLoop.instance()

    def report(status, stdout, stderr, has_timed_out):
        print("OK status: %r stdout: %r stderr: %r timeout: %r"
              % (status, stdout, stderr, has_timed_out))

    def print_timeout(status, stdout, stderr, has_timed_out):
        assert status != 0
        assert has_timed_out
        report(status, stdout, stderr, has_timed_out)

    def print_ok(status, stdout, stderr, has_timed_out):
        assert status == 0
        assert not has_timed_out
        report(status, stdout, stderr, has_timed_out)

    def print_error(status, stdout, stderr, has_timed_out):
        assert status != 0
        assert not has_timed_out
        report(status, stdout, stderr, has_timed_out)

    def stop_test():
        ioloop.stop()

    t1 = Subprocess(print_timeout, timeout=3, args=["sleep", "5"])
    t2 = Subprocess(print_ok, timeout=3, args=["ip", "a"])
    t3 = Subprocess(print_ok, timeout=3, args=["sleepdsdasdas", "1"])
    t4 = Subprocess(print_error, timeout=3,
                    args=["cat", "/etc/sdfsdfsdfsdfsdfsdfsdf"])

    t1.start()
    t2.start()
    # Starting a command that does not exist must fail at spawn time.
    try:
        t3.start()
    except OSError:
        print("OK")
    else:
        raise AssertionError("starting a nonexistent command should fail")
    t4.start()

    ioloop.add_timeout(time.time() + 10, stop_test)
    ioloop.start()


if __name__ == "__main__":
    main()