Tornado

Tornado是一个Python Web框架和异步网络库,最初是在FriendFeed上开发的。通过使用非阻塞网络I / O,Tornado可以扩展到数万个开放连接,使其成为长轮询,WebSockets和其他需要与每个用户建立长期连接的应用程序的理想选择。

优势
  • 轻量级web框架
  • 异步非阻塞IO处理方式
  • 出色的抗负载能力
  • 优异的处理性能,不依赖多进程/多线程,一定程度上解决C10K问题
  • WSGI全栈替代产品,推荐同时使用其web框架和HTTP服务器
需要用到工具与各种第三方库
  • Tornado(主角)
  • Pycharm(编辑器)
  • aioredis(异步redis)
  • aiomysql(异步mysql)
  • peewee-async(异步mysql orm)

下面一步一步开始搭建起一个完整的框架吧

创建项目
  1. 打开Pycharm,点击Create New Project
  2. 输入我们的项目名称这里就叫tornado-demoPycharm会自动创建一个虚拟环境,点击Create
  3. 此时目录结构 shell ➜ tornado-demo ls venv
  4. 我们使用PycharmTerminal输入,下面命令。为啥用PycharmTerminal输入?因为它自己使用了虚拟环境,在终端上多了一个(venv)就代表使用了虚拟环境。安装的第三方库都在venv目录下。这样可以解决多个项目中第三方库版本不一致出现的问题。 shell pip install tornado

OK,安装完成,我们先看下官方列子,在根目录创建 main.py

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
import tornado.ioloop
import tornado.web

class MainHandler(tornado.web.RequestHandler):
    # get方法,代表请求方式、还支持常用的post、put、delete
    def get(self):
        self.write("Hello, world")

def make_app():
    # 返回一个配置`/`的路由的实例
    return tornado.web.Application([
        (r"/", MainHandler),
    ])

if __name__ == "__main__":
    app = make_app()
    # 监听端口号为8888
    app.listen(8888)
    # 获取当前io事件循环并启动
    tornado.ioloop.IOLoop.current().start()

启动服务

1
(venv) ➜  tornado-demo python3 main.py 

访问 http://localhost:8888/ 可以看到浏览器已经把Hello, world 返回过来啦。easy不。 可能有些人感觉这样配置路由是不是不太美观啊? 我也感觉确实很丑。我们修改下。

我们先创建俩个package为appsrouteapps主要是我们应用的处理逻辑。 route不用说肯定是放我们路由的呀。

目录结构如下

1
2
3
4
5
6
7
(venv) ➜  tornado-demo tree -I "venv"
.
├── apps
│   └── __init__.py
├── main.py
└── route
    └── __init__.py

我们拿用户模块做一个讲解的例子。

创建Handler基类

我们先创建一个基类,在基类中可以写一些公共方法让我们写代码的时候很方便。创建 base.py 在/apps目录下. 内容为空,当我们需要的时候我们来补充。

1
2
3
4
5
6
7
from typing import Optional, Awaitable

import tornado.web

class BaseHandler(tornado.web.RequestHandler):
    def data_received(self, chunk: bytes) -> Optional[Awaitable[None]]:
        pass

我们在/apps 下创建user包,在user包下创建 auth.py文件. 当前目录结构为

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
(venv) ➜  tornado-demo tree -I "venv"
.
├── apps
│   ├── __init__.py
│   ├── base.py
│   └── user
│       ├── __init__.py
│       └── auth.py
├── main.py
├── route
│   └── __init__.py

我们现在编写auth文件中的内容

1
2
3
4
5
6
7
from apps.base import BaseHandler


class LoginHandler(BaseHandler):
    async def post(self):
        """用户登陆"""
        self.write("Hello, world")

我们的handler写好了,现在与路由结合起来就可以运行了,想想就激动。 打开route包创建一个api.py文件

1
2
3
4
5
from apps.user.auth import LoginHandler

urls = [
    (r"/api/v1/user/auth/login", LoginHandler),
]

我们在修改下main.py内容就可以启动了,修改为 下面使用loguru包作为日志。需要安装pip install loguru

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import asyncio
import tornado.web
import tornado.ioloop
from route.api import urls
from loguru import logger
from tornado.options import parse_command_line


def make_app():
    logger.add("logs/api.log")
    apps = tornado.web.Application(urls)
    return apps


if __name__ == "__main__":
    try:
        logger.info("server start")
        parse_command_line()
        loop = asyncio.get_event_loop()
        app = make_app()
        app.listen(8888)
        loop.run_forever()
    except KeyboardInterrupt:
        logger.info("server stop")

测试下刚才的API是否可以运行,使用POST访问地址 http://localhost:8888/api/v1/user/auth/login

1
2
(venv) ➜  tornado-demo curl -X POST http://localhost:8888/api/v1/user/auth/login
Hello, world%             

成功返回。一个基础版的web框架运行起来啦。

json数据接受

用户提供手机号与密码来登陆。字段 tel pwd 使用json提交 我们发现Tornado json提交过来,不能象self.get_argument('tel')这样方便接受。这时候我们只需要修改下BaseHandler就可以啦。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
def get_json_args(self, key=None, default=None):
    """获取json数据
    :return dict or val
    """
    try:
        if self._json_args is None:
            self._json_args = json.loads(self.request.body)
        if key is None:
            return self._json_args
        if default:
            return self._json_args.get(key, default)
        return self._json_args[key]
    except json.JSONDecodeError:
        raise tornado.web.HTTPError(400)

我们在对应handler调用get_json_args()方法就可以获取全部body数据了。get_json_args('tel') 获取指定数据内容。

返回格式

API 一般都有一个固定的格式,常见的格式为{"code": 200, "msg": "", data: {}}BaseHandler添加一个方法response用与数据返回

1
2
3
4
def response(self, data=None, code=200, msg=""):
    if not data:
        data = {}
    self.write(json.dumps(dict(code=code, data=data, msg=msg)))
peewee_async

安装

1
2
(venv) ➜  tornado-demo pip install peewee-async
(venv) ➜  tornado-demo pip install aiomysql

我们创建一个databases库,创建mysql.py存放单例mysql pool

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
from loguru import logger
from peewee_async import Manager
from peewee_async import PooledMySQLDatabase


class MysqlPool:
    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, "_instance"):
            logger.debug("init mysql pool")
            cls.conn = PooledMySQLDatabase('blog', host='localhost', password='', port=3306, user='root',
                                           max_connections=10, charset='utf8mb4')
            cls.manager = Manager(cls.conn)
            cls._instance = super(MysqlPool, cls).__new__(cls, *args, **kwargs)
        return cls._instance

    @property
    def get_conn(self):
        return self.conn

    @property
    def get_manager(self):
        return self.manager
aioredis

安装

1
(venv) ➜  tornado-demo pip install aioredis

在databases库下,创建redis.py文件

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import aioredis
from loguru import logger


async def _init_with_loop(loop):
    """
    redis 连接池
    :param loop: 事件循环
    :return: redis pool
    """
    __pool = await aioredis.create_redis_pool(
        'redis://localhost', minsize=5, maxsize=10, encoding='utf8', loop=loop)
    return __pool


class RedisPool:
    def __new__(cls, *args, **kwargs):
        if not hasattr(cls, "_instance"):
            logger.debug("init redis pool")
            _loop = kwargs.get("loop", None)
            assert _loop, "use get_event_loop()"
            cls._redis = _loop.run_until_complete(_init_with_loop(_loop))
            cls._instance = super(RedisPool, cls).__new__(cls)
        return cls._instance

    def get_conn(self) -> aioredis.Redis:
        return self._redis

因为Redis需要事件循环,我是在服务启动的时候传递了当前的事件循环给RedisPool 所以main.py需要稍微修改下

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
import asyncio
import tornado.web
import tornado.ioloop
from route.api import urls
from loguru import logger
from databases.redis import RedisPool
from tornado.options import parse_command_line


def make_app(loop):
    logger.add("logs/api.log")
    apps = tornado.web.Application(urls)
    # 把loop传递过RedisPool获取一个链接
    apps.redis = RedisPool(loop=loop).get_conn()
    return apps


if __name__ == "__main__":
    try:
        logger.info("server start")
        parse_command_line()
        loop = asyncio.get_event_loop()
        app = make_app(loop)
        app.listen(8888)
        loop.run_forever()
    except KeyboardInterrupt:
        logger.info("server stop")

当重启服务时可以看到下面log信息, 说明启动成功了。

1
2
2019-04-30 15:36:04.541 | INFO     | __main__:<module>:20 - server start
2019-04-30 15:36:04.542 | DEBUG    | databases.redis:__new__:19 - init redis pool

我们在handler赶紧测试下是否可以。打开apps/user/auth.py 修改post方法

1
2
3
4
5
6
7
8
9
from apps.base import BaseHandler


class LoginHandler(BaseHandler):
    async def post(self):
        """用户登陆"""
        res = await self.application.redis.set('xxx', 123)
        print(res)
        self.response()

我们发起一个请求到当前handler,测试下是否将xxx设置成功?请求过后,打开redis 命令行查询下

1
2
127.0.0.1:6379> get xxx
"123"

ok, 成功了。可能有同学发现没有任何提示,这不能忍受呀。 我们在BaseHandler类下面简单封装下

1
2
3
4
5
from aioredis import Redis

@property
    def redis(self) -> Redis:
        return self.application.redis

再对刚才handler测试下,修改post方法为

1
2
3
4
5
async def post(self):
    """用户登陆"""
    res = await self.redis.set('xxx', 666)
    print(res)
    self.response()

在过程中提示是不是方法提示都出来了呀。又可以愉快的搬砖了。

peewee orm

接下来我们创建一个models库,在models 创建一个base.py文件保存一些公共的字段,比如,id,create_at,update_at 主键、创建时间,更新时间等。下面字段意思我就不详细讲了,建议先看orm文档理解下,再看下面代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from databases.mysql import MysqlPool
from peewee import Model, PrimaryKeyField


class BaseModel(Model):
    id = PrimaryKeyField()

    class Meta:
        # table_name = 'users'
        database = MysqlPool().get_conn
        legacy_table_names = False

创建一个users.py文件保存我们的用户模型

1
2
3
4
5
6
7
8
from .base import BaseModel
from peewee import CharField


class Users(BaseModel):
    name = CharField(50)
    tel = CharField(50)
    pwd = CharField(50)

mysql对应的表结构,直接执行下面sql,

1
2
3
4
5
6
7
CREATE TABLE `users` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(50) NOT NULL,
  `tel` varchar(50) NOT NULL DEFAULT '',
  `pwd` varchar(50) DEFAULT '',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4;
Services

为了代码简洁,我们创建个services来处理业务,handler做逻辑处理。mysql执行都在services中。搞起来,首先创建一个services包,在包力创建一个base.py文件作为Services基类

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
from peewee import Model
from loguru import logger
from peewee_async import execute
from databases import MysqlPool, RedisPool

class BaseService:
    model: Model = None # ORM类
    _service = dict()   # 类实例集合

    
    @classmethod
    def instance(cls):
        """单例,对实例保存_service中避免多次调用重新创建"""
        """Method  instance
        :return: cls
        """
        instance = cls._service.get(cls.__name__, None)
        if not instance:
            instance = cls.__new__(cls)
            cls._service.setdefault(cls.__name__, instance)
        return instance

    @staticmethod
    async def execute(sql):
        """有些复杂的sql,orm用户起来不太方便,直接运行sql"""
        try:
            return await execute(sql)
        except Exception as e:
            logger.exception(str(e))
            return None

    async def insert(self, **kwargs):
        """peewee create方法保存数据"""
        try:
            return await self.db.create(self.model, **kwargs)
        except Exception as e:
            logger.exception(str(e))
            return None

    async def update(self, data):
        """更新一条数据
        :param Model
        """
        try:
            return await self.db.update(data)
        except self.model.DoesNotExist:
            return None
        except Exception as e:
            logger.exception(str(e))
            return None

    async def find_one(self, *args, **kwargs):
        """查找一条数据, 当返回多条数据会报错"""
        try:
            if not args:
                return await self.db.get(self.model, *args, **kwargs)
            res = await self.find(*args, **kwargs)
            if len(res) > 1:
                raise Exception("find multiple data")
            elif len(res) == 1:
                return res[0]
            return None
        except self.model.DoesNotExist:
            return None
        except Exception as e:
            logger.exception(str(e))
            return None

    async def find(self, *args, **kwargs):
        """查询指定字段,查找多条数据"""
        if not args:
            raise Exception("fields is empty")

        sql = self.model.select(*[getattr(self.model, k) for k in args])
        for key, val in kwargs.items():
            sql = sql.where(getattr(self.model, key) == val)
        try:
            return await self.db.execute(sql)
        except Exception as e:
            logger.exception(str(e))
            return []

    @property
    def db(self):
        """获取mysql链接"""
        return MysqlPool().get_manager

    @property
    def redis(self):
        return RedisPool().get_conn()

创建好base.py后, 创建users包,在包内创建user.py来处理业务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
from services.base import BaseService
from models.users import Users


class UserService(BaseService):
    model = Users

    async def get_user_by_tel(self, tel: str) -> Users:
        """通过tel查询用户是否存在"""
        """为了节省时间,把几种常用的方法都在一个方法中一下"""
        # 查找数据
        # user: Users = await self.find_one('pwd', tel=tel) # 查询指定字段pwd
        user: Users = await self.find_one(tel=tel)  # 查询全部字段

        # 更新一条数据
        # user.tel = '123'
        # await self.update(user)

        # 创建用户
        # user: Users = await self.insert(tel='xxxx', pwd='888', name='eee')
        # print(user.id)
        return user

最终目录结构

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
(venv) ➜  tornado-demo tree -I "venv"       
ls
.
├── apps
│   ├── __init__.py
│   ├── base.py
│   └── user
│       ├── __init__.py
│       └── auth.py
├── databases
│   ├── __init__.py
│   ├── mysql.py
│   └── redis.py
├── logs
│   └── api.log
├── main.py
├── models
│   ├── __init__.py
│   ├── base.py
│   └── users.py
├── route
│   ├── __init__.py
│   └── api.py
├── services
│   ├── __init__.py
│   ├── base.py
│   └── user.py

我们修改下apps/user/auth.py 返回用户信息

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
from apps.base import BaseHandler
from services.user import UserService


class LoginHandler(BaseHandler):
    async def post(self):
        """用户登陆"""
        user = await UserService.instance().get_user_by_tel(self.get_json_args('tel'))
        if user.pwd != self.get_json_args('pwd'):
            return self.response(code=401, msg="pwd error")
        self.response(user)

请求结果返回的时候出现了错误

1
2
    raise TypeError(f'Object of type {o.__class__.__name__} '
TypeError: Object of type Users is not JSON serializable

我们需要对response进行修改,在此之前我们创建一个CJsonEncoder

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
import json
from peewee import Model
from datetime import datetime, date
from playhouse.shortcuts import model_to_dict
from decimal import Decimal


class CJsonEncoder(json.JSONEncoder):
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.strftime('%Y-%m-%d %H:%M:%S')
        elif isinstance(obj, date):
            return obj.strftime("%Y-%m-%d")
        elif isinstance(obj, Model):
            return model_to_dict(obj)
        elif isinstance(obj, Decimal):
            return str(obj)
        else:
            return json.JSONEncoder.default(self, obj)

response简单的修改就可以了

1
2
3
4
def response(self, data=None, code=200, msg=""):
    if not data:
        data = {}
    self.write(json.dumps(dict(code=code, data=data, msg=msg), cls=CJsonEncoder))

重启请求, OK返回结果正常。

在此一个完整框架基本就算搭建完成了。 最近刚开始写博客献丑了,有什么不对地方希望大佬们多多指正。代码地址https://github.com/zhangzhch/tornado-async