介绍下python的异步抓取, 为了榨干cpu,大佬们也是想尽办法,作为一个码农向大佬们致敬。说白了就是在访问其他服务器(IO阻塞)的时候自动挂起,接着进行其他任务,等到结果返回接受结果,继续往下进行。主要用到asyncio,aiohttp

asyncio

用于使用协同程序编写单线程并发代码,通过套接字和其他资源多路复用I / O访问,运行网络客户端和服务器以及其他相关原语的库。

aiohttp

用于asyncio和Python的异步HTTP客户端/服务器。

安装aiohttp

1
$ pip install aiohttp

async with

异步上下文管理器指的是在enter和exit方法处能够暂停执行的上下文管理器。需要额外增加aenteraexit俩个方法来实现. 下面为aiohttp的方法

1
2
3
4
5
6
7
8
async def __aenter__(self) -> 'ClientSession':
    return self

async def __aexit__(self,
                    exc_type: Optional[Type[BaseException]],
                    exc_val: Optional[BaseException],
                    exc_tb: Optional[TracebackType]) -> None:
    await self.close()

先看例子,抓取新浪爱财足彩比赛数据

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
import aiohttp
import asyncio
from loguru import logger
from typing import Any, Text, List, Dict, Tuple
from datetime import datetime, timedelta
import json


class Match:

    def __init__(self):
        self.today: datetime = datetime.now()

    @staticmethod
    def strip_js_var(text: str, include=False) -> Text:
        if not text:
            return ''
        if include:
            return text[text.find('(') + 1: text.rfind(')')]
        else:
            return text[text.find('('): text.rfind(')') + 1]

    @staticmethod
    async def request(method: str, url: str, **kwargs: Any) -> Text:
        """发起http请求"""
        logger.debug('request {}', url)
        async with aiohttp.ClientSession() as session:
            kwargs['headers'] = {
                'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) '
                              'Chrome/73.0.3683.103 Safari/537.36'
            }
            async with session.request(method, url, **kwargs) as resp:
                if resp.status == 200:
                    return await resp.text(errors='ignore')
        logger.error("url {} 出错了", url)
        return ''

    async def beijing_change_issue(self) -> List[Tuple[Dict]]:
        """新浪爱财--北京单场比赛"""
        
        # 当前时间小于11点钟抓取昨天诗句
        timer = self.today
        if timer.hour < 11:
            timer = self.today - timedelta(days=1)
        
        # 内容URL
        change_url: str = "https://live.aicai.com/liveScore/changeIssueJson.htm?ttyqGameIndex=405&issue={}"
        json_url: str = "https://live.aicai.com/static/no_cache/jc/ttyq/405_{}_8.json"
        
        # 挂起程序,等待获取数据
        resp: Text = await self.request('GET', change_url.format(timer.strftime('%Y-%m-%d')))
        resp2: Text = await self.request('GET', json_url.format(timer.strftime('%Y-%m-%d')))
        if not resp or not resp2:
            return []
            
        # 对数据的一些处理判断
        try:
            match: dict = json.loads(resp)
            match_change: list = json.loads(resp2)
            if isinstance(match.get('matchList'), str):
                match['matchList'] = json.loads(match['matchList'])
            if match.get('status') != 'success' and not match.get('hasMatch'):
                raise Exception("get status error")
            data: List[Tuple[Dict]] = list(zip(match['matchList'], match_change))
            return data
        except Exception as e:
            logger.error("json loads error {}", e)
            return []


    async def run(self):
        """async 定义一个协程方法"""
        # await 挂起等待方法结束
        res = await self.beijing_change_issue()
        print(res)


if __name__ == '__main__':
    match = Match() # 获取实例
    loop = asyncio.get_event_loop() # 获取一个事件循环
    loop.run_until_complete(match.run()) # run_until_complete 表示阻塞运行到协程match.run()方法结束