开发,设计,生活

网络爬虫系统

  • 2014-06-24 05:00:37 2014-06-24 09:59:11[U] 网络爬虫系统

    Deferreds 异步回调序列

    Deferred 本质上是一个回调函数的集合,twisted 提供了对函数延迟调用的机制。

    在 twisted 内部,使用 Deferred 对象管理回调序列。当异步请求结果返回时,使用 Deferred 对象调用回调序列中的函数。

    这里通过几个例子来了解 Deferreds 使用的方式和工作的原理。


    先看一个简单的例子

    from twisted.internet import reactor, defer
    
    def getDummyData(x):
        """
        创建一个 Deferred 对象,并返回这个对象
        """
        d = defer.Deferred()
        # 2 秒钟后执行 Deferred 回调函数序列,把 x * 3 作为参数传递给回调序列中的第一个函数
        # reactor.callLater 是一个定时延迟调用的方法
        reactor.callLater(2, d.callback, x * 3)
        return d
    
    def printData(result):
        """
        打印结果
        """
        print result
    
    d = getDummyData(3)
    
    # 添加回调函数到回调函数序列中
    d.addCallback(printData)
    
    # 4 秒钟后停止 reactor 循环(退出进程)
    reactor.callLater(4, reactor.stop)
    
    # 开始 Twisted reactor 事件循环
    reactor.run()
    

    使用 Deferred 的方式:

    1. d = defer.Deferred(), 创建 Deferred 对象。
    2. d.addCallback(printData), 添加回调函数,可以添加多个函数到回调序列中。
    3. d.callback(result),回调序列开始执行(使用result作为参数)。


    在 Deferred 中,d.addCallbacks/a.addBoth 等函数都可以添加回调函数到序列中。还可以添加 errback(错误回调函数)在回调函数出错时进行处理。


    Deferred 的内部机制

    虽然调用 addCallback 的时候只传入参数 callback, 但是回调序列中 callback/errback 总是成对出现的。twisted 内部的源代码如下。

    def addCallbacks(self, callback, errback=None,
                          callbackArgs=None, callbackKeywords=None,
                          errbackArgs=None, errbackKeywords=None):
             """
             Add a pair of callbacks (success and error) to this L{Deferred}.
             These will be executed when the 'master' callback is run.
             @return: C{self}.
             @rtype: a L{Deferred}
             """
             assert callable(callback)
             assert errback == None or callable(errback)
    
             # 主要是这里成对添加 callback/errback
             cbs = ((callback, callbackArgs, callbackKeywords),
                    (errback or (passthru), errbackArgs, errbackKeywords))
             self.callbacks.append(cbs)
     
             if self.called:
                 self._runCallbacks()
             return self
    


    当回调序列中的函数发生异常,或者返回twisted.python.failure.Failure 实例时, 下一个callback/errback回调函数对中的错误回调函数会被调用。

    错误回调函数的例子:

    #!/usr/bin/env python
    # coding: utf-8
    
    from twisted.internet import defer, reactor
    from pprint import pprint
    
    def cback1(result):
        raise Exception('cback1')
    
    def cerr1(failure):
        print 'cerr1: %s' % str(failure)
    
    def cback2(result):
        return 'succ cback2'
    
    def cerr2(failure):
        print 'cerr2: %s' % str(failure)
    
    
    d = defer.Deferred()
    d.addCallbacks(callback=cback1, errback=cerr1)
    d.addCallbacks(callback=cback2, errback=cerr2)
    d.addCallback(callback=lambda ign:pprint('cback3'))
    
    # 执行回调函数序列
    d.callback('run')
    
    # 退出事件循环
    reactor.callLater(2, reactor.stop)
    reactor.run()
    

    运行结果

    user@host:~/test $ python testErr.py
    cerr2: [Failure instance: Traceback: <type 'exceptions.Exception'>: cback1
    testErr.py:26:<module>
    /usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py:382:callback
    /usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py:490:_startRunCallbacks
    --- <exception caught here> ---
    /usr/local/lib/python2.7/dist-packages/twisted/internet/defer.py:577:_runCallbacks
    testErr.py:8:cback1
    ]
    'cback3'
    

    从例子中看运行机制:

    1. 在回调序列执行过程中,当前回调函数的返回值就是下一个回调函数的第一个参数,d.callback('run') 使用字符串'run'作为第一个回调函数的参数。
    2. 当回调函数 cback1 出现异常,下一个错误处理函数 cerr2 被调用。
    3. 当 cerr2 返回后,如果返回值不是 twisted.python.failure.Failure, 调用将转回到下个 callback ( cback3 )函数,如果返回值是 Failure 对象,则继续调用下一个的错误处理函数。


    Deferred 结构图


    Deferred 调用图


    Deferred 回调函数的参数传递和回调函数内异步操作方法

    添加到 Deferred 中的回调函数可以传入参数,并且在回调函数内可以执行异步操作(异步网络请求)。

    下面是一个从 京东 获取书籍信息的例子: 先从搜索的列表页获取一个书籍信息,再从详情页获取详细信息。

    #!/usr/bin/env python
    # coding: utf-8
    
    import sys
    import bs4 as bs
    
    from twisted.web.client import getPage
    from twisted.internet import reactor
    from twisted.python.util import println
    from twisted.internet import defer
    
    class Goods(object):
        '''商品信息分析类'''
        _urlFmt='http://search.jd.com/search?keyword=%s&enc=utf-8&rt=1&book=y&area=1&wtype=1'
    
        _name=''
        _id=''
        _price=0
        _detail=''
        _detailUrl=''
    
        def __init__(self, keyword):
            self._url = self._urlFmt % keyword
    
        def getGoodsUrl(self):
            return self._url
    
        def getGoodsDetailUrl(self):
            return self._detailUrl
    
        def getGoods(self, page):
            soup = bs.BeautifulSoup(page)
    
            # 获取商品名称,详情页url
            infos = soup.find_all(class_='info')
            info = infos[0].select('dt.p-name > a')[0]
            self._name = info.get_text().strip()
            self._detailUrl = info['href']
    
        def getGoodsDetail(self, page):
            #print page
            soup = bs.BeautifulSoup(page)
    
            # 获取价格
            price = soup.select('#summary-price strong')[0]
            self._price = price.get_text().strip()
    
            # 获取详细信息
            details = soup.select('#product-detail-1')[0].strings
            self._detail = ''.join(details)
            return True
    
        def printResult(self, IsPrint=True):
            if IsPrint:
                print self._name
                print self._price
                print self._detailUrl
                print ' ' * 10, self._detail
    
    def main():
        errback=lambda error:(println("an error occurred", error),reactor.stop())
        def goodsCallback(page, goods):
            ''''''
            #print page
            goods.getGoods(page)
            d2 = getPage(goods.getGoodsDetailUrl())
            d2.addCallbacks(callback=goods.getGoodsDetail, errback=errback)
            return d2
    
        # 根据关键词创建商品信息对象
        goods = Goods(sys.argv[1])
    
        # 异步查询商品信息
        d1 = getPage(goods.getGoodsUrl())
    
        # 添加获取商品信息的回调函数到 defer 对象
        d1.addCallbacks(callback=goodsCallback, errback=errback, callbackArgs=[goods])
        d1.addCallbacks(callback=lambda successed:(goods.printResult(successed)), errback=errback)
    
        # 结束主事件监控循环
        d1.addCallbacks(callback=lambda ign: reactor.stop(), errback=errback)
    
        reactor.run()
    
    
    if __name__ == '__main__':
        main()
    

    运行例子程序:

    python goodsJD.py 机器学习
    

    例子中 Deferred 对象的使用:

    1. d.addCallbacks 的 callbackArgs 参数接收一个 list 作为添加的回调函数的参数,errbackArgs 也是一个 list, 作为添加的错误处理函数的参数。
    2. 在回调序列中的执行过程中,如果回调函数返回了一个 Deferred 对象,例如上面的 goodsCallback 函数。在新返回的 d2 回调序列被执行完之前,d1的调用序列会暂停执行,在 d2 被执行完之后,d1 回调序列才会继续执行。(这个功能对于嵌套的异步请求很重要,例如我有三个请求A,B,C, 这三个请求有依赖关系,必须根据 A 的响应发送 B 请求,根据 B 的响应发送 C 请求)


    DeferredList 对象的使用

    有时候需要等待几个异步事件完成,然后再执行某些任务。这时候 DeferredList 就可以发挥作用了。

    看官网的一个例子,我修改并注释一些代码

    #!/usr/bin/env python
    # coding: utf-8
    
    from twisted.internet import defer
    
    
    # DeferredList 回调函数,参数是它包含的 Deferred 执行后的结果组成的 list
    def printResult(result):
        for (success, value) in result:
            if success:
                print 'Success:', value
            else:
                print 'Failure:', value.getErrorMessage()
    
    # deferred1 的回调函数
    def printDeferred(result):
        print 'deferred1: %s' % result
        return 'deferred1: %s' % result
    
    # 创建三个新的 Deferred
    deferred1 = defer.Deferred()
    deferred2 = defer.Deferred()
    deferred3 = defer.Deferred()
    
    # 打包三个 Deferred 对象到 DeferredList
    dl = defer.DeferredList([deferred1, deferred2, deferred3], consumeErrors=True)
    
    # 添加回调函数到 DeferredList,
    # 当它包含的 Deferred 都执行完后,将执行 DeferredList 这个调用序列
    dl.addCallback(printResult)
    
    # 注意这个回调函数添加的位置,
    # 因为在 defer.DeferredList 后面,所以这个回调将不影响 DeferredList 结果
    # 只有添加在 defer.DeferredList 之前,结果才会传递给 defer.DeferredList 的回调序列
    deferred1.addCallback(printDeferred)
    
    # 调用三个回调序列
    deferred1.callback('one')
    deferred2.errback(Exception('bang!'))
    deferred3.callback('three')
    

    运行结果

    user@host ~/test/twisted $ python testDeferredList.py
    deferred1: one
    Success: one
    Failure: bang!
    Success: three
    

    DeferredList 运行需要注意的地方

    1. DeferredList 打包的回调序列 deferred1,deferred2,deferred3,这几个序列中的回调函数要在打包之前(创建DeferredList对象之前)添加。(我例子中添加的位置是不正确的!!!)
    2. DeferredList 的回调函数接收的参数是个 list, list 中的每个元素是 (succ, result),表示 deferred1,deferred2,deferred3 执行成功的标志和最后结果。