作者都是各自领域经过审查的专家,并撰写他们有经验的主题. 我们所有的内容都经过同行评审,并由同一领域的Toptal专家验证.
现代web应用程序及其底层系统比以往任何时候都更快、响应更快. However, 仍然有许多情况下,您希望将繁重任务的执行卸载到整个系统架构的其他部分,而不是在主线程上处理它们. 识别这样的任务就像检查它们是否属于以下类别一样简单:
执行后台任务的直接解决方案是在单独的线程或进程中运行它. Python是一种高级图灵完备编程语言, 不幸的是,它没有提供与Erlang相匹配的内置并发性, Go, Java, Scala, or Akka. 这些都是基于Tony Hoare的通信顺序过程(CSP). 另一方面,Python线程是由全局解释器锁(GIL),这可以防止多个本机线程同时执行Python字节码. 如何摆脱GIL是很多人讨论的话题 Python开发人员,但这不是本文的重点. Python中的并发编程是过时的,尽管欢迎您在 Python多线程教程 由另一位Toptaler Marcus McCurdy提出. So, 一致地设计进程之间的通信是一个容易出错的过程,并导致代码耦合和糟糕的系统可维护性, 更不用说它会对可伸缩性产生负面影响. 另外, Python进程是操作系统(OS)下的正常进程, 使用整个Python标准库, 它变成了重量级的. 随着应用程序中进程数量的增加, 从一个这样的过程切换到另一个这样的过程是一个耗时的操作.
要更好地理解Python的并发性,请观看David Beazley在 PyCon’15.
一个更好的解决方案是服务于 分布式队列 或者它著名的兄弟范式叫做 发布-订阅. 如图1所示,有两种类型的应用程序,其中一种称为 publisher,发送消息,另一个称为 subscriber,接收信息. 这两个代理不直接相互作用,甚至不知道彼此. 发布者将消息发送到中央队列,或者 broker,订阅者从该代理接收感兴趣的消息. 这种方法有两个主要优点:
有许多消息传递系统支持这样的范例并提供简洁的API, 由TCP或HTTP协议驱动, e.g.、JMS、RabbitMQ、Redis Pub/Sub、Apache ActiveMQ等.
Celery 是最流行的Python后台任务管理器之一. 芹菜与RabbitMQ或Redis等多个消息代理兼容,可以同时充当生产者和消费者.
Celery是一个基于分布式消息传递的异步任务队列/作业队列. 它专注于实时操作,但也支持调度. 执行单元, 被称为任务, 使用多处理在一个或多个工作服务器上并发执行, Eventlet, or gevent. 任务可以异步执行(在后台)或同步执行(等待直到准备就绪)。. – 芹菜项目
要开始使用芹菜,只需按照在 官方文档.
本文的重点是让您很好地理解哪些用例可以被用于Python任务管理的芹菜所覆盖. 在本文中, 我们不仅将展示有趣的示例,还将尝试学习如何将芹菜应用于实际任务,例如后台邮件, 报告生成, logging, 错误报告. 我将分享我测试任务的方法,除了模拟和, finally, 我将提供一些没有在官方文档中记录的技巧,这些文档花费了我几个小时的研究来发现自己.
如果你以前没有芹菜的经验, 我鼓励您首先按照官方教程进行尝试.
如果这篇文章引起了您的兴趣,并使您想立即深入研究代码,那么请阅读本文 GitHub库 查看本教程中使用的代码. The README
文件将为您提供运行和使用示例应用程序的快捷方法.
对于初学者来说, 我们将通过一系列实际示例向读者展示芹菜如何简单而优雅地作为Python任务管理器并解决看似重要的任务. All examples will be presented within the Django framework; however, 它们中的大多数都可以很容易地移植到其他Python框架(Flask, Pyramid).
项目布局由 Cookiecutter Django; however, 我只保留了一些依赖关系, 在我看来, 促进这些用例的开发和准备. 我还为这篇文章和应用程序删除了不必要的模块,以减少噪音,使代码更容易理解.
- celery_uncovered /
- celery_uncovered / __init__.py
- celery_uncovered / {toyex,技巧,advex}
- celery_uncovered /芹菜.py
- config /设置/{基地,当地,测试}.py
- config / url.py
- manage.py
celery_uncovered / {toyex,技巧,advex}
包含不同的应用程序,我们将在这篇文章中介绍. 每个应用程序都包含一组根据所需的芹菜理解水平组织的示例.celery_uncovered /芹菜.py
定义一个芹菜实例.File: celery_uncovered /芹菜.py
:
从__future__导入absolute_import
import os
从芹菜进口芹菜,信号
#为'芹菜'程序设置默认的Django设置模块.
os.environ.setdefault(“DJANGO_SETTINGS_MODULE”、“配置.settings.local')
app =芹菜(' celery_unused ')
#在这里使用字符串意味着工作线程不需要
# pickle使用Windows时的对象.
app.django config_from_object ('.配置:设置”,名称空间=“芹菜”)
app.autodiscover_tasks ()
然后我们需要确保芹菜会和Django一起启动. 因此,我们导入应用 celery_uncovered / __init__.py
.
File: celery_uncovered / __init__.py
:
从__future__导入absolute_import
#这将确保应用程序总是被导入
# Django启动,shared_task将使用这个应用程序.
from .芹菜导入应用程序celery_app # noqa
__all__ = ['celery_app']
__version__ = '0 ..0.1'
__version_info__ = tuple([int(num) if num . num ..Isdigit () else num用于__version__中的num.替换(' - ','.', 1).split('.')])
配置/设置
是我们的应用程序和芹菜的配置源吗. 根据执行环境的不同,Django会启动相应的设置: local.py
发展或 test.py
for testing. 如果你愿意,你也可以通过创建一个新的python模块来定义你自己的环境.g., prod.py
). 芹菜配置的前缀是 CELERY_
. 在本文中,我将RabbitMQ配置为代理,并将SQLite配置为结果后端.
File: config/local.py
:
CELERY_BROKER_URL = env('CELERY_BROKER_URL', default='amqp://guest:guest@localhost:5672//')
CELERY_RESULT_BACKEND = 'django-db+sqlite:///结果.sqlite'
我们将介绍的第一个案例是报告生成和导出. 在这个例子中, 您将了解如何定义生成CSV报告的任务,并定期使用 celerybeat,一个芹菜调度程序.
用例描述: 从GitHub获取500个最热门的存储库,每个选择的时间段(天), week, month), 按主题分组, 并将结果导出为CSV文件.
如果我们提供一个HTTP服务,该服务将通过单击标记为“生成报告”的按钮来触发此功能,,应用程序将停止并等待任务完成,然后再发送HTTP响应. This is bad. 我们希望我们的web应用程序是快速的,我们不希望我们的用户等待我们的后端计算结果. 而不是等待结果的产生, 我们宁愿通过在芹菜中注册的队列将任务排队给工作进程,并使用 task_id
到前端. 然后前端会使用 task_id
以异步方式查询任务结果.g.(AJAX),并让用户随时了解任务进度. 最后,当流程完成时,结果可以作为文件通过HTTP下载.
首先,让我们将这个过程分解成尽可能小的单元,并创建一个管道:
获取存储库是一个HTTP请求 GitHub搜索API GET /搜索/存储库
. However, GitHub API服务应该处理的限制:API每个请求最多返回100个存储库,而不是500个. 我们可以一次发送五个请求, 但是我们不想让用户等待5个单独的请求,因为HTTP请求是一个I/O绑定操作. 相反,我们可以使用适当的页面参数执行五个并发HTTP请求. 所以页面将在[1]的范围内..5]. 让我们定义一个名为 fetch_hot_repos / 3 -> list
in the toyex/tasks.py
module:
File: celery_uncovered / toyex /当地.py
@shared_task
Def fetch_hot_repos(since, per_page, page):
payload = {
'sort': 'stars', 'order': 'desc', 'q': 'created:>={date}'.格式(日期=以来),
'per_page': per_page, 'page': page,
“access_token”:设置.GITHUB_OAUTH}
headers = {'Accept': 'application/vnd ..github.v3+json'}
Connect_timeout, read_timeout = 5.0, 30.0
R =请求.get(
“http://api.github.com/search/repositories’,
params =有效载荷,
头=头,
超时= (connect_timeout read_timeout))
items = r.json () [u 'items ']
return items
So fetch_hot_repos
创建对GitHub API的请求,并使用存储库列表响应用户. 它接收三个参数来定义我们的请求负载:
since
—按创建日期筛选存储库.per_page
-每个请求返回的结果数量(限制为100).page
——请求的页码(在[1]范围内)..5]).Note: 为了使用GitHub搜索API,您将需要一个OAuth令牌来通过身份验证检查. 在我们的例子中,它保存在 GITHUB_OAUTH
.
Next, 我们需要定义一个主任务,它将负责聚合结果并将它们导出到CSV文件中: produce_hot_repo_report_task/2->filepath:
File: celery_uncovered / toyex /当地.py
@shared_task
def produce_hot_repo_report(period, ref_date=None):
# 1. parse date
Ref_date_str = strf_date(period, ref_date=ref_date)
# 2. 获取和连接
Fetch_jobs = group([
fetch_hot_repos.S (ref_date_str, 100, 1),
fetch_hot_repos.S (ref_date_str, 100, 2),
fetch_hot_repos.S (ref_date_str, 100,3),
fetch_hot_repos.S (ref_date_str, 100, 4),
fetch_hot_repos.S (ref_date_str, 100,5)
])
# 3. 按语言分组
# 4. create csv
和弦(fetch_jobs) (build_report_task返回.年代(ref_date_str)).get()
@shared_task
Def build_report_task(results, ref_date):
All_repos = []
对于结果中的repos:
all_repos += [Repository(repo) for repo in repos]
# 3. 按语言分组
Grouped_repos = {}
对于all_repos中的repo:
if repo.grouped_repos中的语言:
grouped_repos[回购.language].append(repo.name)
else:
grouped_repos[回购.[语言]=[回购].name]
# 4. create csv
lines = []
对于lang在sorted(grouped_repos.keys()):
lines.追加([lang] + grouped_repos[lang])
Filename = '{media}/github-hot-repos-{date}.csv'.媒体格式(=设置.MEDIA_ROOT、日期= ref_date)
返回make_csv(filename, lines)
此任务使用 celery.canvas.group
执行五个并发的调用 fetch_hot_repos / 3
. 等待这些结果,然后将其简化为存储库对象列表. 然后我们的结果集按主题分组,最后导出到生成的CSV文件 MEDIA_ROOT/
directory.
以便定期安排任务, 您可能想要在配置文件中的时间表列表中添加一个条目:
File: config/local.py
from celery.计划导入crontab
Celery_beat_schedule = {
“produce-csv-reports”:{
“任务”:“celery_uncovered.toyex.tasks.produce_hot_repo_report_task’,
'schedule': crontab(分钟=0,小时=0)#午夜,
“参数”(“今天”,):
},
}
为了启动和测试任务是如何工作的,首先我们需要启动芹菜进程:
$ celery -A celery_unused worker - 1 info
接下来,我们需要创建 celery_uncovered /媒体/
directory. 然后,您将能够通过Shell或Celerybeat测试其功能:
Shell:
从datetime导入日期
从celery_uncovered.toyex.Tasks导入produce_hot_repo_report_task
produce_hot_repo_report_task.延迟(今天).(超时= 5)
Celerybeat:
#使用以下命令启动celerybeat
$ celery -A celery_unused beat - 1 info
你可以在 MEDIA_ROOT/
directory.
芹菜最常见的用例之一是发送电子邮件通知. 电子邮件通知是一种离线I/O绑定操作,它利用了本地 SMTP 服务器或第三方SES. 有许多用例涉及到发送电子邮件和, 对大多数人来说, 用户在接收HTTP响应之前不需要等待此过程完成. 这就是为什么在后台执行这些任务并立即响应用户的原因.
用例描述: 通过芹菜向管理员电子邮件报告50X错误.
Python和Django有必要的背景来执行 系统日志记录. 我不会详细介绍Python的日志记录是如何工作的. However, 如果你以前从未尝试过或需要复习, 阅读内置的文档 logging module. 您肯定希望在生产环境中使用它. Django有一个特殊的日志处理程序 AdminEmailHandler 接收到的每条日志消息都会向管理员发送电子邮件.
主要思想是扩展 send_mail
方法 AdminEmailHandler
类,这样它就可以通过芹菜发送邮件. 这可以做到如下图所示:
首先,我们需要设置一个名为 report_error_task
that calls mail_admins
以提供的主题和讯息:
File: celery_uncovered toyex /任务.py
@shared_task
Def report_error_task(主题,消息,*args, **kwargs):
Mail_admins(主题,消息,*args, *kwargs)
Next, 我们实际上扩展了AdminEmailHandler,这样它就会在内部调用已定义的芹菜任务:
File: celery_uncovered / toyex / admin_email.py
from django.utils.导入AdminEmailHandler
从celery_uncovered.handlers.任务导入report_error_task
类CeleryHandler (AdminEmailHandler):
Def send_mail(self, subject, message, *args, **kwargs):
report_error_task.延迟(主题,消息,*args, *kwargs)
最后,我们需要设置日志记录. Django中日志的配置相当简单. 你所需要的是超越 LOGGING
以便日志引擎使用新定义的处理程序开始:
File 配置/设置/当地.py
LOGGING = {
“版本”:1、
“disable_existing_loggers”:假的,
...,
“处理”:{
...
“mail_admins”:{
“级别”:“错误”,
“过滤器”:“require_debug_true”,
“类”:“celery_uncovered.toyex.log_handlers.admin_email.CeleryHandler”
}
},
“伐木”:{
'django': {
'handlers': ['console', 'mail_admins'],
“级别”:“信息”,
},
...
}
}
注意,我有意设置了处理程序过滤器 require_debug_true
以便在应用程序以调试模式运行时测试此功能.
为了测试它,我准备了一个Django视图,它提供了一个“除零”操作 localhost: 8000 /报告错误
. 您还需要启动MailHog Docker容器来测试电子邮件是否确实发送了.
$ docker run -d -p 1025:1025 -p 8025:8025 mailhog/mailhog
$ CELERY_TASKSK_ALWAYS_EAGER=False python管理.py runserver
$ #用浏览器导航到[http://localhost:8000](http://localhost:8000)
$ #现在检查您的外发邮件通过访问web UI [http://localhost:8025](http://localhost:8025)
作为一个邮件测试工具,我设置了MailHog,并配置Django邮件使用它进行SMTP传递. 有很多方法 部署和运行 MailHog. 我决定使用Docker容器. 您可以在相应的README文件中找到详细信息:
File: 码头工人/ mailhog / README.md
$ docker build . -f docker/mailhog/Dockerfile -t mailhog/mailhog:最新的
$ docker run -d -p 1025:1025 -p 8025:8025 mailhog/mailhog
用浏览器导航到localhost:8025
要配置你的应用程序使用MailHog,你需要在你的配置中添加以下几行:
File: 配置/设置/当地.py
EMAIL_BACKEND = env(' django . info ', default='django . info '.core.mail.backends.smtp.EmailBackend”)
Email_port = 1025
EMAIL_HOST = env('EMAIL_HOST', default='mailhog')
可以用任何可调用函数创建芹菜任务. 默认情况下,任何用户定义的任务都注入 celery.app.task.Task
作为父(抽象)类. 这个类包含异步运行任务(通过网络将其传递给一个Celery worker)或同步运行任务(用于测试目的)的功能。, 创建签名和许多其他实用程序. 在下一个示例中,我们将尝试扩展 Celery.app.task.Task
然后使用它作为基类,以便为我们的任务添加一些有用的行为.
在我的一个项目中, 我正在开发一个应用程序,为最终用户提供Extract, Transform, 类似于Load (ETL)的工具,它能够摄取并过滤大量的分层数据. 后端分为两个模块:
芹菜部署了一个Celerybeat实例和40多个worker. 有超过20个不同的任务组成了管道和编排活动. 每个这样的任务都可能在某个时候失败. 所有这些故障都被转储到每个工人的系统日志中. 在某种程度上,它开始变得不方便调试和维护芹菜层. 最后,我们决定将任务日志隔离到特定于任务的文件中.
用例描述: 扩展芹菜,以便每个任务将其标准输出和错误记录到文件中
芹菜为Python应用程序提供了对其内部工作的良好控制. 它附带一个熟悉的信号框架. 使用芹菜的应用程序可以订阅其中的一些,以增强某些操作的行为. 我们将利用任务级信号来提供对单个任务生命周期的详细跟踪. 芹菜总是带有一个日志记录后端, 我们将从中受益,同时在少数几个地方略加调整,以实现我们的目标.
芹菜已经支持记录每个任务. 要保存到文件中,必须将日志输出分派到适当的位置. 在我们的示例中,任务的正确位置是与任务名称匹配的文件. 在芹菜实例中, 我们将使用动态推断的日志处理程序覆盖内置日志配置. 可以订阅 celeryd_after_setup
信号,然后配置系统日志:
File: celery_uncovered / toyex / celery_conf.py
@signals.celeryd_after_setup.connect
defconfigure_task_logging (instance=None, **kwargs):
任务=实例.app.tasks.keys()
LOGS_DIR =设置.ROOT_DIR.path('logs')
if not os.path.存在(str (LOGS_DIR)):
os.makedirs (str (LOGS_DIR))
打印“dir created”
Default_handler = {
“级别”:“调试”,
“过滤器”:没有,
“类”:“日志记录.文件句柄”,
“文件名”:“
}
Default_logger = {
“处理”:[],
“级别”:“调试”,
“繁殖”:真的
}
Log_config = {
“版本”:1、
# 'incremental': True,
“disable_existing_loggers”:假的,
“处理”:{},
“伐木”:{}
}
对于tasks中的task:
Task = str(Task)
if not task.startswith(“celery_uncovered.'):
continue
Task_handler = copy_dict(default_handler)
task_handler['filename'] = str(LOGS_DIR.路径(task +).log"))
Task_logger = copy_dict(default_logger)
Task_logger ['handlers'] = [task]
LOG_CONFIG['handlers'][task] = task_handler
LOG_CONFIG['loggers'][task] = task_logger
logging.config.dictConfig (LOG_CONFIG)
注意,对于在芹菜应用程序中注册的每个任务, 我们正在构建一个相应的日志记录器及其处理程序. 每个处理程序的类型为 logging.FileHandler
,因此,每个这样的实例都会接收一个文件名作为输入. 要使其运行,只需要将该模块导入到 celery_uncovered /芹菜.py
在文件的末尾:
进口celery_uncovered.tricks.celery_conf
可以通过调用来接收特定的任务日志记录器 get_task_logger (task_name)
. 为了推广每个任务的这种行为,有必要稍微扩展一下 celery.current_app.Task
用几种实用方法:
File: celery_uncovered /技巧/ celery_ext.py
类LoggingTask (current_app.Task):
abstract =真
ignore_result = False
@property
def记录器(自我):
Logger = get_task_logger(self.name)
返回日志记录器
Def log_msg(self, msg, *msg_args):
self.logger.调试(味精、* msg_args)
现在,在打电话给 task.log_msg("你好,我的名字是:%s",任务.request.id)
,日志输出将被路由到任务名称下的相应文件.
为了启动和测试这个任务是如何工作的,首先启动芹菜进程:
$ celery -A celery_unused worker - 1 info
然后你将能够通过Shell测试功能:
从datetime导入日期
从celery_uncovered.tricks.任务导入添加
add.delay(1, 3)
最后,要查看结果,请导航到 celery_uncovered /日志
目录并打开相应的日志文件 celery_uncovered.tricks.tasks.add.log
. 多次运行此任务后,您可能会看到类似于下面的内容:
1 + 2 = 3的结果
1 + 2 = 3的结果
...
让我们想象一个面向国际用户的Python应用程序,它是基于芹菜和Django构建的. 用户可以设置他们使用应用程序的语言(区域设置).
你必须设计一个多语言的、具有地区意识的电子邮件通知系统. 发送电子邮件通知, 您已经注册了一个由特定队列处理的特殊的芹菜任务. 此任务接收一些关键参数作为输入和当前用户区域设置,以便用用户选择的语言发送电子邮件.
现在想象一下,我们有许多这样的任务,但是每个任务都接受一个locale参数. 在这种情况下,在更高的抽象层次上解决它不是更好吗? 在这里,我们看看如何做到这一点.
用例描述: 自动从一个执行上下文中继承范围,并将其作为参数注入当前执行上下文中.
同样,正如我们对任务日志记录所做的那样,我们希望扩展一个基本任务类 celery.current_app.Task
并覆盖一些负责调用任务的方法. 出于演示的目的,我将重写 celery.current_app.任务:apply_async
method. 这个模块还有一些额外的任务可以帮助您生成一个功能齐全的替代品.
File: celery_uncovered /技巧/ celery_ext.py
类ScopeBasedTask (current_app.Task):
abstract =真
ignore_result = False
default_locale_id = default_locale_id
Scope_args = ('locale_id',)
Def __init__(self, *args, **kwargs):
超级(ScopeBasedTask,自我).__init__ (* args, * * kwargs)
self.set_locale (locale = kwargs.get(“locale_id”,
None))
def set_locale(self, scenario_id=None):
self.Locale_id = self.default_locale_id
如果locale_id:
self.Locale_id = Locale_id
else:
self.Locale_id = get_current_locale().id
def apply_async(self, args=None, kwargs=None, **other_kwargs):
self.inject_scope_args (kwargs)
返回超级(ScopeBasedTask,自我).apply_async (args =参数,
kwargs = kwargs, * * other_kwargs)
Def __call__(self, *args, **kwargs):
task_rv = 超级(ScopeBasedTask,自我).__call__ (* args,
**kwargs)
返回task_rv
Def inject_scope_args(self, kwargs):
为了自己的利益.scope_args:
如果不是在夸夸其谈:
Kwargs[参数]= getattr(self,参数)
关键线索是默认情况下将当前区域设置作为键值参数传递给调用任务. 如果以特定区域设置作为参数调用任务,那么它将保持不变.
为了测试这个功能,让我们定义一个虚拟任务,类型为 ScopeBasedTask
. 它通过locale ID定位文件,并以JSON形式读取其内容:
File: celery_uncovered /技巧/任务.py
@shared_task(绑定= True,基地= ScopeBasedTask)
Def read_scenario_file_task(self, **kwargs):
Fixture_parts = ["locale ", "sc_%i.json" %
kwargs [' scenario_id ']]
返回read_fixture (* fixture_parts)
现在您需要做的是重复启动芹菜的步骤, 启动shell, 并在不同的场景中测试该任务的执行情况. 固定装置位于 celery_uncovered /技巧/夹具/地区/
directory.
这篇文章旨在从不同的角度探索芹菜. 我在常规示例中演示了芹菜,例如邮件和报告生成,并向一些有趣的利基业务用例分享了技巧. 芹菜建立在数据驱动的哲学之上,您的团队可以通过将其作为系统堆栈的一部分引入来简化他们的工作. 如果您有基本的Python经验,开发基于celery的服务并不十分复杂, 你应该能很快地学会. 默认配置对于大多数用途来说已经足够好了,但是如果需要,它们可以非常灵活.
我们的团队选择使用芹菜作为后台作业和长时间运行任务的编排后端. 我们将它广泛地用于各种用例,本文中只提到了其中的几个用例. 我们每天摄取和分析千兆字节的数据, 但这只是横向扩展技术的开始.
芹菜是Python世界中最流行的后台作业管理器之一. 芹菜与RabbitMQ或Redis等多个消息代理兼容,可以同时充当生产者和消费者.
发布-订阅(或生产者-消费者)模式是计算机系统中的一种分布式消息传递模式,其中发布者通过消息代理广播消息, 订阅者收听这些消息. 两者都可以作为系统的独立组件, 无意识的:既不意识到对方,也不与对方直接交流的.
system是一个精通Python和Java栈的后端开发人员. 他甚至为车辆数据实现了一个可扩展的M2M云平台.
世界级的文章,每周发一次.
世界级的文章,每周发一次.