博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
tornado和subprocess实现程序的非堵塞异步处理
阅读量:7124 次
发布时间:2019-06-28

本文共 4743 字,大约阅读时间需要 15 分钟。

ornado     是由Facebook开源的一个服务器“套装",适合于做python的web或者使用其本身提供的可扩展的功能,完成了不完整的wsgi协议,可用于做快速的web开发,封装了epoll性能较好。文章主要以分析tornado的网络部分即异步事件处理与上层的IOstream类提供的异步IO,其他的模块如web的tornado.web 以后慢慢留作分析。


源码组织:

  |---__init__.py

   ---auth.py

   ---......

   ---epoll.c

   ---ioloop.py

   ---iostream.py

   ---...

  tornado网络部分最核心的两个模块就是ioloop.py与iostream.py,我们主要分析的就是这两个部分。

  ioloop.py 主要的是将底层的epoll或者说是其他的IO多路复用封装作异步事件来处理

  iostream.py主要是对于下层的异步事件的进一步封装,为其封装了更上一层的buffer(IO)事件.


这段时间一直在学习tornado的 异步的处理。主要是用ioloop实现多路复用。

下面是个例子,有tornado基础的朋友,一看就懂的~


1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
import 
subprocess
import 
tornado.ioloop
import 
time
import 
fcntl
import 
functools
import 
os
class 
GenericSubprocess (object):
    
def __init__ ( self, timeout=-
1
, **popen_args ):
        
self.args = dict()
        
self.args[
"stdout"
] = subprocess.PIPE
        
self.args[
"stderr"
] = subprocess.PIPE
        
self.args[
"close_fds"
] = True
        
self.args.update(popen_args)
        
self.ioloop = None
        
self.expiration = None
        
self.pipe = None
        
self.timeout = timeout
        
self.streams = []
        
self.has_timed_out = False
    
def start(self):
        
""
"Spawn the task.
        
Throws RuntimeError 
if 
the task was already started.
""
"
        
if 
not self.pipe 
is 
None:
            
raise RuntimeError(
"Cannot start task twice"
)
        
self.ioloop = tornado.ioloop.IOLoop.instance()
        
if 
self.timeout > 
0
:
            
self.expiration = self.ioloop.add_timeout( time.time() + self.timeout, self.on_timeout )
        
self.pipe = subprocess.Popen(**self.args)
        
self.streams = [ (self.pipe.stdout.fileno(), []),
                             
(self.pipe.stderr.fileno(), []) ]
        
for 
fd, d 
in 
self.streams:
            
flags = fcntl.fcntl(fd, fcntl.F_GETFL)| os.O_NDELAY
            
fcntl.fcntl( fd, fcntl.F_SETFL, flags)
            
self.ioloop.add_handler( fd,
                                     
self.stat,
                                     
self.ioloop.READ|self.ioloop.ERROR)
    
def on_timeout(self):
        
self.has_timed_out = True
        
self.cancel()
    
def cancel (self ) :
        
""
"Cancel task execution
        
Sends SIGKILL to the child process.
""
"
        
try
:
            
self.pipe.kill()
        
except:
            
pass
    
def stat( self, *args ):
        
''
'Check process completion and consume pending I/O data'
''
        
self.pipe.poll()
        
if 
not self.pipe.returncode 
is 
None:
            
''
'cleanup handlers and timeouts'
''
            
if 
not self.expiration 
is 
None:
                
self.ioloop.remove_timeout(self.expiration)
            
for 
fd, dest 
in  
self.streams:
                
self.ioloop.remove_handler(fd)
            
''
'schedulle callback (first try to read all pending data)'
''
            
self.ioloop.add_callback(self.on_finish)
        
for 
fd, dest 
in  
self.streams:
            
while 
True:
                
try
:
                    
data = os.read(fd, 
4096
)
                    
if 
len(data) == 
0
:
                        
break
                    
dest.extend([data])
                
except:
                    
break
    
@property
    
def stdout(self):
        
return 
self.get_output(
0
)
    
@property
    
def stderr(self):
        
return 
self.get_output(
1
)
    
@property
    
def status(self):
        
return 
self.pipe.returncode
    
def get_output(self, index ):
        
return 
""
.join(self.streams[index][
1
])
    
def on_finish(self):
        
raise NotImplemented()
class 
Subprocess (GenericSubprocess):
    
def __init__ ( self, callback, *args, **kwargs):
        
self.callback = callback
        
self.done_callback = False
        
GenericSubprocess.__init__(self, *args, **kwargs)
    
def on_finish(self):
        
if 
not self.done_callback:
            
self.done_callback = True
            
''
'prevent calling callback twice'
''
            
self.ioloop.add_callback(functools.partial(self.callback, self.status, self.stdout, self.stderr, self.has_timed_out))
if 
__name__ == 
"__main__"
:
    
ioloop = tornado.ioloop.IOLoop.instance()
    
def print_timeout( status, stdout, stderr, has_timed_out) :
        
assert(status!=
0
)
        
assert(has_timed_out)
        
print 
"OK status:"
, repr(status), 
"stdout:"
, repr(stdout), 
"stderr:"
, repr(stderr), 
"timeout:"
, repr(has_timed_out)
    
def print_ok( status, stdout, stderr, has_timed_out) :
        
assert(status==
0
)
        
assert(not has_timed_out)
        
print 
"OK status:"
, repr(status), 
"stdout:"
, repr(stdout), 
"stderr:"
, repr(stderr), 
"timeout:"
, repr(has_timed_out)
    
def print_error( status, stdout, stderr, has_timed_out):
        
assert(status!=
0
)
        
assert(not has_timed_out)
        
print 
"OK status:"
, repr(status), 
"stdout:"
, repr(stdout), 
"stderr:"
, repr(stderr), 
"timeout:"
, repr(has_timed_out)
    
def stop_test():
        
ioloop.stop()
    
t1 = Subprocess( print_timeout, timeout=
3
, args=[ 
"sleep"
,
"5"
] )
    
t2 = Subprocess( print_ok, timeout=
3
, args=[ 
"ip"
"a" 
] )
    
t3 = Subprocess( print_ok, timeout=
3
, args=[ 
"sleepdsdasdas"
"1" 
] )
    
t4 = Subprocess( print_error, timeout=
3
, args=[ 
"cat"
"/etc/sdfsdfsdfsdfsdfsdfsdf" 
] )
    
t1.start()
    
t2.start()
    
try
:
        
t3.start()
        
assert(
false
)
    
except:
        
print 
"OK"
    
t4.start()
    
ioloop.add_timeout(time.time() + 
10
, stop_test)
    
ioloop.start()
 本文转自 rfyiamcool 51CTO博客,原文链接:http://blog.51cto.com/rfyiamcool/1236330,如需转载请自行联系原作者
你可能感兴趣的文章
device's media capture mechanism,利用input:file调用设备的照相机/相册、摄像机、录音机...
查看>>
BroadLink:三款新品力求无障碍人机交互,三大平台分三期对外开放 ...
查看>>
掌门1对1获3.5亿美元E-1轮融资,华人文化产业基金、中金甲子基金等投资 ...
查看>>
Unity中的通用对象池
查看>>
ORA-00600: internal error code, arguments: [16703], [1403], [28], [...
查看>>
忆芯科技发布新一代国产主控芯片STAR1000P!4月完成量产版本 ...
查看>>
如何用条码标签打印软件实现商品价签制定会员价 ...
查看>>
如何轻松实现个性化推荐系统
查看>>
Mysql高级查询 内连接和外连接详解
查看>>
基于AWS的电子商务网站架构——Web前端
查看>>
基于险企传统资源优势的“一核三环”规划——互联网平台建设
查看>>
社交网络:有意义的不仅是邓巴数
查看>>
MySQL优化案例
查看>>
02 贝叶斯算法 - 案例一 - 鸢尾花数据分类
查看>>
场景数据互为表里!畅想2027,保险行业发展愿景
查看>>
hibernate4整合spring3出现java.lang.NoClassDefFoundError: [Lorg/hibernate/engine/FilterDefinition;...
查看>>
港科大教授权龙:三维视觉重新定义人工智能安防
查看>>
数据库巡检项
查看>>
通过阿里云APP,可以进行ECS,RDS 等实例的管理
查看>>
HBase-Region太多的问题简单总结
查看>>