Messaging system in Caliopen
Caliopen relies on nats messaging system to communicate between services.
Most of the time, messages sent on nats are small json documents emitted by a service to trigger an action by another service. As a matter of fact, Caliopen needs a nats server up and running to work properly.
All nats modes may be used by services : asynchronous/synchronous, publish/subscribe, request/reply, etc. depending on use case.
List of services making use of nats system
- apiv2 : listen & emit
- mq-worker : listen only
- identities-worker : listen & emit
- smtp-bridge : listen & emit
- imap-bridge : listen & emit
- twitter-bridge : listen & emit
apiv2
function Notifications.SendEmailAdminToUser
:
- role: emitter
- mode:
request/reply
- topic:
outboundSMTP
- orders emitted:
"deliver"
- payload:
{
"message_id": "xxxxxxx",
"order": "deliver",
"remote_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
function REST.SendDraft
:
- role: emitter
- mode:
request/reply
- topics:
outboundSMTP
,outboundIMAP
,twitter_dm
- orders emitted:
"deliver"
- payload:
{
"message_id": "xxxxxxx",
"order": "deliver",
"remote_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
function REST.UpdateContact
- role: emitter
- mode:
publish/subscribe
- topics:
contactAction
- orders emitted:
"contact_update"
- payload:
{
"order": "contact_update",
"contact_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
function REST.launchKeyDiscovery
- role: emitter
- mode:
publish/subscribe
- topics:
keyAction
- orders emitted:
"discover_key"
- payload:
{
"order": "discover_key",
"contact_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
function REST.CreateContact
- role: emitter
- mode:
publish/subscribe
- topics:
contactAction
- orders emitted:
"contact_update"
- payload:
{
"order": "contact_update",
"contact_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
function REST.CreatePGPPubKey
- role: emitter
- mode:
publish/subscribe
- topics:
keyAction
- orders emitted:
"publish_key"
- payload:
{
"key_id": "xxxxxxx",
"order": "publish_key",
"resource_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
function REST.DeletePubKey
- role: emitter
- mode:
publish/subscribe
- topics:
keyAction
- orders emitted:
"delete_key"
- payload:
{
"key_id": "xxxxxxx",
"order": "delete_key",
"resource_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
function REST.CreateUserIdentity
- role: emitter
- mode:
publish/subscribe
- topics:
identitiesWorker
- orders emitted:
"add_identity"
- payload:
{
"identity_id": "xxxxxxx",
"order": "add_identity",
"user_id": "xxxxxxx"
}
function REST.PatchUserIdentity
- role: emitter
- mode:
publish/subscribe
- topics:
identitiesWorker
- orders emitted:
"update_identity"
- payload:
{
"identity_id": "xxxxxxx",
"order": "update_identity",
"user_id": "xxxxxxx"
}
function REST.DeleteUserIdentity
- role: emitter
- mode:
publish/subscribe
- topics:
identitiesWorker
- orders emitted:
"delete_identity"
- payload:
{
"identity_id": "xxxxxxx",
"order": "delete_identity",
"user_id": "xxxxxxx"
}
twitter-bridge
function WorkerMsgHandler
:
- role: subscriber
- mode:
publish/subscribe
- topic:
twitter_worker
- queue:
twitterworkers
- orders handled :
"add_worker"
,"reload_worker"
,"remove_worker"
- payload:
{
"order": "xxxxxxx",
"remote_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
function DMmsgHandler
:
- role: subscriber/emitter
- mode:
publish/subscribe
- topic:
twitter_dm
- queue:
twitterworkers
- orders handled:
"sync"
,"deliver"
- payload:
{
"message_id": "xxxxxxx",
"order": "xxxxxxx",
"remote_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
- topic:
identitiesWorker
- orders emitted:
"update_interval"
- payload:
{
"identity_id": "xxxxxxx",
"order": "update_interval",
"poll_intervall": "xxxxxxx",
"protocol": "twitter",
"user_id": "xxxxxxx"
}
function processInDM
:
- role: emitter
- mode :
request/reply
- topic :
inboundTwitter
- orders emitted:
"process_inbound"
- payload :
{
"message_id": "xxxxxxx",
"order": "process_inbound",
"remote_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
imap-bridge
function natsMsgHandler
:
- role: subscriber/emitter
- mode:
publish/subscribe
- topic:
IMAPfetcher
- queue:
IMAPworkers
- orders handled:
"sync"
,"fullfetch"
- payload:
{
"order": "xxxxxxx",
"remote_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
- topic:
identitiesWorker
- orders emitted:
"update_interval"
- payload:
{
"identity_id": "xxxxxxx",
"order": "update_interval",
"poll_intervall": "xxxxxxx",
"protocol": "imap",
"user_id": "xxxxxxx"
}
function natsMsgHandler
:
- role: subscriber
- mode:
request/reply
- topic:
outboundIMAP
- queue:
IMAPworkers
- orders handled:
"deliver"
- payload:
{
"message_id": "xxxxxxx",
"order": "deliver",
"remote_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
function WorkerMsgHandler
:
- role: subscriber
- mode:
publish/subscribe
- topic:
imap_worker
- queue:
IMAPworkers
- orders handled :
"add_worker"
,"reload_worker"
,"remove_worker"
- payload:
{
"order": "xxxxxxx",
"remote_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
smtp-bridge
function natsMsgHandler
:
- role: subscriber
- mode:
request/reply
- topic:
outboundSMTP
- queue:
SMTPqueue
- orders handled:
"deliver"
- payload:
{
"message_id": "xxxxxxx",
"order": "deliver",
"remote_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
function processInbound
:
- role: emitter
- mode:
request/reply
- topic:
inboundSMTP
- orders emitted:
"process_inbound"
- payload:
{
"message_id": "xxxxxxx",
"order": "process_inbound",
"remote_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
identities-worker
function imapJob.Run
:
- role: emitter
- mode:
publish/subscribe
- topic:
IMAPfetcher
- orders emitted:
"sync"
- payload:
{
"order": "sync",
"remote_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
function twitterJob.Run
:
- role: emitter
- mode:
publish/subscribe
- topic:
twitter_dm
- orders emitted:
"sync"
- payload:
{
"order": "sync",
"remote_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
function natsOrdersHandler
:
- role: subscriber
- mode:
publish/subscribe
- topic:
identitiesWorker
- queue:
IDsworker
- orders handled:
"update_interval"
,"add_identity"
,"update_identity"
,"delete_identity"
- payload:
{
"identity_id": "xxxxxxx",
"order": "xxxxxxx",
"poll_intervall": "xxxxxxx",
"protocol": "xxxxxxx",
"user_id": "xxxxxxx"
}
mq-worker
class InboundEmail
:
- role: subscriber
- mode:
request/reply
- topic:
inboundSMTP
- queue:
SMTPqueue
- orders handled:
process_inbound
- payload:
{
"message_id": "xxxxxxx",
"order": "process_inbound",
"remote_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
class InboundTwitter
:
- role: subscriber
- mode:
request/reply
- topic:
inboundTwitter
- queue:
Twitterqueue
- orders handled:
process_inbound
- payload:
{
"message_id": "xxxxxxx",
"order": "process_inbound",
"remote_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
class ContactAction
:
- role: subscriber
- mode:
publish/subscribe
- topic:
contactAction
- queue:
contactQueue
- orders handled:
contact_update
- payload:
{
"order": "contact_update",
"contact_id": "xxxxxxx",
"user_id": "xxxxxxx"
}
class KeyAction
:
- role: subscriber
- mode:
publish/subscribe
- topic:
keyAction
- queue:
keyQueue
- orders handled:
discover_key
- payload:
{
"order": "discover_key",
"contact_id": "xxxxxxx",
"user_id": "xxxxxxx"
}