海运's Blog

Python cURL Asynchronous Concurrent HTTP Client

Published: January 7, 2015 // Category: Python // No Comments

Select mode, similar to PHP multi curl asynchronous concurrency; the number of connections cannot be too large:

#!/usr/bin/python
# -*- coding: utf-8 -*-

import sys
import pycurl
import cStringIO

# maximum number of concurrent connections
num_conn = 20

queue = []
urls = ['https://www.haiyun.me/'] * 10000
for url in urls:
  queue.append(url)

num_urls = len(queue)
num_conn = min(num_conn, num_urls)
print '----- Getting', num_urls, 'URLs, max', num_conn, 'connections -----'

m = pycurl.CurlMulti()
# initialize the handles once; they are reused across requests
m.handles = []
for i in range(num_conn):
  c = pycurl.Curl()
  c.body = cStringIO.StringIO()
  c.setopt(pycurl.FOLLOWLOCATION, 1)
  c.setopt(pycurl.MAXREDIRS, 5)
  c.setopt(pycurl.CONNECTTIMEOUT, 30)
  c.setopt(pycurl.TIMEOUT, 300)
  c.setopt(pycurl.NOSIGNAL, 1)
  m.handles.append(c)


freelist = m.handles[:]
num_processed = 0
# main loop
while num_processed < num_urls:

    # assign URLs to free handles
    while queue and freelist:
      url = queue.pop()
      c = freelist.pop()
      # reset the body buffer so a reused handle does not accumulate old responses
      c.body = cStringIO.StringIO()
      c.setopt(pycurl.URL, url)
      c.setopt(pycurl.WRITEFUNCTION, c.body.write)
      m.add_handle(c)
      c.url = url
      #print url

    # drive the transfers
    while 1:
      (ret, num_handles) = m.perform()
      if ret != pycurl.E_CALL_MULTI_PERFORM:
        break

    # block briefly until some connection has activity
    m.select(1.0)

    # collect finished connections
    while 1:
      (num_q, ok_list, err_list) = m.info_read()
      for c in ok_list:
        m.remove_handle(c)
        #print c.body.getvalue()
        freelist.append(c)

      for (c, errno, errmsg) in err_list:
        m.remove_handle(c)
        print 'Failed:', c.url, errno, errmsg
        freelist.append(c)
      num_processed = num_processed + len(ok_list) + len(err_list)
      if num_q == 0:
        break

for c in m.handles:
  c.close()
m.close()

Epoll mode (PHP multi curl does not support it). Tornado's asynchronous HTTP client is a wrapper around pycurl's multi_socket_action interface, and each client instance maintains an IOLoop:

from tornado.httpclient import AsyncHTTPClient
from tornado.ioloop import IOLoop
count = 10000
done = 0
def handle_request(response):
  global done
  done += 1
  if (done == count):
    # all requests finished, stop the event loop
    IOLoop.instance().stop()

  if response.error:
    print "Error:", response.error
  #else:
    #print response.body
# the default client is implemented on top of the IOLoop; configure it to use the pycurl-based client instead
AsyncHTTPClient.configure("tornado.curl_httpclient.CurlAsyncHTTPClient", max_clients=20)
http_client = AsyncHTTPClient()
for i in range(count):
  http_client.fetch("https://www.haiyun.me/", handle_request)
# start the event loop (runs until stopped)
IOLoop.instance().start()

On a LAN, epoll-based multi curl performs worse than select mode: all sockets are active, so every callback gets woken up, which leads to resource contention. Since every socket has to be processed anyway, iterating over them directly is the simplest and most effective approach.
For better performance, enable asynchronous DNS resolution (c-ares) in libcurl/pycurl.
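
To check whether the local libcurl build actually has an asynchronous resolver, something like the following can be used (a minimal sketch; it assumes pycurl exposes libcurl's CURL_VERSION_ASYNCHDNS feature bit as pycurl.VERSION_ASYNCHDNS):

import pycurl

# index 4 of pycurl.version_info() is the libcurl feature bitmask;
# the ASYNCHDNS bit is set when libcurl was built with c-ares or the threaded resolver
features = pycurl.version_info()[4]
if features & pycurl.VERSION_ASYNCHDNS:
  print 'libcurl resolves DNS asynchronously'
else:
  print 'libcurl resolves DNS synchronously; rebuild with c-ares for async DNS'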

Go HTTP Client Usage

Published: January 6, 2015 // Category: GO // No Comments

A simple example:

package main

import (
    "fmt"
    "io/ioutil"
    "net/http"
    "strings"
)

func main() {
    res, err := http.Get("https://www.haiyun.me/")
    if err != nil {
        return
    }
    // check the error before touching res; if the request failed there is no body and this would panic
    defer res.Body.Close()
    // to reuse the underlying connection, both close the body (above) and read it to the end (below)
    body, _ := ioutil.ReadAll(res.Body)
    fmt.Println(string(body))
    fmt.Println(res.Status)
    for k, v := range res.Header {
        fmt.Println(k, strings.Join(v, ""))
    }

}

A custom client with connect/read timeouts and custom headers:

package main

import (
        "crypto/tls"
        "fmt"
        "io/ioutil"
        "net"
        "net/http"
        "strings"
        "time"
)

var timeout = time.Duration(20 * time.Second)

func dialTimeout(network, addr string) (net.Conn, error) {
        return net.DialTimeout(network, addr, timeout)
}

func main() {
        tr := &http.Transport{
                // dial with a connect timeout
                Dial: dialTimeout,
                // timeout for reading the response headers after the connection is established
                ResponseHeaderTimeout: time.Second * 2,
                // set SNI explicitly when accessing the server directly by IP
                TLSClientConfig: &tls.Config{
                     ServerName: "www.haiyun.me",
                },
        }
        client := &http.Client{
                Transport: tr,
                // overall timeout, covering connect, read and write
                Timeout: timeout,
        }
        req, _ := http.NewRequest("GET", "https://www.haiyun.me", nil)
        req.Header.Set("Connection", "keep-alive")
        req.Header.Set("Host", "www.haiyun.me")
        res, err := client.Do(req)
        if err != nil {
                return
        }
        defer res.Body.Close()
        body, _ := ioutil.ReadAll(res.Body)
        fmt.Println(string(body))
        for k, v := range res.Header {
                fmt.Println(k, strings.Join(v, ""))
        }

}

Using a proxy and binding to a specific egress IP:

        // use an HTTP proxy
        proxyUrl, err := url.Parse("http://host:port")
        tr := &http.Transport{
                Proxy: http.ProxyURL(proxyUrl),
        }
        // bind outgoing connections to a specific local IP (a separate snippet)
        ief, err := net.InterfaceByName("eth0")
        addrs, err := ief.Addrs()
        addr := &net.TCPAddr{
                IP: addrs[0].(*net.IPNet).IP,
        }
        dia := net.Dialer{LocalAddr: addr}
        tr := &http.Transport{
                Dial: dia.Dial,
        }

Using a SOCKS5 proxy: https://github.com/hailiang/socks (see the sketch after the links below)
http://golang.org/pkg/net/http/#Client
http://golang.org/pkg/net/http/#Transport
http://golang.org/pkg/net/#Dialer
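
The linked package implements the SOCKS5 handshake; as an alternative, a minimal sketch using golang.org/x/net/proxy could look like this (the proxy address 127.0.0.1:1080 is an assumption):

package main

import (
        "fmt"
        "io/ioutil"
        "net/http"

        "golang.org/x/net/proxy"
)

func main() {
        // build a SOCKS5 dialer; 127.0.0.1:1080 is a placeholder proxy address
        dialer, err := proxy.SOCKS5("tcp", "127.0.0.1:1080", nil, proxy.Direct)
        if err != nil {
                return
        }
        tr := &http.Transport{
                // route all outgoing connections through the SOCKS5 proxy
                Dial: dialer.Dial,
        }
        client := &http.Client{Transport: tr}
        res, err := client.Get("https://www.haiyun.me/")
        if err != nil {
                return
        }
        defer res.Body.Close()
        body, _ := ioutil.ReadAll(res.Body)
        fmt.Println(string(body))
}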

PHP cURL Keepalive Connection Reuse

Published: January 4, 2015 // Category: PHP // No Comments

PHP cURL supports keepalive connection reuse by default; note that with a single cURL request you must reuse the handle for the connection to be reused:

<?php
   $ch1 = curl_init();
   curl_setopt($ch1, CURLOPT_URL, "https://www.haiyun.me/");
   curl_setopt($ch1, CURLOPT_HTTPHEADER, array(
    'Connection: Keep-Alive',
    'Keep-Alive: 300'
   ));
   curl_setopt($ch1, CURLOPT_FORBID_REUSE, 0);
   curl_setopt($ch1, CURLOPT_RETURNTRANSFER,1);
   curl_exec($ch1);

   curl_setopt($ch1, CURLOPT_URL, "https://www.haiyun.me/");
   curl_setopt($ch1, CURLOPT_RETURNTRANSFER,1);
   curl_exec($ch1);
   curl_close($ch1);

Multi cURL supports keepalive connection reuse by default without reusing handles, though handles can be reused as well; for details see the post 自用完美php异步并行 multi curl类 (my own PHP asynchronous parallel multi curl class):

$master = curl_multi_init();
$done = curl_multi_info_read($master);
# remove the finished handle
curl_multi_remove_handle($master, $done['handle']);
# re-add a handle (the removed one can be reused, keeping its connection alive)
curl_multi_add_handle($master, $ch);
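
For reference, a minimal sketch of a complete loop that drives curl_multi while reusing finished handles might look like this (the URL list, the concurrency limit of 10 and the helper new_handle() are illustrative assumptions):

<?php
# hypothetical helper: create a handle for one URL
function new_handle($url) {
    $ch = curl_init($url);
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
    return $ch;
}

$urls = array_fill(0, 100, 'https://www.haiyun.me/');
$mh = curl_multi_init();
$inflight = 0;

# seed up to 10 concurrent handles
while ($inflight < 10 && $urls) {
    curl_multi_add_handle($mh, new_handle(array_pop($urls)));
    $inflight++;
}

while ($inflight > 0) {
    curl_multi_exec($mh, $running);
    curl_multi_select($mh, 1.0);
    while ($done = curl_multi_info_read($mh)) {
        $ch = $done['handle'];
        $body = curl_multi_getcontent($ch);
        curl_multi_remove_handle($mh, $ch);
        $inflight--;
        if ($urls) {
            # reuse the finished handle so its keepalive connection is reused too
            curl_setopt($ch, CURLOPT_URL, array_pop($urls));
            curl_multi_add_handle($mh, $ch);
            $inflight++;
        } else {
            curl_close($ch);
        }
    }
}
curl_multi_close($mh);

Reusing the handle keeps its underlying keepalive connection, so subsequent requests to the same host skip the TCP/TLS handshake.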

Go Goroutine Synchronization and Limiting the Number of Goroutines

Published: January 3, 2015 // Category: GO // No Comments

Synchronizing with sync.WaitGroup (a buffered channel limits the number of goroutines):

package main

import "fmt"
import "time"
import "sync"

func main() {
        var wg sync.WaitGroup
        // create a buffered channel to limit concurrency
        var limit = make(chan struct{}, 10)

        for i := 0; i < 100; i++ {
                // sending blocks once the buffer is full, which caps the number of running goroutines
                limit <- struct{}{}
                // increment the WaitGroup counter
                wg.Add(1)
                go func(i int) {
                        // decrement the WaitGroup counter when this goroutine returns
                        defer wg.Done()
                        time.Sleep(1000 * time.Millisecond)
                        fmt.Println(i)
                        // the task is done; receive from the channel so the main goroutine can send again
                        <-limit
                }(i)
        }
        // block until the counter reaches zero, i.e. all goroutines have finished
        wg.Wait()
}

Synchronizing with a channel:

package main

import "fmt"
import "time"

// buffered channel used to signal completion of each goroutine
var done chan int

func foo(i int) {
        fmt.Println(i)
        time.Sleep(time.Second)
        done <- 0
}

func main() {
        count := 100
        done = make(chan int, count)

        for i := 0; i < count; i++ {
                go foo(i)
        }

        // wait until every goroutine has signalled completion
        for i := 0; i < count; i++ {
                <-done
        }
}