开发,设计,生活

网络爬虫系统

  • 2014-08-18 13:46:58 2014-08-22 04:37:21[U] 网络爬虫系统


    使用 libtorrent 的python绑定库实现一个dht网络爬虫,抓取dht网络中的磁力链接。


    dht 网络简介

    p2p网络

    在P2P网络中,通过种子文件下载资源时,要知道资源在P2P网络中哪些计算机中,这些传输资源的计算机称作peer。在传统的P2P网络中,使用tracker服务器跟踪资源的peer。要下载资源,首先需要取得这些peer。


    dht网络

    tracker服务器面临一些版权和法律问题。于是出现了DHT,它把tracker上的资源peer信息分散到了整个网络中。dht网络是由分布式节点构成,节点(node)是实现了DHT协议的p2p客户端。P2P客户端程序既是peer也是node。DHT网络有多种算法,常用的有 Kademlia。


    dht网络下载

    P2P客户端使用种子文件下载资源时,如果没有tracker服务器,它就向DHT网络查询资源的peer列表, 然后从peer下载资源。


    Magnet是磁力链接

    资源的标识在DHT网络中称为infohash,是一个通过sha1算法得到的20字节长的字符串。infohash是使用种子文件的文件描述信息计算得到。磁力链接是把infohash编码成16进制字符串得到。P2P客户端使用磁力链接,下载资源的种子文件,然后根据种子文件下载资源。


    Kademlia 算法

    Kademlia是DHT网络的一种实现, 具体的算法参见:DHT协议


    KRPC 协议

    KRPC 是节点之间的交互协议,使用UDP来传送。

    包括4种请求:ping,find_node,get_peer,announce_peer。其中get_peer和announce_peer是节点间查询资源的主要消息。


    dht 爬虫原理

    主要的思路就是伪装为p2p客户端,加入dht网络,收集dht网络中的get_peer和announce_peer消息,这些消息是其他node发送给伪装的p2p客户端的udp消息。


    本文dht爬虫的实现

    爬虫运行环境

    1. linux 系统
    2. python 2.7
    3. libtorrent 库的python绑定
    4. twisted 网络库
    5. 防火墙开启固定的udp和tcp端口


    libtorrent 库的介绍

    libtorrent库是p2p下载的客户端库,有丰富的接口,可以用来开发下载p2p网络上的资源。它有python的绑定库,本爬虫就是使用它的python库开发的。

    在libtorrent中有几个概念需要解释下。 session 相当于p2p客户端,session开启一个tcp和一个udp端口,用来与其他p2p客户端交换数据。可以在一个进程内定义多个session,也就是多个p2p客户端,来加快收集速度。

    alert是libtorrent中用来收集各种消息的队列,每个session都有一个自己的alert消息队列。KRPC协议的get_peer和announce_peer消息也是从这个队列中获取,就是用这两个消息收集磁力链接的。


    主要实现代码

    爬虫实现的主要代码比较简单

    # 事件通知处理函数
        def _handle_alerts(self, session, alerts):
            while len(alerts):
                alert = alerts.pop()
                # 获取dht_announce_alert和dht_get_peer_alert消息
                # 从这两消息收集磁力链接
                if isinstance(alert, lt.add_torrent_alert):
                    alert.handle.set_upload_limit(self._torrent_upload_limit)
                    alert.handle.set_download_limit(self._torrent_download_limit)
                elif isinstance(alert, lt.dht_announce_alert):
                    info_hash = alert.info_hash.to_string().encode('hex')
                    if info_hash in self._meta_list:
                        self._meta_list[info_hash] += 1
                    else:
                        self._meta_list[info_hash] = 1
                        self._current_meta_count += 1
                elif isinstance(alert, lt.dht_get_peers_alert):
                    info_hash = alert.info_hash.to_string().encode('hex')
                    if info_hash in self._meta_list:
                        self._meta_list[info_hash] += 1
                    else:
                        self._infohash_queue_from_getpeers.append(info_hash)
                        self._meta_list[info_hash] = 1
                        self._current_meta_count += 1
    
        def start_work(self):
            '''主工作循环,检查消息,显示状态'''
            # 清理屏幕
            begin_time = time.time()
            show_interval = self._delay_interval
            while True:
                for session in self._sessions:
                    session.post_torrent_updates()
                    # 从队列中获取信息
                    self._handle_alerts(session, session.pop_alerts())
                time.sleep(self._sleep_time)
                if show_interval > 0:
                    show_interval -= 1
                    continue
                show_interval = self._delay_interval
    
                # 统计信息显示
                show_content = ['torrents:']
                interval = time.time() - begin_time
                show_content.append('  pid: %s' % os.getpid())
                show_content.append('  time: %s' %
                                    time.strftime('%Y-%m-%d %H:%M:%S'))
                show_content.append('  run time: %s' % self._get_runtime(interval))
                show_content.append('  start port: %d' % self._start_port)
                show_content.append('  collect session num: %d' %
                                    len(self._sessions))
                show_content.append('  info hash nums from get peers: %d' %
                                    len(self._infohash_queue_from_getpeers))
                show_content.append('  torrent collection rate: %f /minute' %
                                    (self._current_meta_count * 60 / interval))
                show_content.append('  current torrent count: %d' %
                                    self._current_meta_count)
                show_content.append('  total torrent count: %d' %
                                    len(self._meta_list))
                show_content.append('\n')
    
                # 存储运行状态到文件
                try:
                    with open(self._stat_file, 'wb') as f:
                        f.write('\n'.join(show_content))
                    with open(self._result_file, 'wb') as f:
                        json.dump(self._meta_list, f)
                except Exception as err:
                    pass
    
                # 测试是否到达退出时间
                if interval >= self._exit_time:
                    # stop
                    break
    
                # 每天结束备份结果文件
                self._backup_result()
    
            # 销毁p2p客户端
            for session in self._sessions:
                torrents = session.get_torrents()
                for torrent in torrents:
                    session.remove_torrent(torrent)
    


    运行效率

    在我的一台512M内存,单cpu机器上。爬虫刚开始运行稍慢,运行几分钟后收集速度稳定在 180个每分钟,1小时采集10000左右。

    运行状态

    run times: 12
    
    torrents:
      pid: 11480
      time: 2014-08-18 22:45:01
      run time: day: 0, hour: 0, minute: 12, second: 25
      start port: 32900
      collect session num: 20
      info hash nums from get peers: 2222
      torrent collection rate: 179.098480 /minute
      current torrent count: 2224
      total torrent count: 58037
    


    爬虫完整代码

    完整的代码参见:https://github.com/blueskyz/DHTCrawler

    还包括一个基于twisted的监控进程,用来查看爬虫状态,在爬虫进程退出后重新启动。