通过 Celery 进行异步查询
Celery
在大型分析数据库中,运行执行时间为分钟或小时的查询很常见。为了支持执行时间超过典型 Web 请求超时时间(30-60 秒)的长运行查询,有必要为 Superset 配置一个异步后端,该后端由以下部分组成:
- 一个或多个 Superset 工作器(实现为 Celery 工作器),可以使用
celery worker
命令启动,运行celery worker --help
查看相关选项。 - 一个 Celery 代理(消息队列),我们建议使用 Redis 或 RabbitMQ
- 一个结果后端,用于定义工作器将查询结果持久化的位置
配置 Celery 需要在 superset_config.py
中定义一个 CELERY_CONFIG
。工作器和 Web 服务器进程都应该具有相同的配置。
class CeleryConfig(object):
broker_url = "redis://127.0.0.1:6379/0"
imports = (
"superset.sql_lab",
"superset.tasks.scheduler",
)
result_backend = "redis://127.0.0.1:6379/0"
worker_prefetch_multiplier = 10
task_acks_late = True
task_annotations = {
"sql_lab.get_sql_results": {
"rate_limit": "100/s",
},
}
CELERY_CONFIG = CeleryConfig
要启动一个 Celery 工作器以利用配置,请运行以下命令
celery --app=superset.tasks.celery_app:app worker --pool=prefork -O fair -c 4
要启动一个调度定期后台作业的作业,请运行以下命令
celery --app=superset.tasks.celery_app:app beat
要设置结果后端,您需要将 from flask_caching.backends.base import BaseCache 的派生类的实例传递给 superset_config.py
中的 RESULTS_BACKEND 配置键。您可以使用 Memcached、Redis、S3 (https://pypi.python.org/pypi/s3werkzeugcache)、内存或文件系统(在单服务器类型设置中或用于测试),或编写自己的缓存接口。您的 superset_config.py
可能看起来像这样
# On S3
from s3cache.s3cache import S3Cache
S3_CACHE_BUCKET = 'foobar-superset'
S3_CACHE_KEY_PREFIX = 'sql_lab_result'
RESULTS_BACKEND = S3Cache(S3_CACHE_BUCKET, S3_CACHE_KEY_PREFIX)
# On Redis
from flask_caching.backends.rediscache import RedisCache
RESULTS_BACKEND = RedisCache(
host='localhost', port=6379, key_prefix='superset_results')
为了提高性能,现在使用 MessagePack 和 PyArrow 进行结果序列化。如果出现任何问题,可以通过在 superset_config.py
中设置 RESULTS_BACKEND_USE_MSGPACK = False
来禁用它。升级现有环境时,请清除您现有的结果缓存存储。
重要说明
-
Superset 集群中的所有工作器节点和 Web 服务器必须共享一个公共元数据数据库。这意味着 SQLite 在这种情况下不起作用,因为它对并发性的支持有限,并且通常位于本地文件系统上。
-
在您的整个设置中只能有一个 celery beat 实例在运行。如果不是,后台作业可能会被多次调度,从而导致奇怪的行为,例如报告的重复交付、比预期更高的负载/流量等。
-
如果您在数据库设置中启用了异步查询执行(来源 > 数据库 > 编辑记录),SQL Lab 只会异步运行您的查询。
Celery Flower
Flower 是一个用于监视 Celery 集群的基于 Web 的工具,您可以从 pip 中安装它
pip install flower
您可以使用以下命令运行 flower
celery --app=superset.tasks.celery_app:app flower