This is an automated email from the ASF dual-hosted git repository.

sijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new b982b5d  add producer flush method in cpp client (#3020)
b982b5d is described below

commit b982b5dc9b631b28b017c257174211e7058361ab
Author: Jia Zhai <jiaz...@users.noreply.github.com>
AuthorDate: Tue Nov 20 16:11:06 2018 +0800

    add producer flush method in cpp client (#3020)
    
    ### Motivation
    
    The Java API already provides a flush() method: 
http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/Producer.html#flush--
    The C++ client should provide the same method.
    
    ### Modifications
    
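    Add flush() and flushAsync() to the C++ client's Producer API and its implementations.
    Add end-to-end tests covering both the non-partitioned and the partitioned producer.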
    
    ### Result
    
    Cpp unit tests pass.
---
 pulsar-client-cpp/include/pulsar/Producer.h      |  15 ++
 pulsar-client-cpp/lib/BatchMessageContainer.cc   |  22 ++-
 pulsar-client-cpp/lib/BatchMessageContainer.h    |   4 +-
 pulsar-client-cpp/lib/PartitionedProducerImpl.cc |  33 ++++
 pulsar-client-cpp/lib/PartitionedProducerImpl.h  |   5 +
 pulsar-client-cpp/lib/Producer.cc                |  18 +++
 pulsar-client-cpp/lib/ProducerImpl.cc            |  41 ++++-
 pulsar-client-cpp/lib/ProducerImpl.h             |   4 +
 pulsar-client-cpp/lib/ProducerImplBase.h         |   1 +
 pulsar-client-cpp/tests/BasicEndToEndTest.cc     | 189 +++++++++++++++++++++++
 10 files changed, 320 insertions(+), 12 deletions(-)

diff --git a/pulsar-client-cpp/include/pulsar/Producer.h 
b/pulsar-client-cpp/include/pulsar/Producer.h
index 1955c7f..407d937 100644
--- a/pulsar-client-cpp/include/pulsar/Producer.h
+++ b/pulsar-client-cpp/include/pulsar/Producer.h
@@ -29,6 +29,9 @@ namespace pulsar {
 class ProducerImplBase;
 class PulsarWrapper;
 class PulsarFriend;
+
+typedef boost::function<void(Result)> FlushCallback;
+
 class Producer {
    public:
     /**
@@ -81,6 +84,18 @@ class Producer {
     void sendAsync(const Message& msg, SendCallback callback);
 
     /**
+     * Flush all the messages buffered in the client and wait until all 
messages have been successfully
+     * persisted.
+     */
+    Result flush();
+
+    /**
+     * Flush all the messages buffered in the client without blocking the caller.
+     * The callback is invoked with the result once the flush has completed.
+     */
+    void flushAsync(FlushCallback callback);
+
+    /**
      * Get the last sequence id that was published by this producer.
      *
      * This represent either the automatically assigned or custom sequence id 
(set on the MessageBuilder) that
diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.cc 
b/pulsar-client-cpp/lib/BatchMessageContainer.cc
index 0986887..ec8b489 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainer.cc
+++ b/pulsar-client-cpp/lib/BatchMessageContainer.cc
@@ -47,7 +47,7 @@ void BatchMessageContainer::add(const Message& msg, 
SendCallback sendCallback, b
                     << "]");
     if (!(disableCheck || hasSpaceInBatch(msg))) {
         LOG_DEBUG(*this << " Batch is full");
-        sendMessage();
+        sendMessage(NULL);
         add(msg, sendCallback, true);
         return;
     }
@@ -71,7 +71,7 @@ void BatchMessageContainer::add(const Message& msg, 
SendCallback sendCallback, b
     LOG_DEBUG(*this << " Batch Payload Size In Bytes = " << batchSizeInBytes_);
     if (isFull()) {
         LOG_DEBUG(*this << " Batch is full.");
-        sendMessage();
+        sendMessage(NULL);
     }
 }
 
@@ -83,11 +83,14 @@ void BatchMessageContainer::startTimer() {
                                    boost::asio::placeholders::error));
 }
 
-void BatchMessageContainer::sendMessage() {
+void BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
     // Call this function after acquiring the ProducerImpl lock
     LOG_DEBUG(*this << "Sending the batch message container");
     if (isEmpty()) {
         LOG_DEBUG(*this << " Batch is empty - returning.");
+        if (flushCallback) {
+            flushCallback(ResultOk);
+        }
         return;
     }
     
impl_->metadata.set_num_messages_in_batch(messagesContainerListPtr_->size());
@@ -101,8 +104,8 @@ void BatchMessageContainer::sendMessage() {
     msg.impl_ = impl_;
 
     // bind keeps a copy of the parameters
-    SendCallback callback =
-        boost::bind(&BatchMessageContainer::batchMessageCallBack, _1, 
messagesContainerListPtr_);
+    SendCallback callback = 
boost::bind(&BatchMessageContainer::batchMessageCallBack, _1,
+                                        messagesContainerListPtr_, 
flushCallback);
 
     producer_.sendMessage(msg, callback);
     clear();
@@ -131,8 +134,12 @@ void BatchMessageContainer::clear() {
     batchSizeInBytes_ = 0;
 }
 
-void BatchMessageContainer::batchMessageCallBack(Result r, 
MessageContainerListPtr messagesContainerListPtr) {
+void BatchMessageContainer::batchMessageCallBack(Result r, 
MessageContainerListPtr messagesContainerListPtr,
+                                                 FlushCallback flushCallback) {
     if (!messagesContainerListPtr) {
+        if (flushCallback) {
+            flushCallback(ResultOk);
+        }
         return;
     }
     LOG_DEBUG("BatchMessageContainer::batchMessageCallBack called with [Result 
= "
@@ -142,6 +149,9 @@ void BatchMessageContainer::batchMessageCallBack(Result r, 
MessageContainerListP
         // callback(result, message)
         iter->sendCallback_(r, iter->message_);
     }
+    if (flushCallback) {
+        flushCallback(ResultOk);
+    }
 }
 
 BatchMessageContainer::~BatchMessageContainer() {
diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.h 
b/pulsar-client-cpp/lib/BatchMessageContainer.h
index 0b7a0d3..37b52ef 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainer.h
+++ b/pulsar-client-cpp/lib/BatchMessageContainer.h
@@ -64,7 +64,7 @@ class BatchMessageContainer {
 
     void clear();
 
-    static void batchMessageCallBack(Result r, MessageContainerListPtr 
messages);
+    static void batchMessageCallBack(Result r, MessageContainerListPtr 
messages, FlushCallback callback);
 
     friend inline std::ostream& operator<<(std::ostream& os,
                                            const BatchMessageContainer& 
batchMessageContainer);
@@ -108,7 +108,7 @@ class BatchMessageContainer {
 
     void startTimer();
 
-    void sendMessage();
+    void sendMessage(FlushCallback callback);
 };
 
 bool BatchMessageContainer::hasSpaceInBatch(const Message& msg) const {
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc 
b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 4e75e0e..0f2780e 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -241,4 +241,37 @@ void PartitionedProducerImpl::triggerFlush() {
     }
 }
 
+void PartitionedProducerImpl::flushAsync(FlushCallback callback) {  // flush every partition producer
+    if (!flushPromise_ || flushPromise_->isComplete()) {  // no flush pending: start a new round
+        flushPromise_ = boost::make_shared<Promise<Result, bool_type>>();
+    } else {
+        // A flush is already in progress: attach to its promise instead of flushing again.
+        boost::function<void(Result, bool)> listenerCallback = [this, 
callback](Result result, bool_type v) {
+            if (v) {  // NOTE(review): `result` is ignored; only the bool value decides the outcome
+                callback(ResultOk);
+            } else {
+                callback(ResultUnknownError);
+            }
+            return;
+        };
+
+        flushPromise_->getFuture().addListener(listenerCallback);
+        return;
+    }
+
+    FlushCallback subFlushCallback = [this, callback](Result result) {  // NOTE(review): captures `this`; confirm lifetime
+        int previous = flushedPartitions_.fetch_add(1);  // number of partitions that reported before this one
+        if (previous == producers_.size() - 1) {  // last partition to report; NOTE(review): int vs. size() comparison
+            flushedPartitions_.store(0);  // reset the counter for the next flush round
+            flushPromise_->setValue(true);  // presumably notifies the listeners attached above - confirm
+            callback(result);  // NOTE(review): only the last reporting partition's result reaches the caller
+        }
+        return;
+    };
+
+    for (ProducerList::const_iterator prod = producers_.begin(); prod != 
producers_.end(); prod++) {
+        (*prod)->flushAsync(subFlushCallback);  // every partition shares the same counting callback
+    }
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h 
b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index cf6ddd9..0a3d949 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -71,6 +71,8 @@ class PartitionedProducerImpl : public ProducerImplBase,
 
     virtual void triggerFlush();
 
+    virtual void flushAsync(FlushCallback callback);
+
     void handleSinglePartitionProducerCreated(Result result, 
ProducerImplBaseWeakPtr producerBaseWeakPtr,
                                               const unsigned int 
partitionIndex);
 
@@ -115,6 +117,9 @@ class PartitionedProducerImpl : public ProducerImplBase,
     Promise<Result, ProducerImplBaseWeakPtr> 
partitionedProducerCreatedPromise_;
 
     MessageRoutingPolicyPtr getMessageRouter();
+
+    std::atomic<int> flushedPartitions_;
+    boost::shared_ptr<Promise<Result, bool_type>> flushPromise_;
 };
 
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/Producer.cc 
b/pulsar-client-cpp/lib/Producer.cc
index 49adbfc..bee213e 100644
--- a/pulsar-client-cpp/lib/Producer.cc
+++ b/pulsar-client-cpp/lib/Producer.cc
@@ -77,4 +77,22 @@ void Producer::closeAsync(CloseCallback callback) {
 
     impl_->closeAsync(callback);
 }
+
+Result Producer::flush() {  // synchronous wrapper around flushAsync()
+    Promise<bool, Result> promise;
+    flushAsync(WaitForCallback(promise));  // presumably the callback completes `promise` with the flush result
+
+    Result result;
+    promise.getFuture().get(result);  // presumably blocks until completion; NOTE(review): get()'s return value is ignored
+    return result;
+}
+
+void Producer::flushAsync(FlushCallback callback) {  // forwards the flush request to the implementation
+    if (!impl_) {  // no implementation attached: report the error to the callback right away
+        callback(ResultProducerNotInitialized);
+        return;
+    }
+
+    impl_->flushAsync(callback);  // the implementation is responsible for invoking the callback
+}
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc 
b/pulsar-client-cpp/lib/ProducerImpl.cc
index ddcf671..32f178c 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -245,7 +245,7 @@ void ProducerImpl::failPendingMessages(Result result) {
     }
 
     // this function can handle null pointer
-    BatchMessageContainer::batchMessageCallBack(ResultTimeout, 
messageContainerListPtr);
+    BatchMessageContainer::batchMessageCallBack(ResultTimeout, 
messageContainerListPtr, NULL);
 }
 
 void ProducerImpl::resendMessages(ClientConnectionPtr cnx) {
@@ -285,10 +285,43 @@ void ProducerImpl::statsCallBackHandler(Result res, const 
Message& msg, SendCall
     }
 }
 
+void ProducerImpl::flushAsync(FlushCallback callback) {  // send the pending batch, then invoke callback
+    if (batchMessageContainer) {  // presumably only set when batching is enabled - confirm
+        if (!flushPromise_ || flushPromise_->isComplete()) {  // NOTE(review): checked before mutex_ is taken
+            flushPromise_ = boost::make_shared<Promise<Result, bool_type>>();
+        } else {
+            // A flush is already in progress: attach to its promise instead of sending again.
+            boost::function<void(Result, bool)> listenerCallback = [this, 
callback](Result result,
+                                                                               
     bool_type v) {
+                if (v) {  // NOTE(review): `result` is ignored; only the bool value decides the outcome
+                    callback(ResultOk);
+                } else {
+                    callback(ResultUnknownError);
+                }
+                return;
+            };
+
+            flushPromise_->getFuture().addListener(listenerCallback);
+            return;
+        }
+
+        FlushCallback innerCallback = [this, callback](Result result) {  // NOTE(review): captures `this`; confirm lifetime
+            flushPromise_->setValue(true);  // presumably notifies the listeners attached above - confirm
+            callback(result);
+            return;
+        };
+
+        Lock lock(mutex_);  // sendMessage() expects the ProducerImpl lock to be held by the caller
+        batchMessageContainer->sendMessage(innerCallback);  // NOTE(review): on an empty batch innerCallback runs here, under mutex_
+    } else {
+        callback(ResultOk);  // no batch container, so there is nothing for this method to send
+    }
+}
+
 void ProducerImpl::triggerFlush() {
     if (batchMessageContainer) {
         Lock lock(mutex_);
-        batchMessageContainer->sendMessage();
+        batchMessageContainer->sendMessage(NULL);
     }
 }
 
@@ -364,7 +397,7 @@ void ProducerImpl::sendAsync(const Message& msg, 
SendCallback callback) {
         // If queue is full sending the batch immediately, no point waiting 
till batchMessagetimeout
         if (batchMessageContainer) {
             LOG_DEBUG(getName() << " - sending batch message immediately");
-            batchMessageContainer->sendMessage();
+            batchMessageContainer->sendMessage(NULL);
         }
         lock.unlock();
         cb(ResultProducerQueueIsFull, msg);
@@ -412,7 +445,7 @@ void ProducerImpl::batchMessageTimeoutHandler(const 
boost::system::error_code& e
     }
     LOG_DEBUG(getName() << " - Batch Message Timer expired");
     Lock lock(mutex_);
-    batchMessageContainer->sendMessage();
+    batchMessageContainer->sendMessage(NULL);
 }
 
 void ProducerImpl::printStats() {
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h 
b/pulsar-client-cpp/lib/ProducerImpl.h
index 615eafb..aa22abc 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -34,6 +34,7 @@
 using namespace pulsar;
 
 namespace pulsar {
+typedef bool bool_type;
 
 class BatchMessageContainer;
 
@@ -90,6 +91,8 @@ class ProducerImpl : public HandlerBase,
 
     virtual void triggerFlush();
 
+    virtual void flushAsync(FlushCallback callback);
+
    protected:
     ProducerStatsBasePtr producerStatsBasePtr_;
 
@@ -156,6 +159,7 @@ class ProducerImpl : public HandlerBase,
     MessageCryptoPtr msgCrypto_;
     DeadlineTimerPtr dataKeyGenTImer_;
     uint32_t dataKeyGenIntervalSec_;
+    boost::shared_ptr<Promise<Result, bool_type>> flushPromise_;
 };
 
 struct ProducerImplCmp {
diff --git a/pulsar-client-cpp/lib/ProducerImplBase.h 
b/pulsar-client-cpp/lib/ProducerImplBase.h
index 92f886f..3dd92a4 100644
--- a/pulsar-client-cpp/lib/ProducerImplBase.h
+++ b/pulsar-client-cpp/lib/ProducerImplBase.h
@@ -43,6 +43,7 @@ class ProducerImplBase {
     virtual const std::string& getTopic() const = 0;
     virtual Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture() 
= 0;
     virtual void triggerFlush() = 0;
+    virtual void flushAsync(FlushCallback callback) = 0;
 };
 }  // namespace pulsar
 #endif  // PULSAR_PRODUCER_IMPL_BASE_HEADER
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index d264ab1..c5f4420 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -2190,3 +2190,192 @@ TEST(BasicEndToEndTest, testGetTopicPartitions) {
 
     client.shutdown();
 }
+
+TEST(BasicEndToEndTest, testFlushInProducer) {  // flush()/flushAsync() on a non-partitioned batching producer
+    ClientConfiguration config;  // NOTE(review): never used in this test
+    Client client(lookupUrl);
+    std::string topicName = 
"persistent://property/cluster/namespace/test-flush-in-producer";
+    std::string subName = "subscription-name";
+    Producer producer;
+    int numOfMessages = 10;
+
+    ProducerConfiguration conf;
+    conf.setBatchingEnabled(true);
+    // Batch up to numOfMessages messages with a 60s max delay; the test sends fewer and relies on flush.
+    conf.setBatchingMaxMessages(numOfMessages);
+    conf.setBatchingMaxPublishDelayMs(60000);
+
+    conf.setBlockIfQueueFull(true);
+    conf.setProperty("producer-name", "test-producer-name");
+    conf.setProperty("producer-id", "test-producer-id");
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, conf, 
WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // Asynchronously send the first half of the messages.
+    std::string prefix = "msg-batch-async";
+    for (int i = 0; i < numOfMessages / 2; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+        LOG_DEBUG("async sending message " << messageContent);
+    }
+    LOG_INFO("sending half of messages in async, should timeout to receive");
+
+    // The batch has not reached its max message count, so nothing should be received yet.
+    Message receivedMsg;
+    ASSERT_EQ(ResultTimeout, consumer.receive(receivedMsg, 2000));
+
+    // After a synchronous flush the buffered messages should be receivable.
+    producer.flush();  // NOTE(review): the returned Result is not checked
+    ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 2000));
+
+    // Drain and acknowledge the remaining messages until receive stops returning ResultOk.
+    while (consumer.receive(receivedMsg, 2000) == ResultOk) {
+        ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+    }
+
+    // Asynchronously send the second half of the messages.
+    for (int i = numOfMessages / 2; i < numOfMessages; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+        LOG_DEBUG("async sending message " << messageContent);
+    }
+    LOG_INFO(
+        "sending the other half messages in async, should still timeout, since 
first half already flushed");
+    ASSERT_EQ(ResultTimeout, consumer.receive(receivedMsg, 2000));
+
+    // Issue two back-to-back flushAsync() calls; after the first completes a message should be receivable.
+    Promise<bool, Result> promise;
+    producer.flushAsync(WaitForCallback(promise));
+    Promise<bool, Result> promise1;
+    producer.flushAsync(WaitForCallback(promise1));  // NOTE(review): promise1 is never waited on or checked
+    promise.getFuture().get(result);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 2000));
+
+    producer.close();
+    client.shutdown();
+}
+
+TEST(BasicEndToEndTest, testFlushInPartitionedProducer) {  // flush()/flushAsync() on a partitioned producer
+    Client client(lookupUrl);
+    std::string topicName = 
"persistent://prop/unit/ns/partition-testFlushInPartitionedProducer";
+    // Call the admin API to make the topic partitioned (5 partitions).
+    std::string url =
+        adminUrl + 
"admin/persistent/prop/unit/ns/partition-testFlushInPartitionedProducer/partitions";
+    int res = makePutRequest(url, "5");
+    int numberOfPartitions = 5;
+
+    LOG_INFO("res = " << res);
+    ASSERT_FALSE(res != 204 && res != 409);  // accept 204 or 409 (presumably created / already exists)
+
+    Producer producer;
+    int numOfMessages = 10;
+    ProducerConfiguration tempProducerConfiguration;
+    
tempProducerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+    ProducerConfiguration producerConfiguration = tempProducerConfiguration;
+    producerConfiguration.setBatchingEnabled(true);
+    // Batch up to numOfMessages / numberOfPartitions (= 2) messages, with a 60s max delay.
+    producerConfiguration.setBatchingMaxMessages(numOfMessages / 
numberOfPartitions);
+    producerConfiguration.setBatchingMaxPublishDelayMs(60000);
+
+    Result result = client.createProducer(topicName, producerConfiguration, 
producer);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(producer.getTopic(), topicName);
+
+    LOG_INFO("Creating Subscriber");
+    std::string consumerId = "CONSUMER";
+    ConsumerConfiguration consConfig;
+    consConfig.setConsumerType(ConsumerExclusive);
+    consConfig.setReceiverQueueSize(2);
+    ASSERT_FALSE(consConfig.hasMessageListener());
+    Consumer consumer[numberOfPartitions];  // NOTE(review): non-const bound makes this a VLA (compiler extension in C++)
+    Result subscribeResult;
+    for (int i = 0; i < numberOfPartitions; i++) {  // one consumer per "<topic>-partition-<i>" topic
+        std::stringstream partitionedTopicName;
+        partitionedTopicName << topicName << "-partition-" << i;
+
+        std::stringstream partitionedConsumerId;
+        partitionedConsumerId << consumerId << i;
+        subscribeResult = client.subscribe(partitionedTopicName.str(), 
partitionedConsumerId.str(),
+                                           consConfig, consumer[i]);
+
+        ASSERT_EQ(ResultOk, subscribeResult);
+        ASSERT_EQ(consumer[i].getTopic(), partitionedTopicName.str());
+    }
+
+    // Asynchronously send the first half of the messages.
+    std::string prefix = "msg-batch-async";
+    for (int i = 0; i < numOfMessages / 2; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, simpleCallback);
+        LOG_DEBUG("async sending message " << messageContent);
+    }
+
+    LOG_INFO("sending first part messages in async, should timeout to 
receive");
+    Message m;
+    ASSERT_EQ(ResultTimeout, consumer[0].receive(m, 2000));
+
+    // After a synchronous flush, partition 0 should have a message to consume.
+    producer.flush();  // NOTE(review): the returned Result is not checked
+    LOG_INFO("After flush, should be able to receive");
+    ASSERT_EQ(ResultOk, consumer[0].receive(m, 2000));
+
+    LOG_INFO("Receive all messages.");
+    // Drain every partition's consumer.
+    for (int partitionIndex = 0; partitionIndex < numberOfPartitions; 
partitionIndex++) {
+        while (consumer[partitionIndex].receive(m, 2000) == ResultOk) {
+            // Acknowledge each message still pending on this partition.
+            ASSERT_EQ(ResultOk, consumer[partitionIndex].acknowledge(m));
+        }
+    }
+
+    // Asynchronously send the second half of the messages.
+    for (int i = numOfMessages / 2; i < numOfMessages; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, simpleCallback);
+        LOG_DEBUG("async sending message " << messageContent);
+    }
+
+    // Issue two back-to-back flushAsync() calls; after the first completes partition 0 should have a message.
+    Promise<bool, Result> promise;
+    producer.flushAsync(WaitForCallback(promise));
+    Promise<bool, Result> promise1;
+    producer.flushAsync(WaitForCallback(promise1));  // NOTE(review): promise1 is never waited on or checked
+    promise.getFuture().get(result);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(ResultOk, consumer[0].receive(m, 2000));
+
+    producer.close();
+    client.shutdown();
+}

Reply via email to