海运的博客

Python Curl/Pycurl添加DNS解析支持

发布时间:January 30, 2015 // 分类:Python // No Comments

Pycurl底层使用libcurl,请先确定Libcurl是否已支持异步DNS解析c-ares,如不支持可升级libcurl支持异步DNS解析c-ares
其实Libcurl更新支持为异步DNS如果已安装pycurl不用重新安装Pycurl,见https://www.haiyun.me/archives/1070.html
通过pip安装:

export PATH=/usr/local/curl/bin/:$PATH
export LD_LIBRARY_PATH="/usr/local/curl/lib/"
export PYCURL_SSL_LIBRARY=openssl
pip install pycurl

下载pycurl源码包安装:

wget http://pycurl.sourceforge.net/download/pycurl-7.19.5.1.tar.gz
tar zxvf pycurl-7.19.5.1.tar.gz 
cd pycurl-7.19.5.1
export LD_LIBRARY_PATH="/usr/local/curl/lib/"
python setup.py install --curl-config=/usr/local/curl/bin/curl-config --with-ssl

检查pycurl是否已支持异步DNS解析c-ares:

>>> import pycurl
>>> pycurl.version
'PycURL/7.19.5.1 libcurl/7.40.0 OpenSSL/1.0.1e zlib/1.2.7 c-ares/1.10.0'

安装pycurl后使用时遇到的一些错误:

pycurl.so: undefined symbol: CRYPTO_num_locks

原因:libcurl安装时--with-ssl支持

libcurl link-time ssl backend (openssl) is different from compile-time ssl backend (none/other)

原因:libcurl和pycurl编译时ssl后端不一致,调整见上和libcurl安装

pycurl: libcurl link-time version (7.19.7) is older than compile-time version (7.4.0)

原因:编译pycurl时使用的编译的libcurl动态库,不过现在pycurl现在加载的是系统自带的版本较旧的动态库,解决将编译的libcurl动态库添加到系统动态库,见ldconfig

Python异步非阻塞IO多路复用Select/Poll/Epoll使用

发布时间:January 12, 2015 // 分类:Python // No Comments

有许多封装好的异步非阻塞IO多路复用框架,底层在linux基于最新的epoll实现,为了更好的使用,了解其底层原理还是有必要的。
下面记录下分别基于Select/Poll/Epoll的echo server实现。
Python Select Server,可监控事件数量有限制:

#!/usr/bin/python
# -*- coding: utf-8 -*-
import select
import socket
import Queue
 
server = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
server.setblocking(False)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR  , 1)
server_address= ('192.168.1.5',8080)
server.bind(server_address)
server.listen(10)
 
#select轮询等待读socket集合
inputs = [server]
#select轮询等待写socket集合
outputs = []
message_queues = {}
#select超时时间
timeout = 20
 
while True:
    print "等待活动连接......"
    readable , writable , exceptional = select.select(inputs, outputs, inputs, timeout)
 
    if not (readable or writable or exceptional) :
        print "select超时无活动连接,重新select...... "
        continue;   
    #循环可读事件
    for s in readable :
        #如果是server监听的socket
        if s is server:
            #同意连接
            connection, client_address = s.accept()
            print "新连接: ", client_address
            connection.setblocking(0)
            #将连接加入到select可读事件队列
            inputs.append(connection)
            #新建连接为key的字典,写回读取到的消息
            message_queues[connection] = Queue.Queue()
        else:
            #不是本机监听就是客户端发来的消息
            data = s.recv(1024)
            if data :
                print "收到数据:" , data , "客户端:",s.getpeername()
                message_queues[s].put(data)
                if s not in outputs:
                    #将读取到的socket加入到可写事件队列
                    outputs.append(s)
            else:
                #空白消息,关闭连接
                print "关闭连接:", client_address
                if s in outputs :
                    outputs.remove(s)
                inputs.remove(s)
                s.close()
                del message_queues[s]
    for s in writable:
        try:
            msg = message_queues[s].get_nowait()
        except Queue.Empty:
            print "连接:" , s.getpeername() , '消息队列为空'
            outputs.remove(s)
        else:
            print "发送数据:" , msg , "到", s.getpeername()
            s.send(msg)
     
    for s in exceptional:
        print "异常连接:", s.getpeername()
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()
        del message_queues[s]

Python Poll Server,Select升级版,无可监控事件数量限制,还是要轮询所有事件:

#!/usr/bin/python
# -*- coding: utf-8 -*-
import socket
import select
import Queue
 
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = ("192.168.1.5", 8080)
server.bind(server_address)
server.listen(5)
print  "服务器启动成功,监听IP:" , server_address
message_queues = {}
#超时,毫秒
timeout = 5000
#监听哪些事件
READ_ONLY = ( select.POLLIN | select.POLLPRI | select.POLLHUP | select.POLLERR)
READ_WRITE = (READ_ONLY|select.POLLOUT)
#新建轮询事件对象
poller = select.poll()
#注册本机监听socket到等待可读事件事件集合
poller.register(server,READ_ONLY)
#文件描述符到socket映射
fd_to_socket = {server.fileno():server,}
while True:
    print "等待活动连接......"
    #轮询注册的事件集合
    events = poller.poll(timeout)
    if not events:
      print "poll超时,无活动连接,重新poll......"
      continue
    print "有" , len(events), "个新事件,开始处理......"
    for fd ,flag in events:
        s = fd_to_socket[fd]
        #可读事件
        if flag & (select.POLLIN | select.POLLPRI) :
            if s is server :
                #如果socket是监听的server代表有新连接
                connection , client_address = s.accept()
                print "新连接:" , client_address
                connection.setblocking(False)
                 
                fd_to_socket[connection.fileno()] = connection
                #加入到等待读事件集合
                poller.register(connection,READ_ONLY)
                message_queues[connection]  = Queue.Queue()
            else :
                #接收客户端发送的数据
                data = s.recv(1024)
                if data:
                    print "收到数据:" , data , "客户端:" , s.getpeername()
                    message_queues[s].put(data)
                    #修改读取到消息的连接到等待写事件集合
                    poller.modify(s,READ_WRITE)
                else :
                    # Close the connection
                    print "  closing" , s.getpeername()
                    # Stop listening for input on the connection
                    poller.unregister(s)
                    s.close()
                    del message_queues[s]
        #连接关闭事件
        elif flag & select.POLLHUP :
            print " Closing ", s.getpeername() ,"(HUP)"
            poller.unregister(s)
            s.close()
        #可写事件
        elif flag & select.POLLOUT :
            try:
                msg = message_queues[s].get_nowait()
            except Queue.Empty:
                print s.getpeername() , " queue empty"
                poller.modify(s,READ_ONLY)
            else :
                print "发送数据:" , data , "客户端:" , s.getpeername()
                s.send(msg)
        #异常事件
        elif flag & select.POLLERR:
            print "  exception on" , s.getpeername()
            poller.unregister(s)
            s.close()
            del message_queues[s]

Python Epoll Server,基于回调的事件通知模式,轻松管理大量连接:

#!/usr/bin/python
# -*- coding: utf-8 -*-
import socket, select
import Queue

serversocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
serversocket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server_address = ("192.168.1.5", 8080)
serversocket.bind(server_address)
serversocket.listen(1)
print  "服务器启动成功,监听IP:" , server_address
serversocket.setblocking(0)
timeout = 10
#新建epoll事件对象,后续要监控的事件添加到其中
epoll = select.epoll()
#添加服务器监听fd到等待读事件集合
epoll.register(serversocket.fileno(), select.EPOLLIN)
message_queues = {}

fd_to_socket = {serversocket.fileno():serversocket,}
while True:
  print "等待活动连接......"
  #轮询注册的事件集合
  events = epoll.poll(timeout)
  if not events:
     print "epoll超时无活动连接,重新轮询......"
     continue
  print "有" , len(events), "个新事件,开始处理......"
  for fd, event in events:
     socket = fd_to_socket[fd]
     #可读事件
     if event & select.EPOLLIN:
         #如果活动socket为服务器所监听,有新连接
         if socket == serversocket:
            connection, address = serversocket.accept()
            print "新连接:" , address
            connection.setblocking(0)
            #注册新连接fd到待读事件集合
            epoll.register(connection.fileno(), select.EPOLLIN)
            fd_to_socket[connection.fileno()] = connection
            message_queues[connection]  = Queue.Queue()
         #否则为客户端发送的数据
         else:
            data = socket.recv(1024)
            if data:
               print "收到数据:" , data , "客户端:" , socket.getpeername()
               message_queues[socket].put(data)
               #修改读取到消息的连接到等待写事件集合
               epoll.modify(fd, select.EPOLLOUT)
     #可写事件
     elif event & select.EPOLLOUT:
        try:
           msg = message_queues[socket].get_nowait()
        except Queue.Empty:
           print socket.getpeername() , " queue empty"
           epoll.modify(fd, select.EPOLLIN)
        else :
           print "发送数据:" , data , "客户端:" , socket.getpeername()
           socket.send(msg)
     #关闭事件
     elif event & select.EPOLLHUP:
        epoll.unregister(fd)
        fd_to_socket[fd].close()
        del fd_to_socket[fd]
epoll.unregister(serversocket.fileno())
epoll.close()
serversocket.close()

Python CURL异步并发HTTP客户端

发布时间:January 7, 2015 // 分类:Python // No Comments

Select模式,类似于php multi curl异步并发,连接数不能太多:

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys
import pycurl
import cStringIO

#最大连接数
num_conn = 20

queue = []
urls = ['https://www.haiyun.me/'] * 10000
for url in urls:
  queue.append(url)

num_urls = len(queue)
num_conn = min(num_conn, num_urls)
print ('----- Getting', num_urls, 'Max conn', num_conn,
       'connections -----')

m = pycurl.CurlMulti()
#初始化handle,可复用
m.handles = []
for i in range(num_conn):
  c = pycurl.Curl()
  c.body = cStringIO.StringIO()
  c.setopt(pycurl.FOLLOWLOCATION, 1)
  c.setopt(pycurl.MAXREDIRS, 5)
  c.setopt(pycurl.CONNECTTIMEOUT, 30)
  c.setopt(pycurl.TIMEOUT, 300)
  c.setopt(pycurl.NOSIGNAL, 1)
  m.handles.append(c)


freelist = m.handles[:]
num_processed = 0
#主循环开始
while num_processed < num_urls:

    #添加请求URL
    while queue and freelist:
      url = queue.pop()
      c = freelist.pop()
      c.setopt(pycurl.URL, url)
      c.setopt(pycurl.WRITEFUNCTION, c.body.write)
      m.add_handle(c)
      c.url = url
      #print url

    #执行请求
    while 1:
      (ret, num_handles) = m.perform()
      if ret != pycurl.E_CALL_MULTI_PERFORM:
        break

    #阻塞一会直到有连接完成
    m.select(1.0)

    #读取完成的连接
    while 1:
      (num_q, ok_list, err_list) = m.info_read()
      for c in ok_list:
        m.remove_handle(c)
        #print c.body.getvalue()
        freelist.append(c)

      for (c, errno, errmsg) in err_list:
        m.remove_handle(c)
        print ('Failed: ', c.url, errno, errmsg)
        freelist.append(c)
      num_processed = num_processed + len(ok_list) + len(err_list)
      if num_q == 0:
        break

for c in m.handles:
  c.fp = None
  c.close()
m.close()

epoll模式,php mult curl不支持此模式,tornado基于pycurl multi_socket_action封装的异步http client,每个client实例维护一个ioloop:

from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop
count = 10000
done = 0
def handle_request(response):
  global done
  done += 1
  if (done == count):
    #结束循环
    IOLoop.instance().stop()

  if response.error:
    print "Error:", response.error
  #else:
    #print response.body
#默认client是基于ioloop实现的,配置使用Pycurl
AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient",max_clients=20)
http_client = AsyncHTTPClient()
for i in range(count):
  http_client.fetch("https://www.haiyun.me/", handle_request)
#死循环
IOLoop.instance().start()      

基于epoll的multi curl在lan环境下效果不如select,因为所有Socket都在活跃状态,所有的callback都被唤醒,会导致资源的竞争。既然都是要处理所有的Socket,直接遍历是最简单最有效的方式.
为更好的性能建议libcurl/pycurl开启异步DNS解析

分类
最新文章
最近回复
  • liyk: 这个方法获取的IPv6大概20分钟之后就会失效,默认路由先消失,然后Global IPV6再消失
  • 海运: 不好意思,没有。
  • zongboa: 您好,請問一下有immortalwrt設定guest Wi-Fi的GUI教學嗎?感謝您。
  • 海运: 恩山有很多。
  • swsend: 大佬可以分享一下固件吗,谢谢。
  • Jimmy: 方法一 nghtp3步骤需要改成如下才能编译成功: git clone https://git...
  • 海运: 地址格式和udpxy一样,udpxy和msd_lite能用这个就能用。
  • 1: 怎么用 编译后的程序在家里路由器内任意一台设备上运行就可以吗?比如笔记本电脑 m参数是笔记本的...
  • 孤狼: ups_status_set: seems that UPS [BK650M2-CH] is ...
  • 孤狼: 擦。。。。apcupsd会失联 nut在冲到到100的时候会ONBATT进入关机状态,我想想办...