API Reference
Django Admin
dj_cqrs.admin.CQRSAdminMasterSyncMixin
Mixin that includes a custom action in AdminModel. This action allows synchronizing master's model items from Django Admin page,
sync_items(request, queryset)
This method synchronizes selected items from the Admin Page. It is registered as a custom action in Django Admin
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request |
Request
|
Original request. |
required |
queryset |
Queryset
|
Original queryset. |
required |
_cqrs_sync_queryset(queryset)
This function is used to adjust the QuerySet before sending the sync signal.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
queryset |
Queryset
|
Original queryset. |
required |
Returns:
Type | Description |
---|---|
Queryset
|
Updated queryset. |
Mixins
dj_cqrs.mixins.MasterMixin
Base class for MasterMixin. Users shouldn't use this class directly.
CQRS_FIELDS = ALL_BASIC_FIELDS
class-attribute
instance-attribute
List of fields to include in the CQRS payload. You can also set the fields attribute to the special value 'all' to indicate that all fields in the model should be used.
CQRS_ID = None
class-attribute
instance-attribute
Unique CQRS identifier for all microservices.
CQRS_PRODUCE = True
class-attribute
instance-attribute
If false, no cqrs data is sent through the transport.
CQRS_SERIALIZER = None
class-attribute
instance-attribute
Optional serializer used to create the instance representation.
Must be expressed as a module dotted path string like
mymodule.serializers.MasterModelSerializer
.
CQRS_TRACKED_FIELDS = None
class-attribute
instance-attribute
List of fields of the main model for which you want to track the changes and send the previous values via transport. You can also set the field attribute to the special value "all" to indicate that all fields in the model must be used.
cqrs = MasterManager()
class-attribute
instance-attribute
Manager that adds needed CQRS queryset methods.
cqrs_saves_count
property
Shows how many times this instance has been saved within the transaction.
is_initial_cqrs_save
property
This flag is used to check if instance has already been registered for CQRS update.
call_post_bulk_create(instances: list, using = None)
classmethod
Post bulk create signal caller (django doesn't support it by default).
# Used automatically by cqrs.bulk_create()
instances = model.cqrs.bulk_create(instances)
call_post_update(instances, using = None)
classmethod
Post bulk update signal caller (django doesn't support it by default).
# Used automatically by cqrs.bulk_update()
qs = model.objects.filter(k1=v1)
model.cqrs.bulk_update(qs, k2=v2)
cqrs_sync(using: str = None, queue: str = None) -> bool
Manual instance synchronization.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
using |
str
|
The using argument can be used to force the database to use, defaults to None. |
None
|
queue |
str
|
Syncing can be executed just for a single queue, defaults to None (all queues). |
None
|
Returns:
Type | Description |
---|---|
bool
|
True if instance can be synced, False otherwise. |
get_cqrs_meta(**kwargs: dict) -> dict
This method can be overridden to collect model/instance specific metadata.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
kwargs |
dict
|
Signal type, payload data, etc. |
{}
|
Returns:
Type | Description |
---|---|
dict
|
Metadata dictionary if it's provided. |
get_custom_cqrs_delete_data()
This method should be overridden when additional data is needed in DELETE payload.
get_tracked_fields_data() -> dict
CQRS serialization for tracked fields to include in the transport payload.
Returns:
Type | Description |
---|---|
dict
|
Previous values for tracked fields. |
is_sync_instance() -> bool
This method can be overridden to apply syncing only to instances by some rules. For example, only objects with special status or after some creation date, etc.
Returns:
Type | Description |
---|---|
bool
|
True if this instance needs to be synced, False otherwise. |
relate_cqrs_serialization(queryset)
classmethod
This method shoud be overriden to optimize database access
for example using select_related
and prefetch_related
when related models must be included into the master model
representation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
queryset |
django.db.models.QuerySet
|
The initial queryset. |
required |
Returns:
Type | Description |
---|---|
django.db.models.QuerySet
|
The optimized queryset. |
reset_cqrs_saves_count()
This method is used to automatically reset instance CQRS counters on transaction commit. But this can also be used to control custom behaviour within transaction or in case of rollback, when several sequential transactions are used to change the same instance.
to_cqrs_dict(using: str = None, sync: bool = False) -> dict
CQRS serialization for transport payload.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
using |
str
|
The using argument can be used to force the database to use, defaults to None. |
None
|
sync |
bool
|
optional |
False
|
Returns:
Type | Description |
---|---|
dict
|
The serialized instance data. |
Mixin for the master CQRS model, that will send data updates to it's replicas.
dj_cqrs.mixins.ReplicaMixin
get_cqrs_retry_delay(current_retry: int) -> int
staticmethod
Returns number of seconds to wait before requeuing the message.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
current_retry |
int
|
Current number of message retries. |
required |
Returns:
Type | Description |
---|---|
int
|
Delay in seconds. |
should_retry_cqrs(current_retry: int, exception = None) -> bool
staticmethod
Checks if we should retry the message after current attempt.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
current_retry |
int
|
Current number of message retries. |
required |
exception |
Exception
|
Exception instance raised during message consume. |
None
|
Returns:
Type | Description |
---|---|
bool
|
True if message should be retried, False otherwise. |
Mixin for the replica CQRS model, that will receive data updates from master. Models, using this mixin should be readonly, but this is not enforced (f.e. for admin).
CQRS_CUSTOM_SERIALIZATION = False
class-attribute
instance-attribute
Set it to True to skip default data check.
CQRS_ID = None
class-attribute
instance-attribute
Unique CQRS identifier for all microservices.
CQRS_MAPPING = None
class-attribute
instance-attribute
Mapping of master data field name to replica model field name.
CQRS_META = False
class-attribute
instance-attribute
Set it to True to receive meta data for this model.
CQRS_NO_DB_OPERATIONS = False
class-attribute
instance-attribute
Set it to True to disable any default DB operations for this model.
CQRS_ONLY_DIRECT_SYNCS = False
class-attribute
instance-attribute
Set it to True to ignore broadcast sync packages and to receive only direct queue syncs.
CQRS_SELECT_FOR_UPDATE = False
class-attribute
instance-attribute
Set it to True to acquire lock on instance creation/update.
cqrs = ReplicaManager()
class-attribute
instance-attribute
Manager that adds needed CQRS queryset methods.
cqrs_create(sync: bool, mapped_data: dict, previous_data: dict = None, meta: dict = None)
classmethod
This method creates model instance from CQRS mapped instance data. It must be overridden by replicas of master models with custom serialization.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sync |
dict
|
Sync package flag. |
required |
mapped_data |
dict
|
CQRS mapped instance data. |
required |
previous_data |
dict
|
Previous mapped values for tracked fields. |
None
|
meta |
dict
|
Payload metadata, if exists. |
None
|
Returns:
Type | Description |
---|---|
django.db.models.Model
|
Model instance. |
cqrs_delete(master_data: dict, meta: dict = None) -> bool
classmethod
This method deletes model instance from mapped CQRS master instance data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
master_data |
dict
|
CQRS master instance data. |
required |
meta |
dict
|
Payload metadata, if exists. |
None
|
Returns:
Type | Description |
---|---|
bool
|
Flag, if delete operation is successful (even if nothing was deleted). |
cqrs_save(master_data: dict, previous_data: dict = None, sync: bool = False, meta: dict = None)
classmethod
This method saves (creates or updates) model instance from CQRS master instance data. This method must not be overridden. Otherwise, sync checks need to be implemented manually.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
master_data |
dict
|
CQRS master instance data. |
required |
previous_data |
dict
|
Previous values for tracked fields. |
None
|
sync |
bool
|
Sync package flag. |
False
|
meta |
dict
|
Payload metadata, if exists. |
None
|
Returns:
Type | Description |
---|---|
django.db.models.Model
|
Model instance. |
cqrs_update(sync: bool, mapped_data: dict, previous_data: dict = None, meta: dict = None)
This method updates model instance from CQRS mapped instance data. It must be overridden by replicas of master models with custom serialization.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sync |
dict
|
Sync package flag. |
required |
mapped_data |
dict
|
CQRS mapped instance data. |
required |
previous_data |
dict
|
Previous mapped values for tracked fields. |
None
|
meta |
dict
|
Payload metadata, if exists. |
None
|
Returns:
Type | Description |
---|---|
django.db.models.Model
|
Model instance. |
Managers
dj_cqrs.managers.MasterManager
bulk_create(objs, **kwargs)
Custom bulk create method to support sending of create signals. This can be used only in cases, when IDs are generated on client or DB returns IDs.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
objs |
List[django.db.models.Model]
|
List of objects for creation. |
required |
kwargs |
dict
|
Bulk create kwargs. |
{}
|
bulk_update(queryset, **kwargs)
Custom update method to support sending of update signals.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
queryset |
django.db.models.QuerySet
|
Django Queryset (f.e. filter). |
required |
kwargs |
dict
|
Update kwargs. |
{}
|
dj_cqrs.managers.ReplicaManager
create_instance(mapped_data: dict, previous_data: dict = None, sync: bool = False, meta: dict = None)
This method creates model instance from mapped CQRS master instance data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
mapped_data |
dict
|
Mapped CQRS master instance data. |
required |
previous_data |
dict
|
Previous values for tracked fields. |
None
|
sync |
bool
|
Sync package flag. |
False
|
meta |
dict
|
Payload metadata, if exists. |
None
|
Returns:
Type | Description |
---|---|
django.db.models.Model
|
ReplicaMixin instance. |
delete_instance(master_data: dict) -> bool
This method deletes model instance from mapped CQRS master instance data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
master_data |
dict
|
CQRS master instance data. |
required |
Returns:
Type | Description |
---|---|
bool
|
Flag, if delete operation is successful (even if nothing was deleted). |
save_instance(master_data: dict, previous_data: dict = None, sync: bool = False, meta: dict = None)
This method saves (creates or updates) model instance from CQRS master instance data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
master_data |
dict
|
CQRS master instance data. |
required |
previous_data |
dict
|
Previous values for tracked fields. |
None
|
sync |
bool
|
Sync package flag. |
False
|
meta |
dict
|
Payload metadata, if exists. |
None
|
Returns:
Type | Description |
---|---|
django.db.models.Model
|
Model instance. |
update_instance(instance, mapped_data: dict, previous_data: dict = None, sync: bool = False, meta: dict = None)
This method updates model instance from mapped CQRS master instance data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
instance |
django.db.models.Model
|
ReplicaMixin model instance. |
required |
mapped_data |
dict
|
Mapped CQRS master instance data. |
required |
previous_data |
dict
|
Previous values for tracked fields. |
None
|
sync |
bool
|
Sync package flag. |
False
|
meta |
dict
|
Payload metadata, if exists. |
None
|
Returns:
Type | Description |
---|---|
django.db.models.Model
|
ReplicaMixin instance. |
Signals
dj_cqrs.signals
post_bulk_create = Signal()
module-attribute
Signal sent after a bulk create. See dj_cqrs.mixins.RawMasterMixin.call_post_bulk_create.
post_update = Signal()
module-attribute
Signal sent after a bulk update. See dj_cqrs.mixins.RawMasterMixin.call_post_update.
dj_cqrs.signals.MasterSignals
Signals registry and handlers for CQRS master models.
post_bulk_create(sender, **kwargs)
classmethod
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sender |
dj_cqrs.mixins.MasterMixin
|
Class or instance inherited from CQRS MasterMixin. |
required |
post_bulk_update(sender, **kwargs)
classmethod
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sender |
dj_cqrs.mixins.MasterMixin
|
Class or instance inherited from CQRS MasterMixin. |
required |
post_delete(sender, **kwargs)
classmethod
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sender |
dj_cqrs.mixins.MasterMixin
|
Class or instance inherited from CQRS MasterMixin. |
required |
post_save(sender, **kwargs)
classmethod
Parameters:
Name | Type | Description | Default |
---|---|---|---|
sender |
dj_cqrs.mixins.MasterMixin
|
Class or instance inherited from CQRS MasterMixin. |
required |
register_model(model_cls)
classmethod
Registers signals for a model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
model_cls |
dj_cqrs.mixins.MasterMixin
|
Model class inherited from CQRS MasterMixin. |
required |
Transports
dj_cqrs.transport.RabbitMQTransport
Transport class for RabbitMQ.
clean_connection()
classmethod
Clean the RabbitMQ connection.
consume(cqrs_ids = None)
classmethod
Receive data from master model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cqrs_ids |
str
|
cqrs ids. |
None
|
produce(payload)
classmethod
Send data from master model to replicas.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
payload |
dj_cqrs.dataclasses.TransportPayload
|
Transport payload from master model. |
required |
dj_cqrs.transport.KombuTransport
Transport class for Kombu.
clean_connection()
classmethod
Nothing to do here
consume(cqrs_ids = None)
classmethod
Receive data from master model.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cqrs_ids |
str
|
cqrs ids. |
None
|
produce(payload)
classmethod
Send data from master model to replicas.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
payload |
dj_cqrs.dataclasses.TransportPayload
|
Transport payload from master model. |
required |
dj_cqrs.constants.SignalType
Type of signal that generates this event.
DELETE = 'DELETE'
class-attribute
instance-attribute
The master model has been deleted.
SAVE = 'SAVE'
class-attribute
instance-attribute
The master model has been saved.
SYNC = 'SYNC'
class-attribute
instance-attribute
The master model needs syncronization.
dj_cqrs.dataclasses.TransportPayload
Transport message payload.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
signal_type |
dj_cqrs.constants.SignalType
|
Type of the signal for this message. |
required |
cqrs_id |
str
|
The unique CQRS identifier of the model. |
required |
instance_data |
dict
|
Serialized data of the instance that generates the event. |
required |
instance_pk |
str
|
Primary key of the instance. |
required |
queue |
str
|
Queue to synchronize, defaults to None. |
None
|
previous_data |
dict
|
Previous values for fields tracked for changes, defaults to None. |
None
|
correlation_id |
str
|
Correlation ID of process, where this payload is used. |
None
|
retries |
int
|
Current number of message retries. |
0
|
expires |
datetime
|
Message expiration datetime, infinite if None |
None
|
meta |
dict
|
Payload metadata |
None
|
from_message(dct)
classmethod
Builds payload from message data.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
dct |
dict
|
Deserialized message body data. |
required |
Returns:
Type | Description |
---|---|
TransportPayload
|
TransportPayload instance. |
is_expired()
Checks if this payload is expired.
Returns:
Type | Description |
---|---|
bool
|
True if payload is expired, False otherwise. |
to_dict() -> dict
Return the payload as a dictionary.
Returns:
Type | Description |
---|---|
dict
|
This payload. |
Registries
dj_cqrs.registries.MasterRegistry
register_model(model_cls)
classmethod
Registration of CQRS model identifiers.
get_model_by_cqrs_id(cqrs_id)
classmethod
Returns the model class given its CQRS_ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cqrs_id |
str
|
The CQRS_ID of the model to be retrieved. |
required |
Returns:
Type | Description |
---|---|
django.db.models.Model
|
The model that correspond to the given CQRS_ID or None if it has not been registered. |
dj_cqrs.registries.ReplicaRegistry
register_model(model_cls)
classmethod
Registration of CQRS model identifiers.
get_model_by_cqrs_id(cqrs_id)
classmethod
Returns the model class given its CQRS_ID.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
cqrs_id |
str
|
The CQRS_ID of the model to be retrieved. |
required |
Returns:
Type | Description |
---|---|
django.db.models.Model
|
The model that correspond to the given CQRS_ID or None if it has not been registered. |