Fork me on GitHub

celery及邮件发送

celery

作用:异步调用, 定时任务

使用方法:

新建tasks文件, 实例化celery

方式1:celery = Celery('名字', '配置')

方式2: 新建一个celery_conf.py文件

在celery_conf.py中写入配置,在tasks中导入celery_conf.py

1
2
3
celery = Celery('名字)
# 从配置文件拿到相关配置
celery。config_from_object(celery_conf)

配置:

1
2
3
4
5
6
7
BROKER_URL = 'redis://127.0.0.1:6379/10'     # 消息队列,我这里存放于redis的10库

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/11'  # 结果存放位置,我这里存放于redis的11库

CELERY_CONCURRENCY = 3  # 设置worker数量

TIME_ZONE = 'Asia/Shanghai'    # 设置时区

设置定时任务

1
2
3
4
5
6
7
CELERYBEAT_SCHEDULE = {
    '任务名': {
        'task': '定时执行的任务',
        'args': (参数,元组形式),
        'schedule': 时间
    }
}

异步调用

  • 在tasks.py中定义需要处理的函数
1
2
3
4
@app.task  还可以使@app.shared_task
	def 函数名1(参数列表):
		函数体
		return 返回值
  • 在其他函数中调用
1
2
3
4
def 函数名2(参数列表):
	# 调用tasks中的函数
	函数名1.delay(需要的参数)
	return 返回值

邮件发送

安装flask-mail 插件

配置

1
2
3
4
5
6
app.config["MAIL_SERVER"] = "smtp.qq.com"
app.config["MAIL_PORT"] = 465
app.config["MAIL_USE_SSL"] = True
app.config["MAIL_USERNAME"] = "账号名@qq.com"
app.config["MAIL_PASSWORD"] = "授权码"
MAIL_DEFAULT_SENDER = 'xxx'

实例化

1
2
mail = Mail()
mail.init_app(app)

调用

1
2
3
msg = Message(title, sender="MAIL_USERNAME相同值", recipients=[收件人邮箱])
msg.html=渲染出来的HTML页面(render_template)
mail.send(msg)

可参照文档:Flask-Mail的使用官方文档