cronping

Celery

Monitor Celery Beat and scheduled Celery tasks with Cronping.

Wrap scheduled Celery tasks with a decorator that sends /start, success, and /fail pings.

Decorator

import os
import requests
from functools import wraps


def cronping(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        ping_url = os.environ.get("CRONPING_URL")
        if not ping_url:
            return func(*args, **kwargs)

        try:
            requests.get(f"{ping_url}/start", timeout=5)
        except requests.RequestException:
            pass

        try:
            result = func(*args, **kwargs)
            requests.get(ping_url, timeout=5)
            return result
        except Exception as exc:
            try:
                requests.post(
                    f"{ping_url}/fail",
                    data=f"{type(exc).__name__}: {exc}",
                    timeout=5,
                )
            except requests.RequestException:
                pass
            raise

    return wrapper

Scheduled task

from celery import shared_task
from .monitoring import cronping


@shared_task
@cronping
def run_daily_backup():
    backup_database()
    upload_to_s3()

Multiple scheduled tasks

Use one heartbeat URL per critical scheduled task.

@shared_task
def sync_stripe():
    ping_url = os.environ["STRIPE_SYNC_CRONPING_URL"]
    requests.get(f"{ping_url}/start", timeout=5)
    try:
        run_sync()
    except Exception as exc:
        requests.post(f"{ping_url}/fail", data=str(exc), timeout=5)
        raise
    else:
        requests.get(ping_url, timeout=5)

Match the Cronping heartbeat schedule to your Celery Beat schedule. If Beat runs in local time, configure the same timezone in Cronping.

On this page