PengZheng commented on a change in pull request #389:
URL: https://github.com/apache/celix/pull/389#discussion_r830771359



##########
File path: bundles/pubsub/pubsub_admin_udp/src/pubsub_udp_topic_sender.c
##########
@@ -0,0 +1,516 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include <pubsub_serializer.h>
+#include <stdlib.h>
+#include <stdint.h>
+#include <memory.h>
+#include <pubsub_constants.h>
+#include <pubsub/publisher.h>
+#include <utils.h>
+#include <zconf.h>
+#include <arpa/inet.h>
+#include <celix_log_helper.h>
+#include "pubsub_psa_udp_constants.h"
+#include "pubsub_udp_topic_sender.h"
+#include "pubsub_skt_handler.h"
+#include <uuid/uuid.h>
+#include "celix_constants.h"
+#include <pubsub_utils.h>
+#include "pubsub_interceptors_handler.h"
+
+#define UDP_BIND_MAX_RETRY                      10
+// Max message size is 64k - 8byte UDP header - 20byte IP header
+#define UDP_MAX_MSG_SIZE (((64 * 1024) - 1) - 28)
+
+#define L_DEBUG(...) \
+    celix_logHelper_log(sender->logHelper, CELIX_LOG_LEVEL_DEBUG, __VA_ARGS__)
+#define L_INFO(...) \
+    celix_logHelper_log(sender->logHelper, CELIX_LOG_LEVEL_INFO, __VA_ARGS__)
+#define L_WARN(...) \
+    celix_logHelper_log(sender->logHelper, CELIX_LOG_LEVEL_WARNING, 
__VA_ARGS__)
+#define L_ERROR(...) \
+    celix_logHelper_log(sender->logHelper, CELIX_LOG_LEVEL_ERROR, __VA_ARGS__)
+
+struct pubsub_udp_topic_sender {
+    celix_bundle_context_t *ctx;
+    celix_log_helper_t *logHelper;
+    long protocolSvcId;
+    pubsub_protocol_service_t *protocol;
+    uuid_t fwUUID;
+    pubsub_sktHandler_t *socketHandler;
+    pubsub_sktHandler_t *sharedSocketHandler;
+    pubsub_interceptors_handler_t *interceptorsHandler;
+    pubsub_serializer_handler_t* serializerHandler;
+
+    void *admin;
+    char *scope;
+    char *topic;
+    char *url;
+    bool isStatic;
+    bool isPassive;
+    bool verbose;
+    unsigned long send_delay;
+    int seqNr; //atomic
+
+    struct {
+        long svcId;
+        celix_service_factory_t factory;
+    } publisher;
+
+    struct {
+        celix_thread_mutex_t mutex;
+        hash_map_t *map;  //key = bndId, value = 
psa_udp_bounded_service_entry_t
+    } boundedServices;
+};
+
+typedef struct psa_udp_bounded_service_entry {
+    pubsub_udp_topic_sender_t *parent;
+    pubsub_publisher_t service;
+    long bndId;
+    int getCount;
+} psa_udp_bounded_service_entry_t;
+
+static int psa_udp_localMsgTypeIdForMsgType(void *handle, const char *msgType, 
unsigned int *msgTypeId);
+
+static void *psa_udp_getPublisherService(void *handle, const celix_bundle_t 
*requestingBundle,
+                                         const celix_properties_t 
*svcProperties);
+
+static void psa_udp_ungetPublisherService(void *handle, const celix_bundle_t 
*requestingBundle,
+                                          const celix_properties_t 
*svcProperties);
+
+static void delay_first_send_for_late_joiners(pubsub_udp_topic_sender_t 
*sender);
+
+static int
+psa_udp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void 
*msg, celix_properties_t *metadata);
+
+pubsub_udp_topic_sender_t *pubsub_udpTopicSender_create(
+    celix_bundle_context_t *ctx,
+    celix_log_helper_t *logHelper,
+    const char *scope,
+    const char *topic,
+    pubsub_serializer_handler_t* serializerHandler,
+    void *admin,
+    const celix_properties_t *topicProperties,
+    pubsub_sktHandler_endPointStore_t *handlerStore,
+    long protocolSvcId,
+    pubsub_protocol_service_t *protocol) {
+    pubsub_udp_topic_sender_t *sender = calloc(1, sizeof(*sender));
+    sender->ctx = ctx;
+    sender->logHelper = logHelper;
+    sender->serializerHandler = serializerHandler;
+    sender->admin = admin;
+    sender->protocolSvcId = protocolSvcId;
+    sender->protocol = protocol;
+    const char *uuid = celix_bundleContext_getProperty(ctx, 
OSGI_FRAMEWORK_FRAMEWORK_UUID, NULL);
+    if (uuid != NULL) {
+        uuid_parse(uuid, sender->fwUUID);
+    }
+    sender->interceptorsHandler = pubsubInterceptorsHandler_create(ctx, scope, 
topic, PUBSUB_UDP_ADMIN_TYPE,
+                                                                   
pubsub_serializerHandler_getSerializationType(serializerHandler));
+    sender->isPassive = false;
+    char *urls = NULL;
+    const char *ip = celix_bundleContext_getProperty(ctx, 
PUBSUB_UDP_PSA_IP_KEY, NULL);
+    const char *discUrl = pubsub_getEnvironmentVariableWithScopeTopic(ctx, 
PUBSUB_UDP_STATIC_BIND_URL_FOR, topic, scope);
+    const char *isPassive = pubsub_getEnvironmentVariableWithScopeTopic(ctx, 
PUBSUB_UDP_PASSIVE_ENABLED, topic, scope);
+    const char *passiveKey = pubsub_getEnvironmentVariableWithScopeTopic(ctx, 
PUBSUB_UDP_PASSIVE_SELECTION_KEY, topic, scope);
+    const char *staticConnectUrls = 
pubsub_getEnvironmentVariableWithScopeTopic(ctx, 
PUBSUB_UDP_STATIC_CONNECT_URLS_FOR, topic, scope);
+
+    if (isPassive) {
+        sender->isPassive = pubsub_sktHandler_isPassive(isPassive);
+    }
+    if (topicProperties != NULL) {
+        if (discUrl == NULL) {
+            discUrl = celix_properties_get(topicProperties, 
PUBSUB_UDP_STATIC_DISCOVER_URL, NULL);
+        }
+        if (isPassive == NULL) {
+            sender->isPassive = celix_properties_getAsBool(topicProperties, 
PUBSUB_UDP_PASSIVE_CONFIGURED, false);
+        }
+        if (passiveKey == NULL) {
+            passiveKey = celix_properties_get(topicProperties, 
PUBSUB_UDP_PASSIVE_KEY, NULL);
+        }
+        if(staticConnectUrls == NULL) {
+            staticConnectUrls = celix_properties_get(topicProperties, 
PUBSUB_UDP_STATIC_CONNECT_URLS, NULL);
+        }
+    }
+    /* When it's an endpoint share the socket with the receiver */
+    if (passiveKey != NULL) {
+        celixThreadMutex_lock(&handlerStore->mutex);
+        pubsub_sktHandler_t *entry = hashMap_get(handlerStore->map, 
passiveKey);
+        if (entry == NULL) {
+            if (sender->socketHandler == NULL)
+                sender->socketHandler = 
pubsub_sktHandler_create(sender->protocol, sender->logHelper);
+            entry = sender->socketHandler;
+            sender->sharedSocketHandler = sender->socketHandler;
+            hashMap_put(handlerStore->map, (void *) passiveKey, entry);
+        } else {
+            sender->socketHandler = entry;
+            sender->sharedSocketHandler = entry;
+        }
+        celixThreadMutex_unlock(&handlerStore->mutex);
+    } else {
+        sender->socketHandler = pubsub_sktHandler_create(sender->protocol, 
sender->logHelper);
+    }
+
+    if ((sender->socketHandler != NULL) && (topicProperties != NULL)) {
+        long prio = celix_properties_getAsLong(topicProperties, 
PUBSUB_UDP_THREAD_REALTIME_PRIO, -1L);
+        const char *sched = celix_properties_get(topicProperties, 
PUBSUB_UDP_THREAD_REALTIME_SCHED, NULL);
+        long retryCnt = celix_properties_getAsLong(topicProperties, 
PUBSUB_UDP_PUBLISHER_RETRY_CNT_KEY, PUBSUB_UDP_PUBLISHER_RETRY_CNT_DEFAULT);
+        double sendTimeout = celix_properties_getAsDouble(topicProperties, 
PUBSUB_UDP_PUBLISHER_SNDTIMEO_KEY, PUBSUB_UDP_PUBLISHER_SNDTIMEO_DEFAULT);
+        long maxMsgSize = celix_properties_getAsLong(topicProperties, 
PSA_UDP_MAX_MESSAGE_SIZE, PSA_UDP_DEFAULT_MAX_MESSAGE_SIZE);
+        long timeout = celix_bundleContext_getPropertyAsLong(ctx, 
PSA_UDP_TIMEOUT, PSA_UDP_DEFAULT_TIMEOUT);
+        sender->send_delay = celix_bundleContext_getPropertyAsLong(ctx,  
PUBSUB_UTILS_PSA_SEND_DELAY, PUBSUB_UTILS_PSA_DEFAULT_SEND_DELAY);
+        maxMsgSize = MIN(UDP_MAX_MSG_SIZE, maxMsgSize);
+        pubsub_sktHandler_setThreadName(sender->socketHandler, topic, scope);
+        pubsub_sktHandler_setThreadPriority(sender->socketHandler, prio, 
sched);
+        pubsub_sktHandler_setSendRetryCnt(sender->socketHandler, (unsigned 
int) retryCnt);
+        pubsub_sktHandler_setSendTimeOut(sender->socketHandler, sendTimeout);
+        pubsub_sktHandler_setMaxMsgSize(sender->socketHandler, (unsigned int) 
maxMsgSize);
+        // When passiveKey is specified, enable receive event for full-duplex 
connection using key.
+        // Because the topic receiver is already started, enable the receive 
event.
+        pubsub_sktHandler_enableReceiveEvent(sender->socketHandler, 
(passiveKey) ? true : false);
+        pubsub_sktHandler_setTimeout(sender->socketHandler, (unsigned int) 
timeout);
+    }
+
+    if (!sender->isPassive) {
+        //setting up tcp socket for UDP TopicSender
+        if (discUrl != NULL) {
+            urls = celix_utils_strdup(discUrl);
+            sender->isStatic = true;
+        } else if (ip != NULL) {
+            urls = celix_utils_strdup(ip);
+        } else {
+            struct sockaddr_in *sin = pubsub_utils_url_getInAddr(NULL, 0);
+            urls = pubsub_utils_url_get_url(sin, NULL);
+            free(sin);
+        }
+        if (!sender->url && urls) {
+            char *urlsCopy = celix_utils_strdup(urls);
+            char *url;
+            char *save = urlsCopy;
+            while ((url = strtok_r(save, " ", &save))) {
+                int retry = 0;
+                pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(url);
+                bool is_multicast = 
pubsub_utils_url_is_multicast(urlInfo->hostname);
+                bool is_broadcast = 
pubsub_utils_url_is_broadcast(urlInfo->hostname);
+                pubsub_utils_url_free(urlInfo);
+                while ((is_multicast || is_broadcast) && url && retry < 
UDP_BIND_MAX_RETRY) {
+                    int rc = pubsub_sktHandler_udp_bind(sender->socketHandler, 
url);
+                    if (rc < 0) {
+                        L_WARN("Error for udp bind using dynamic bind url 
'%s'. %s", url, strerror(errno));
+                    } else {
+                        url = NULL;
+                    }
+                    retry++;
+                }
+            }
+            free(urlsCopy);
+            sender->url = 
pubsub_sktHandler_get_interface_url(sender->socketHandler);
+        }
+        free(urls);
+        if (staticConnectUrls){
+            char *urlsCopy = celix_utils_strdup(staticConnectUrls);
+            char *url;
+            char *save = urlsCopy;
+            while ((url = strtok_r(save, " ", &save))) {
+                if (url) {
+                    int rc = 0;
+                    pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(url);
+                    bool is_multicast = 
pubsub_utils_url_is_multicast(urlInfo->hostname);
+                    bool is_broadcast = 
pubsub_utils_url_is_broadcast(urlInfo->hostname);
+                    if (!is_multicast && !is_broadcast) {
+                        L_INFO("Udp static connect '%s'", url);
+                        rc = 
pubsub_sktHandler_udp_connect(sender->socketHandler, url);
+                    }
+                    if (rc < 0) {
+                        L_WARN("Error for udp static connect '%s'. %s", url, 
strerror(errno));
+                    }
+                    pubsub_utils_url_free(urlInfo);
+                }
+            }
+            free(urlsCopy);
+        }
+    }
+
+    //register publisher services using a service factory
+    {
+        sender->scope = scope == NULL ? NULL : strndup(scope, 1024 * 1024);
+        sender->topic = strndup(topic, 1024 * 1024);
+
+        celixThreadMutex_create(&sender->boundedServices.mutex, NULL);
+        sender->boundedServices.map = hashMap_create(NULL, NULL, NULL, NULL);
+
+        sender->publisher.factory.handle = sender;
+        sender->publisher.factory.getService = psa_udp_getPublisherService;
+        sender->publisher.factory.ungetService = psa_udp_ungetPublisherService;
+
+        celix_properties_t *props = celix_properties_create();
+        celix_properties_set(props, PUBSUB_PUBLISHER_TOPIC, sender->topic);
+        if (sender->scope != NULL) {
+            celix_properties_set(props, PUBSUB_PUBLISHER_SCOPE, sender->scope);
+        }
+
+        celix_service_registration_options_t opts = 
CELIX_EMPTY_SERVICE_REGISTRATION_OPTIONS;
+        opts.factory = &sender->publisher.factory;
+        opts.serviceName = PUBSUB_PUBLISHER_SERVICE_NAME;
+        opts.serviceVersion = PUBSUB_PUBLISHER_SERVICE_VERSION;
+        opts.properties = props;
+
+        sender->publisher.svcId = 
celix_bundleContext_registerServiceWithOptions(ctx, &opts);
+    }
+    return sender;
+}
+
+void pubsub_udpTopicSender_destroy(pubsub_udp_topic_sender_t *sender) {
+    if (sender != NULL) {
+
+        celix_bundleContext_unregisterService(sender->ctx, 
sender->publisher.svcId);
+
+        celixThreadMutex_lock(&sender->boundedServices.mutex);
+        hash_map_iterator_t iter = 
hashMapIterator_construct(sender->boundedServices.map);
+        while (hashMapIterator_hasNext(&iter)) {
+            psa_udp_bounded_service_entry_t *entry = 
hashMapIterator_nextValue(&iter);
+            free(entry);
+        }
+        hashMap_destroy(sender->boundedServices.map, false, false);
+        celixThreadMutex_unlock(&sender->boundedServices.mutex);
+        celixThreadMutex_destroy(&sender->boundedServices.mutex);
+
+        pubsubInterceptorsHandler_destroy(sender->interceptorsHandler);
+        if ((sender->socketHandler) && (sender->sharedSocketHandler == NULL)) {
+            pubsub_sktHandler_destroy(sender->socketHandler);
+            sender->socketHandler = NULL;
+        }
+
+        if (sender->scope != NULL) {
+            free(sender->scope);
+        }
+        free(sender->topic);
+        free(sender->url);
+        free(sender);
+    }
+}
+
+long pubsub_udpTopicSender_protocolSvcId(pubsub_udp_topic_sender_t *sender) {
+    return sender->protocolSvcId;
+}
+
+const char *pubsub_udpTopicSender_scope(pubsub_udp_topic_sender_t *sender) {
+    return sender->scope;
+}
+
+const char *pubsub_udpTopicSender_topic(pubsub_udp_topic_sender_t *sender) {
+    return sender->topic;
+}
+
+const char* pubsub_udpTopicSender_serializerType(pubsub_udp_topic_sender_t 
*sender) {
+    return 
pubsub_serializerHandler_getSerializationType(sender->serializerHandler);
+}
+
+const char *pubsub_udpTopicSender_url(pubsub_udp_topic_sender_t *sender) {
+    if (sender->isPassive) {
+        return pubsub_sktHandler_get_connection_url(sender->socketHandler);
+    } else {
+        return sender->url;
+    }
+}
+
+void pubsub_udpTopicSender_listConnections(pubsub_udp_topic_sender_t *sender, 
celix_array_list_t *urls) {
+    pubsub_sktHandler_get_connection_urls(sender->socketHandler, urls);
+}
+
+bool pubsub_udpTopicSender_isStatic(pubsub_udp_topic_sender_t *sender) {
+    return sender->isStatic;
+}
+
+bool pubsub_udpTopicSender_isPassive(pubsub_udp_topic_sender_t *sender) {
+    return sender->isPassive;
+}
+
+void pubsub_udpTopicSender_connectTo(pubsub_udp_topic_sender_t *sender, const 
char *url) {
+    if (!url) return;
+    char *connectUrl =  celix_utils_strdup(url);
+    pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(connectUrl);
+    bool is_multicast = pubsub_utils_url_is_multicast(urlInfo->hostname);
+    bool is_broadcast = pubsub_utils_url_is_broadcast(urlInfo->hostname);
+    pubsub_utils_url_free(urlInfo);
+
+    if (!is_multicast && !is_broadcast) {
+        L_DEBUG("[PSA_udp] TopicSender %s/%s connecting to udp url %s",
+                sender->scope == NULL ? "(null)" : sender->scope,
+                sender->topic,
+                url);
+        pubsub_sktHandler_udp_connect(sender->socketHandler, connectUrl);
+    }
+    free(connectUrl);
+}
+
+void pubsub_udpTopicSender_disconnectFrom(pubsub_udp_topic_sender_t *sender, 
const char *url) {
+    if (!url) return;
+    char *connectUrl =  celix_utils_strdup(url);
+    pubsub_utils_url_t *urlInfo = pubsub_utils_url_parse(connectUrl);
+    bool is_multicast = pubsub_utils_url_is_multicast(urlInfo->hostname);
+    bool is_broadcast = pubsub_utils_url_is_broadcast(urlInfo->hostname);
+    pubsub_utils_url_free(urlInfo);
+    if (!is_multicast && !is_broadcast) {
+        L_DEBUG("[PSA udp] TopicSender %s/%s disconnect from udp url %s",
+                sender->scope == NULL ? "(null)" : sender->scope,
+                sender->topic,
+                url);
+        pubsub_sktHandler_disconnect(sender->socketHandler, connectUrl);
+    }
+}
+
+static void *psa_udp_getPublisherService(void *handle, const celix_bundle_t 
*requestingBundle,
+                                         const celix_properties_t 
*svcProperties __attribute__((unused))) {
+    pubsub_udp_topic_sender_t *sender = handle;
+    long bndId = celix_bundle_getId(requestingBundle);
+
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    psa_udp_bounded_service_entry_t *entry = 
hashMap_get(sender->boundedServices.map, (void *) bndId);
+    if (entry != NULL) {
+        entry->getCount += 1;
+    } else {
+        entry = calloc(1, sizeof(*entry));
+        entry->getCount = 1;
+        entry->parent = sender;
+        entry->bndId = bndId;
+        entry->service.handle = entry;
+        entry->service.localMsgTypeIdForMsgType = 
psa_udp_localMsgTypeIdForMsgType;
+        entry->service.send = psa_udp_topicPublicationSend;
+        hashMap_put(sender->boundedServices.map, (void *) bndId, entry);
+    }
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+
+    return &entry->service;
+}
+
+static void psa_udp_ungetPublisherService(void *handle, const celix_bundle_t 
*requestingBundle,
+                                          const celix_properties_t 
*svcProperties __attribute__((unused))) {
+    pubsub_udp_topic_sender_t *sender = handle;
+    long bndId = celix_bundle_getId(requestingBundle);
+
+    celixThreadMutex_lock(&sender->boundedServices.mutex);
+    psa_udp_bounded_service_entry_t *entry = 
hashMap_get(sender->boundedServices.map, (void *) bndId);
+    if (entry != NULL) {
+        entry->getCount -= 1;
+    }
+    if (entry != NULL && entry->getCount == 0) {
+        //free entry
+        hashMap_remove(sender->boundedServices.map, (void *) bndId);
+        free(entry);
+    }
+    celixThreadMutex_unlock(&sender->boundedServices.mutex);
+
+}
+
+static int
+psa_udp_topicPublicationSend(void *handle, unsigned int msgTypeId, const void 
*inMsg, celix_properties_t *metadata) {
+    psa_udp_bounded_service_entry_t *bound = handle;
+    pubsub_udp_topic_sender_t *sender = bound->parent;
+    const char* msgFqn;
+    int majorVersion;
+    int minorVersion;
+    celix_status_t status = 
pubsub_serializerHandler_getMsgInfo(sender->serializerHandler, msgTypeId, 
&msgFqn, &majorVersion, &minorVersion);
+
+    if (status != CELIX_SUCCESS) {
+        L_WARN("Cannot find serializer for msg id %u for serializer %s", 
msgTypeId,
+               
pubsub_serializerHandler_getSerializationType(sender->serializerHandler));

Review comment:
       Memory leak in psa_tcp is reported by CI and fixed by: 
https://github.com/apache/celix/pull/403




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to