xuzhenbao commented on issue #515:
URL: https://github.com/apache/celix/issues/515#issuecomment-2008626293
For the remote event admin, I have the following ideas. First of all, the
remote distribution provider should be pluggable, and its interface is defined
as follows:
~~~
typedef struct celix_event_remote_provider_service {
    void *handle;
    //It is used to distribute the asynchronous event
    celix_status_t (*postEvent)(const char *topic, const celix_properties_t *properties);
    //It is used to distribute the synchronous event
    celix_status_t (*sendEvent)(const char *topic, const celix_properties_t *properties);
}celix_event_remote_provider_service_t;
~~~
The service has the properties
`celix.event.subscriber.endpoint.framework.uuids` and `service.ranking`, and
event admin selects the best `celix_event_remote_provider_service_t` service
for distributing events to remote frameworks based on these properties.
The selection strategy is as follows:
- If the `celix.event.subscriber.endpoint.framework.uuids` sets are the same,
then select the provider with the highest `service.ranking`.
- If both `celix.event.subscriber.endpoint.framework.uuids` and
`service.ranking` are the same, then select the one that was registered first
(the one with the smallest `service.id`).
- If one `celix.event.subscriber.endpoint.framework.uuids` set contains the
other, then select the provider with the containing set.
- If the `celix.event.subscriber.endpoint.framework.uuids` sets only
intersect, then the event properties `$celix.event.source.framework.uuid`
(used to identify the event source framework) and `$celix.event.source.seq`
(used to identify the event sequence) should be set; they are used for event
deduplication.
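The first two rules above amount to an ordering on candidates. Here is a minimal sketch in C, covering only the case where all candidates advertise the same framework UUID set. The `provider_candidate_t` type and the `provider_*` functions are hypothetical names for illustration and are not part of Celix.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical summary of one registered provider; not a Celix type. */
typedef struct provider_candidate {
    long serviceId; /* value of service.id */
    long ranking;   /* value of service.ranking */
} provider_candidate_t;

/* Returns true if a should be preferred over b: higher ranking wins,
 * and on equal ranking the earlier registration (smaller service.id) wins. */
static bool provider_isBetter(const provider_candidate_t *a, const provider_candidate_t *b) {
    if (a->ranking != b->ranking) {
        return a->ranking > b->ranking;
    }
    return a->serviceId < b->serviceId;
}

/* Returns the preferred candidate, or NULL if there are no candidates. */
static const provider_candidate_t *provider_selectBest(const provider_candidate_t *candidates, size_t count) {
    assert(candidates != NULL || count == 0);
    const provider_candidate_t *best = NULL;
    for (size_t i = 0; i < count; ++i) {
        if (best == NULL || provider_isBetter(&candidates[i], best)) {
            best = &candidates[i];
        }
    }
    return best;
}
```

The containment and intersection rules would need a comparison of the UUID sets themselves, which this sketch leaves out.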
In addition, event property names with the "$" prefix are reserved. These
properties carry information specific to the remote distribution provider or
control the API behavior. The properties that control the API behavior include:
- `$celix.event.remote.qos`
  - QOS behavior definition: consistent with MQTT. In addition, if the
remote distribution provider does not support the corresponding QOS level, it
can choose to degrade and record a log.
  - QOS scope: Both the published event and the subscriber can set this
property. If the two values conflict, the smaller value is taken.
- `$celix.event.remote.expiryInterval`: the event expiration time. Expired
events are no longer delivered to subscribers. If this value is not set, the
event never expires.
- `$celix.event.remote.enable`: If the event contains this property, the
event will be forwarded to the remote distribution provider. If this property
is not set, the event is only published locally.
Note: The above event properties only take effect for remote events.
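The QOS conflict rule and the expiry rule can be sketched as two small helpers. This is only an illustration: the function names are hypothetical, and the sketch assumes that the expiry interval is given in seconds and that a negative value stands for "not set". The proposal above does not specify either.

```c
#include <assert.h>
#include <stdbool.h>
#include <time.h>

/* Effective QOS when both the event and the subscriber set
 * $celix.event.remote.qos: the smaller value is taken. */
static int event_effectiveQos(int eventQos, int subscriberQos) {
    assert(eventQos >= 0 && eventQos <= 2);
    assert(subscriberQos >= 0 && subscriberQos <= 2);
    return eventQos < subscriberQos ? eventQos : subscriberQos;
}

/* Assumption: expiryIntervalSec is in seconds and a negative value means the
 * property was not set, in which case the event never expires. */
static bool event_isExpired(time_t publishedAt, long expiryIntervalSec, time_t now) {
    if (expiryIntervalSec < 0) {
        return false;
    }
    return difftime(now, publishedAt) >= (double)expiryIntervalSec;
}
```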
In addition, I don't intend to forward framework events to remote frameworks
for now, because I am not sure what forwarding them would mean. If they need to
be forwarded, the property "org.osgi.framework.uuid" should be added to them.
I plan to implement the remote distribution provider in two ways: one based
on RSA and the other based on MQTT. For the implementation based on MQTT, I
plan to use the mosquitto library.
**The implementation based on RSA**
For the implementation based on RSA, I will add the remote interfaces
`celix_event_handler_remote_listener_service_t` and
`celix_event_remote_subscriber_service_t`.
`celix_event_handler_remote_listener_service_t` is used to listen for remote
subscription information, and `celix_event_remote_subscriber_service_t` is used
to forward events to remote subscribers. For example, suppose there are two
processes A and B in the system, where process B needs to subscribe to event E1
and event E1 is published by process A. Then process A needs to provide the
`celix_event_handler_remote_listener_service_t` service, and process B needs to
provide the `celix_event_remote_subscriber_service_t` service. Process B
forwards its subscription information to A by calling the
`celix_event_handler_remote_listener_service_t` service of process A, and
process A forwards E1 to process B by calling the
`celix_event_remote_subscriber_service_t` service of process B. Both
`celix_event_handler_remote_listener_service_t` and
`celix_event_remote_subscriber_service_t` are called remotely.
**Note**: I don't plan to attach the subscription information of process B to
the `celix_event_remote_subscriber_service_t` service properties. If that were
done, process B would need to re-register the
`celix_event_remote_subscriber_service_t` service whenever its subscription
information is updated, and during that period process B might lose some events
published by process A. Therefore, process B forwards its subscription
information to A through the `celix_event_handler_remote_listener_service_t`
of process A.
The definition of `celix_event_handler_remote_listener_service_t` and
`celix_event_remote_subscriber_service_t` is as follows:
~~~
typedef struct celix_event_handler_remote_listener_service {
    void *handle;
    celix_status_t (*handlerAdded)(void* handle, const char* handlerFwUUID, long handlerSvcId, const char* topics, const char* filter, int qos);
    celix_status_t (*handlerRemoved)(void* handle, const char* handlerFwUUID, long handlerSvcId, const char* topics, const char* filter);
}celix_event_handler_remote_listener_service_t;

typedef struct celix_event_remote_subscriber_service {
    void* handle;
    celix_status_t (*receiveEventAsync)(void* handle, const char* topic, const celix_properties_t* properties);
    celix_status_t (*receiveEventSync)(void* handle, const char* topic, const celix_properties_t* properties);
}celix_event_remote_subscriber_service_t;
~~~
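To illustrate why `handlerAdded` avoids re-registering a service, here is a minimal sketch of how process A might keep the subscriptions it receives, updating an existing entry in place. Everything in it is hypothetical: the `remote_handler_*` names, the fixed-size table, and the plain `int` return codes are stand-ins, and the `filter` argument is left out for brevity.

```c
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>

#define MAX_REMOTE_HANDLERS 16 /* hypothetical table size */

/* One remote event handler known to process A; not a Celix type. */
typedef struct remote_handler_entry {
    int used;
    char fwUUID[64];
    long svcId;
    char topics[128];
    int qos;
} remote_handler_entry_t;

typedef struct remote_handler_registry {
    remote_handler_entry_t entries[MAX_REMOTE_HANDLERS];
} remote_handler_registry_t;

/* Finds the entry for (fwUUID, svcId), or returns NULL. */
static remote_handler_entry_t *registry_find(remote_handler_registry_t *reg, const char *fwUUID, long svcId) {
    for (size_t i = 0; i < MAX_REMOTE_HANDLERS; ++i) {
        remote_handler_entry_t *e = &reg->entries[i];
        if (e->used && e->svcId == svcId && strcmp(e->fwUUID, fwUUID) == 0) {
            return e;
        }
    }
    return NULL;
}

/* Mirrors handlerAdded: stores a new subscription or updates an existing one
 * in place. Returns 0 on success, -1 if the table is full or a string is too long. */
static int registry_handlerAdded(remote_handler_registry_t *reg, const char *fwUUID, long svcId,
                                 const char *topics, int qos) {
    assert(reg != NULL && fwUUID != NULL && topics != NULL);
    remote_handler_entry_t *e = registry_find(reg, fwUUID, svcId);
    for (size_t i = 0; e == NULL && i < MAX_REMOTE_HANDLERS; ++i) {
        if (!reg->entries[i].used) {
            e = &reg->entries[i];
        }
    }
    if (e == NULL || strlen(fwUUID) >= sizeof(e->fwUUID) || strlen(topics) >= sizeof(e->topics)) {
        return -1;
    }
    snprintf(e->fwUUID, sizeof(e->fwUUID), "%s", fwUUID);
    snprintf(e->topics, sizeof(e->topics), "%s", topics);
    e->svcId = svcId;
    e->qos = qos;
    e->used = 1;
    return 0;
}

/* Mirrors handlerRemoved: returns 0 on success, -1 if the handler is unknown. */
static int registry_handlerRemoved(remote_handler_registry_t *reg, const char *fwUUID, long svcId) {
    remote_handler_entry_t *e = registry_find(reg, fwUUID, svcId);
    if (e == NULL) {
        return -1;
    }
    e->used = 0;
    return 0;
}
```

Calling `registry_handlerAdded` a second time with the same framework UUID and service id only replaces the stored topics and QOS, so no registration gap occurs on the subscriber side.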
The component relationship diagram is as follows:

Some other key points to consider in the implementation:
- The `celix_properties_t` type will be added to dfi; it is used for the
serialization and deserialization of event properties in remote calls.
- Offline handling strategy for the remote
`celix_event_remote_subscriber_service_t`: If the event QOS value is QOS1 or
QOS2, the event should be delivered to the online services first and then held
until the offline services come online, the event expires, or the session
expires; QOS0 events are discarded directly.
- Backpressure handling: Since events with large payloads are not expected to
be transmitted, we will discard events when the number of cached events reaches
a certain value (QOS0 events are discarded first). In addition, the remote
distribution provider will implement a circuit breaker mechanism for
problematic subscribers (events will not be delivered to them for a period of
time).
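The "QOS0 events are discarded first" rule could look roughly like the bounded cache below. This is a sketch under assumptions: the capacity, the type names, and the decision to reject a new event when the cache holds only QOS1/QOS2 events are illustrative choices, not something the proposal fixes.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

#define EVENT_CACHE_CAPACITY 4 /* hypothetical limit, kept small for illustration */

typedef struct cached_event {
    int id;  /* stand-in for the real event (topic and properties) */
    int qos; /* 0, 1 or 2 */
} cached_event_t;

typedef struct event_cache {
    cached_event_t events[EVENT_CACHE_CAPACITY]; /* oldest first */
    size_t size;
} event_cache_t;

/* Drops the oldest cached QOS0 event; returns false if there is none. */
static bool eventCache_dropOldestQos0(event_cache_t *cache) {
    for (size_t i = 0; i < cache->size; ++i) {
        if (cache->events[i].qos == 0) {
            for (size_t j = i + 1; j < cache->size; ++j) {
                cache->events[j - 1] = cache->events[j];
            }
            cache->size--;
            return true;
        }
    }
    return false;
}

/* Caches an event; returns false if the event had to be discarded. */
static bool eventCache_add(event_cache_t *cache, int id, int qos) {
    assert(cache != NULL);
    if (cache->size == EVENT_CACHE_CAPACITY && !eventCache_dropOldestQos0(cache)) {
        return false; /* only QOS1/QOS2 events are cached: discard the new event */
    }
    cache->events[cache->size].id = id;
    cache->events[cache->size].qos = qos;
    cache->size++;
    return true;
}
```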
**The implementation based on MQTT**
For the implementation based on MQTT, I will add the remote interface
`celix_mqtt_broker_info_service_t`, which is used to obtain the address
information of the MQTT broker. One of the remote distribution providers
obtains the address information of the MQTT broker by parsing the
mosquitto.conf file and then registers the remote service
`celix_mqtt_broker_info_service_t`; the other remote distribution providers
obtain the address information from `celix_mqtt_broker_info_service_t`.
The definition of `celix_mqtt_broker_info_service_t` is as follows:
~~~
#define CELIX_MQTT_BROKER_INFO_SERVICE_NAME "celix_mqtt_broker_info_service"
//The address of the MQTT broker
#define CELIX_MQTT_BROKER_ADDRESSES "celix.mqtt.broker.addresses"
//The port of the MQTT broker
#define CELIX_MQTT_BROKER_PORT "celix.mqtt.broker.port"
typedef struct celix_mqtt_broker_info_service {
void *handle;
}celix_mqtt_broker_info_service_t;
~~~
The component relationship diagram is as follows:

Some other key points to consider in the implementation:
- Each provider publishes its own subscription information to the broker
through the message `celix/mqtt/subscriptions/<frameworkUUID>`, and obtains
remote subscription information by subscribing to the message
`celix/mqtt/subscriptions/+` (`+` is the MQTT single-level wildcard). The
provider should then filter events based on the remote subscription information
before publishing them to the broker.
- Network reconnection is handled automatically by the mosquitto library.
- Use the request/response mechanism of MQTT to implement synchronous
semantics.
- For synchronous events, consider using a circuit breaker mechanism (no
longer deliver synchronous events to the corresponding framework for a period
of time). For asynchronous events, network exceptions are handled automatically
by the mosquitto library.
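The subscription topic layout described above can be handled with plain string helpers. Here is a minimal sketch that builds the per-framework topic and recovers the framework UUID from a received topic. The function names are hypothetical, and no mosquitto calls are shown.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

#define SUBSCRIPTION_TOPIC_PREFIX "celix/mqtt/subscriptions/"

/* Writes "celix/mqtt/subscriptions/<fwUUID>" into buf.
 * Returns 0 on success, -1 if buf is too small. */
static int subscriptionTopic_build(const char *fwUUID, char *buf, size_t bufSize) {
    assert(fwUUID != NULL && buf != NULL);
    int n = snprintf(buf, bufSize, "%s%s", SUBSCRIPTION_TOPIC_PREFIX, fwUUID);
    return (n >= 0 && (size_t)n < bufSize) ? 0 : -1;
}

/* Returns a pointer to the framework UUID inside topic, or NULL if topic is
 * not a subscription topic with exactly one level after the prefix. */
static const char *subscriptionTopic_fwUUID(const char *topic) {
    size_t prefixLen = strlen(SUBSCRIPTION_TOPIC_PREFIX);
    if (strncmp(topic, SUBSCRIPTION_TOPIC_PREFIX, prefixLen) != 0) {
        return NULL;
    }
    const char *uuid = topic + prefixLen;
    if (*uuid == '\0' || strchr(uuid, '/') != NULL) {
        return NULL;
    }
    return uuid;
}
```

A provider could use the second helper to skip its own subscription message by comparing the extracted UUID with its own framework UUID.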