Celery
Monitor Celery Beat and scheduled Celery tasks with Cronping.
Wrap scheduled Celery tasks with a decorator that sends /start, success, and /fail pings.
Decorator
import os
import requests
from functools import wraps
def cronping(func):
@wraps(func)
def wrapper(*args, **kwargs):
ping_url = os.environ.get("CRONPING_URL")
if not ping_url:
return func(*args, **kwargs)
try:
requests.get(f"{ping_url}/start", timeout=5)
except requests.RequestException:
pass
try:
result = func(*args, **kwargs)
requests.get(ping_url, timeout=5)
return result
except Exception as exc:
try:
requests.post(
f"{ping_url}/fail",
data=f"{type(exc).__name__}: {exc}",
timeout=5,
)
except requests.RequestException:
pass
raise
return wrapperScheduled task
from celery import shared_task
from .monitoring import cronping
@shared_task
@cronping
def run_daily_backup():
backup_database()
upload_to_s3()Multiple scheduled tasks
Use one heartbeat URL per critical scheduled task.
@shared_task
def sync_stripe():
ping_url = os.environ["STRIPE_SYNC_CRONPING_URL"]
requests.get(f"{ping_url}/start", timeout=5)
try:
run_sync()
except Exception as exc:
requests.post(f"{ping_url}/fail", data=str(exc), timeout=5)
raise
else:
requests.get(ping_url, timeout=5)Match the Cronping heartbeat schedule to your Celery Beat schedule. If Beat runs in local time, configure the same timezone in Cronping.