Skip to content

Message lifecycle

Warning

Expiration, retrying and 'dead letters' queueing supported in RabbitMQTransport only (on by default).

django-cqrs since version 1.11 provides mechanism for reliable message delivery.

Message lifecycle

Expiration

Name Default Description
CQRS_MESSAGE_TTL 86400 Limits message lifetime in seconds, then it will be moved to 'dead letters' queue.
# settings.py

CQRS = {
    ...
    'master': {
        'CQRS_MESSAGE_TTL': 86400, # 1 day
    },
}

Fail

Message assumed as failed when a consumer raises an exception or returns negative boolean value (False, None, etc).

# models.py

class Example(ReplicaMixin, models.Model):
    CQRS_ID = 'example'
    ...

    @classmethod
    def cqrs_create(cls, sync, mapped_data, previous_data=None, meta=None):
        raise Exception("Some issue during create") # exception could be caught at should_retry_cqrs() method

    def cqrs_update(self, sync, mapped_data, previous_data=None, meta=None):
        return None # returning negative boolean for retrying

Retrying

Name Default Description
CQRS_MAX_RETRIES 30 Maximum number of retry attempts. Infinite if None, 0 to disable retries.
CQRS_RETRY_DELAY 2 Constant delay in seconds between message failure and requeueing.
CQRS_DELAY_QUEUE_MAX_SIZE 1000 Maximum number of delayed messages per worker. Infinite if None.
# settings.py

CQRS = {
    ...
    'replica': {
        'CQRS_MAX_RETRIES': 30, # attempts
        'CQRS_RETRY_DELAY': 2,  # seconds
        'CQRS_DELAY_QUEUE_MAX_SIZE': 1000,
    },
}

Customization

The dj_cqrs.mixins.ReplicaMixin allows to take full control on retrying.

# models.py

class Example(ReplicaMixin, models.Model):
    CQRS_ID = 'example'
    ...

    @classmethod
    def get_cqrs_retry_delay(cls, current_retry=0):
        # Linear delay growth
        return (current_retry + 1) * 60

    @classmethod
    def should_retry_cqrs(cls, current_retry, exception=None):
        # Retry 10 times or until we have troubles with database
        return (
            current_retry < 10
            or isinstance(exception, django.db.OperationalError)
        )

Dead letters

Expired or failed messages which should not be retried are moved to 'dead letters' queue.

Name Default Description
dead_letter_queue \'dead_letter_\' + queue Queue name for dead letters.
dead_message_ttl 864000 Expiration seconds. Infinite if None.
# settings.py

CQRS = {
    ...
    'queue': 'example',
    'replica': {
        ...
        'dead_letter_queue': 'dead_letter_example', # generated from CQRS.queue
        'dead_message_ttl': 864000, # 10 days
    },
}

Commands

Dump

Dumps all dead letters to stdout.

$ python manage.py cqrs_dead_letters dump
{"signal_type":"SAVE","cqrs_id":"example","instance_data":{"id":1,"cqrs_revision":0,"cqrs_updated":"2021-04-30 11:50:05.164341+00:00"},"previous_data":null,"instance_pk":135,"correlation_id":null,"retries":30,"expires":"2021-05-01T11:50:00+00:00"}

Retry

Retry all dead letters. Message body retries and expires fields are downgraded.

$ python manage.py cqrs_dead_letters retry
Total dead letters: 1
Retrying: 1/1
{"signal_type":"SAVE","cqrs_id":"example","instance_data":{"id":1,"cqrs_revision":0,"cqrs_updated":"2021-04-30 11:50:05.164341+00:00"},"previous_data":null,"instance_pk":135,"correlation_id":null,"retries":0,"expires":"2021-05-02T12:30:00+00:00"}

Purge

Removes all dead letters.

$ python manage.py cqrs_dead_letters purge
Total dead letters: 1
Purged