sijie closed pull request #3020: Issue #2984: Add producer flush method in cpp client
URL: https://github.com/apache/pulsar/pull/3020
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/pulsar-client-cpp/include/pulsar/Producer.h 
b/pulsar-client-cpp/include/pulsar/Producer.h
index 1955c7f4f3..407d937043 100644
--- a/pulsar-client-cpp/include/pulsar/Producer.h
+++ b/pulsar-client-cpp/include/pulsar/Producer.h
@@ -29,6 +29,9 @@ namespace pulsar {
 class ProducerImplBase;
 class PulsarWrapper;
 class PulsarFriend;
+
+typedef boost::function<void(Result)> FlushCallback;
+
 class Producer {
    public:
     /**
@@ -80,6 +83,18 @@ class Producer {
      */
     void sendAsync(const Message& msg, SendCallback callback);
 
+    /**
+     * Flush all the messages buffered in the client and wait until all 
messages have been successfully
+     * persisted.
+     */
+    Result flush();
+
+    /**
+     * Asynchronously flush all the messages buffered in the client; the callback is
+     * invoked with the result once the flush has completed.
+     */
+    void flushAsync(FlushCallback callback);
+
     /**
      * Get the last sequence id that was published by this producer.
      *
diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.cc 
b/pulsar-client-cpp/lib/BatchMessageContainer.cc
index 0986887030..ec8b4899b9 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainer.cc
+++ b/pulsar-client-cpp/lib/BatchMessageContainer.cc
@@ -47,7 +47,7 @@ void BatchMessageContainer::add(const Message& msg, 
SendCallback sendCallback, b
                     << "]");
     if (!(disableCheck || hasSpaceInBatch(msg))) {
         LOG_DEBUG(*this << " Batch is full");
-        sendMessage();
+        sendMessage(NULL);
         add(msg, sendCallback, true);
         return;
     }
@@ -71,7 +71,7 @@ void BatchMessageContainer::add(const Message& msg, 
SendCallback sendCallback, b
     LOG_DEBUG(*this << " Batch Payload Size In Bytes = " << batchSizeInBytes_);
     if (isFull()) {
         LOG_DEBUG(*this << " Batch is full.");
-        sendMessage();
+        sendMessage(NULL);
     }
 }
 
@@ -83,11 +83,14 @@ void BatchMessageContainer::startTimer() {
                                    boost::asio::placeholders::error));
 }
 
-void BatchMessageContainer::sendMessage() {
+void BatchMessageContainer::sendMessage(FlushCallback flushCallback) {
     // Call this function after acquiring the ProducerImpl lock
     LOG_DEBUG(*this << "Sending the batch message container");
     if (isEmpty()) {
         LOG_DEBUG(*this << " Batch is empty - returning.");
+        if (flushCallback) {
+            flushCallback(ResultOk);
+        }
         return;
     }
     
impl_->metadata.set_num_messages_in_batch(messagesContainerListPtr_->size());
@@ -101,8 +104,8 @@ void BatchMessageContainer::sendMessage() {
     msg.impl_ = impl_;
 
     // bind keeps a copy of the parameters
-    SendCallback callback =
-        boost::bind(&BatchMessageContainer::batchMessageCallBack, _1, 
messagesContainerListPtr_);
+    SendCallback callback = 
boost::bind(&BatchMessageContainer::batchMessageCallBack, _1,
+                                        messagesContainerListPtr_, 
flushCallback);
 
     producer_.sendMessage(msg, callback);
     clear();
@@ -131,8 +134,12 @@ void BatchMessageContainer::clear() {
     batchSizeInBytes_ = 0;
 }
 
-void BatchMessageContainer::batchMessageCallBack(Result r, 
MessageContainerListPtr messagesContainerListPtr) {
+void BatchMessageContainer::batchMessageCallBack(Result r, 
MessageContainerListPtr messagesContainerListPtr,
+                                                 FlushCallback flushCallback) {
     if (!messagesContainerListPtr) {
+        if (flushCallback) {
+            flushCallback(ResultOk);
+        }
         return;
     }
     LOG_DEBUG("BatchMessageContainer::batchMessageCallBack called with [Result 
= "
@@ -142,6 +149,9 @@ void BatchMessageContainer::batchMessageCallBack(Result r, 
MessageContainerListP
         // callback(result, message)
         iter->sendCallback_(r, iter->message_);
     }
+    if (flushCallback) {
+        flushCallback(ResultOk);
+    }
 }
 
 BatchMessageContainer::~BatchMessageContainer() {
diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.h 
b/pulsar-client-cpp/lib/BatchMessageContainer.h
index 0b7a0d36f1..37b52efb1f 100644
--- a/pulsar-client-cpp/lib/BatchMessageContainer.h
+++ b/pulsar-client-cpp/lib/BatchMessageContainer.h
@@ -64,7 +64,7 @@ class BatchMessageContainer {
 
     void clear();
 
-    static void batchMessageCallBack(Result r, MessageContainerListPtr 
messages);
+    static void batchMessageCallBack(Result r, MessageContainerListPtr 
messages, FlushCallback callback);
 
     friend inline std::ostream& operator<<(std::ostream& os,
                                            const BatchMessageContainer& 
batchMessageContainer);
@@ -108,7 +108,7 @@ class BatchMessageContainer {
 
     void startTimer();
 
-    void sendMessage();
+    void sendMessage(FlushCallback callback);
 };
 
 bool BatchMessageContainer::hasSpaceInBatch(const Message& msg) const {
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc 
b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
index 4e75e0e562..0f2780e20d 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc
@@ -241,4 +241,37 @@ void PartitionedProducerImpl::triggerFlush() {
     }
 }
 
+void PartitionedProducerImpl::flushAsync(FlushCallback callback) {
+    if (!flushPromise_ || flushPromise_->isComplete()) {
+        flushPromise_ = boost::make_shared<Promise<Result, bool_type>>();
+    } else {
+        // a flush is already in progress; register a listener callback
+        boost::function<void(Result, bool)> listenerCallback = [this, 
callback](Result result, bool_type v) {
+            if (v) {
+                callback(ResultOk);
+            } else {
+                callback(ResultUnknownError);
+            }
+            return;
+        };
+
+        flushPromise_->getFuture().addListener(listenerCallback);
+        return;
+    }
+
+    FlushCallback subFlushCallback = [this, callback](Result result) {
+        int previous = flushedPartitions_.fetch_add(1);
+        if (previous == producers_.size() - 1) {
+            flushedPartitions_.store(0);
+            flushPromise_->setValue(true);
+            callback(result);
+        }
+        return;
+    };
+
+    for (ProducerList::const_iterator prod = producers_.begin(); prod != 
producers_.end(); prod++) {
+        (*prod)->flushAsync(subFlushCallback);
+    }
+}
+
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h 
b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
index cf6ddd9a11..0a3d949fba 100644
--- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h
+++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h
@@ -71,6 +71,8 @@ class PartitionedProducerImpl : public ProducerImplBase,
 
     virtual void triggerFlush();
 
+    virtual void flushAsync(FlushCallback callback);
+
     void handleSinglePartitionProducerCreated(Result result, 
ProducerImplBaseWeakPtr producerBaseWeakPtr,
                                               const unsigned int 
partitionIndex);
 
@@ -115,6 +117,9 @@ class PartitionedProducerImpl : public ProducerImplBase,
     Promise<Result, ProducerImplBaseWeakPtr> 
partitionedProducerCreatedPromise_;
 
     MessageRoutingPolicyPtr getMessageRouter();
+
+    std::atomic<int> flushedPartitions_;
+    boost::shared_ptr<Promise<Result, bool_type>> flushPromise_;
 };
 
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/Producer.cc 
b/pulsar-client-cpp/lib/Producer.cc
index 49adbfc166..bee213e6f9 100644
--- a/pulsar-client-cpp/lib/Producer.cc
+++ b/pulsar-client-cpp/lib/Producer.cc
@@ -77,4 +77,22 @@ void Producer::closeAsync(CloseCallback callback) {
 
     impl_->closeAsync(callback);
 }
+
+Result Producer::flush() {
+    Promise<bool, Result> promise;
+    flushAsync(WaitForCallback(promise));
+
+    Result result;
+    promise.getFuture().get(result);
+    return result;
+}
+
+void Producer::flushAsync(FlushCallback callback) {
+    if (!impl_) {
+        callback(ResultProducerNotInitialized);
+        return;
+    }
+
+    impl_->flushAsync(callback);
+}
 }  // namespace pulsar
diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc 
b/pulsar-client-cpp/lib/ProducerImpl.cc
index ddcf671baa..32f178cf36 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.cc
+++ b/pulsar-client-cpp/lib/ProducerImpl.cc
@@ -245,7 +245,7 @@ void ProducerImpl::failPendingMessages(Result result) {
     }
 
     // this function can handle null pointer
-    BatchMessageContainer::batchMessageCallBack(ResultTimeout, 
messageContainerListPtr);
+    BatchMessageContainer::batchMessageCallBack(ResultTimeout, 
messageContainerListPtr, NULL);
 }
 
 void ProducerImpl::resendMessages(ClientConnectionPtr cnx) {
@@ -285,10 +285,43 @@ void ProducerImpl::statsCallBackHandler(Result res, const 
Message& msg, SendCall
     }
 }
 
+void ProducerImpl::flushAsync(FlushCallback callback) {
+    if (batchMessageContainer) {
+        if (!flushPromise_ || flushPromise_->isComplete()) {
+            flushPromise_ = boost::make_shared<Promise<Result, bool_type>>();
+        } else {
+            // a flush is already in progress; register a listener callback
+            boost::function<void(Result, bool)> listenerCallback = [this, 
callback](Result result,
+                                                                               
     bool_type v) {
+                if (v) {
+                    callback(ResultOk);
+                } else {
+                    callback(ResultUnknownError);
+                }
+                return;
+            };
+
+            flushPromise_->getFuture().addListener(listenerCallback);
+            return;
+        }
+
+        FlushCallback innerCallback = [this, callback](Result result) {
+            flushPromise_->setValue(true);
+            callback(result);
+            return;
+        };
+
+        Lock lock(mutex_);
+        batchMessageContainer->sendMessage(innerCallback);
+    } else {
+        callback(ResultOk);
+    }
+}
+
 void ProducerImpl::triggerFlush() {
     if (batchMessageContainer) {
         Lock lock(mutex_);
-        batchMessageContainer->sendMessage();
+        batchMessageContainer->sendMessage(NULL);
     }
 }
 
@@ -364,7 +397,7 @@ void ProducerImpl::sendAsync(const Message& msg, 
SendCallback callback) {
         // If queue is full sending the batch immediately, no point waiting 
till batchMessagetimeout
         if (batchMessageContainer) {
             LOG_DEBUG(getName() << " - sending batch message immediately");
-            batchMessageContainer->sendMessage();
+            batchMessageContainer->sendMessage(NULL);
         }
         lock.unlock();
         cb(ResultProducerQueueIsFull, msg);
@@ -412,7 +445,7 @@ void ProducerImpl::batchMessageTimeoutHandler(const 
boost::system::error_code& e
     }
     LOG_DEBUG(getName() << " - Batch Message Timer expired");
     Lock lock(mutex_);
-    batchMessageContainer->sendMessage();
+    batchMessageContainer->sendMessage(NULL);
 }
 
 void ProducerImpl::printStats() {
diff --git a/pulsar-client-cpp/lib/ProducerImpl.h 
b/pulsar-client-cpp/lib/ProducerImpl.h
index 615eafb7af..aa22abc668 100644
--- a/pulsar-client-cpp/lib/ProducerImpl.h
+++ b/pulsar-client-cpp/lib/ProducerImpl.h
@@ -34,6 +34,7 @@
 using namespace pulsar;
 
 namespace pulsar {
+typedef bool bool_type;
 
 class BatchMessageContainer;
 
@@ -90,6 +91,8 @@ class ProducerImpl : public HandlerBase,
 
     virtual void triggerFlush();
 
+    virtual void flushAsync(FlushCallback callback);
+
    protected:
     ProducerStatsBasePtr producerStatsBasePtr_;
 
@@ -156,6 +159,7 @@ class ProducerImpl : public HandlerBase,
     MessageCryptoPtr msgCrypto_;
     DeadlineTimerPtr dataKeyGenTImer_;
     uint32_t dataKeyGenIntervalSec_;
+    boost::shared_ptr<Promise<Result, bool_type>> flushPromise_;
 };
 
 struct ProducerImplCmp {
diff --git a/pulsar-client-cpp/lib/ProducerImplBase.h 
b/pulsar-client-cpp/lib/ProducerImplBase.h
index 92f886f563..3dd92a4618 100644
--- a/pulsar-client-cpp/lib/ProducerImplBase.h
+++ b/pulsar-client-cpp/lib/ProducerImplBase.h
@@ -43,6 +43,7 @@ class ProducerImplBase {
     virtual const std::string& getTopic() const = 0;
     virtual Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture() 
= 0;
     virtual void triggerFlush() = 0;
+    virtual void flushAsync(FlushCallback callback) = 0;
 };
 }  // namespace pulsar
 #endif  // PULSAR_PRODUCER_IMPL_BASE_HEADER
diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc 
b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
index d264ab129c..c5f4420632 100644
--- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc
+++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc
@@ -2190,3 +2190,192 @@ TEST(BasicEndToEndTest, testGetTopicPartitions) {
 
     client.shutdown();
 }
+
+TEST(BasicEndToEndTest, testFlushInProducer) {
+    ClientConfiguration config;
+    Client client(lookupUrl);
+    std::string topicName = 
"persistent://property/cluster/namespace/test-flush-in-producer";
+    std::string subName = "subscription-name";
+    Producer producer;
+    int numOfMessages = 10;
+
+    ProducerConfiguration conf;
+    conf.setBatchingEnabled(true);
+    // set batch message number numOfMessages, and max delay 60s
+    conf.setBatchingMaxMessages(numOfMessages);
+    conf.setBatchingMaxPublishDelayMs(60000);
+
+    conf.setBlockIfQueueFull(true);
+    conf.setProperty("producer-name", "test-producer-name");
+    conf.setProperty("producer-id", "test-producer-id");
+
+    Promise<Result, Producer> producerPromise;
+    client.createProducerAsync(topicName, conf, 
WaitForCallbackValue<Producer>(producerPromise));
+    Future<Result, Producer> producerFuture = producerPromise.getFuture();
+    Result result = producerFuture.get(producer);
+    ASSERT_EQ(ResultOk, result);
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConfig;
+    consumerConfig.setProperty("consumer-name", "test-consumer-name");
+    consumerConfig.setProperty("consumer-id", "test-consumer-id");
+    Promise<Result, Consumer> consumerPromise;
+    client.subscribeAsync(topicName, subName, consumerConfig,
+                          WaitForCallbackValue<Consumer>(consumerPromise));
+    Future<Result, Consumer> consumerFuture = consumerPromise.getFuture();
+    result = consumerFuture.get(consumer);
+    ASSERT_EQ(ResultOk, result);
+
+    // Asynchronously send half of the messages
+    std::string prefix = "msg-batch-async";
+    for (int i = 0; i < numOfMessages / 2; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+        LOG_DEBUG("async sending message " << messageContent);
+    }
+    LOG_INFO("sending half of messages in async, should timeout to receive");
+
+    // The batch has not reached the max message count yet, so no data should be received.
+    Message receivedMsg;
+    ASSERT_EQ(ResultTimeout, consumer.receive(receivedMsg, 2000));
+
+    // After flush, it should get the message
+    producer.flush();
+    ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 2000));
+
+    // receive all the messages.
+    while (consumer.receive(receivedMsg, 2000) == ResultOk) {
+        ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg));
+    }
+
+    // Asynchronously send the remaining half of the messages
+    for (int i = numOfMessages / 2; i < numOfMessages; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix));
+        LOG_DEBUG("async sending message " << messageContent);
+    }
+    LOG_INFO(
+        "sending the other half messages in async, should still timeout, since 
first half already flushed");
+    ASSERT_EQ(ResultTimeout, consumer.receive(receivedMsg, 2000));
+
+    // After flush async, it should get the message
+    Promise<bool, Result> promise;
+    producer.flushAsync(WaitForCallback(promise));
+    Promise<bool, Result> promise1;
+    producer.flushAsync(WaitForCallback(promise1));
+    promise.getFuture().get(result);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 2000));
+
+    producer.close();
+    client.shutdown();
+}
+
+TEST(BasicEndToEndTest, testFlushInPartitionedProducer) {
+    Client client(lookupUrl);
+    std::string topicName = 
"persistent://prop/unit/ns/partition-testFlushInPartitionedProducer";
+    // call admin api to make it partitioned
+    std::string url =
+        adminUrl + 
"admin/persistent/prop/unit/ns/partition-testFlushInPartitionedProducer/partitions";
+    int res = makePutRequest(url, "5");
+    int numberOfPartitions = 5;
+
+    LOG_INFO("res = " << res);
+    ASSERT_FALSE(res != 204 && res != 409);
+
+    Producer producer;
+    int numOfMessages = 10;
+    ProducerConfiguration tempProducerConfiguration;
+    
tempProducerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution);
+    ProducerConfiguration producerConfiguration = tempProducerConfiguration;
+    producerConfiguration.setBatchingEnabled(true);
+    // set max batch size to numOfMessages / numberOfPartitions, and max publish delay to 60s
+    producerConfiguration.setBatchingMaxMessages(numOfMessages / 
numberOfPartitions);
+    producerConfiguration.setBatchingMaxPublishDelayMs(60000);
+
+    Result result = client.createProducer(topicName, producerConfiguration, 
producer);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(producer.getTopic(), topicName);
+
+    LOG_INFO("Creating Subscriber");
+    std::string consumerId = "CONSUMER";
+    ConsumerConfiguration consConfig;
+    consConfig.setConsumerType(ConsumerExclusive);
+    consConfig.setReceiverQueueSize(2);
+    ASSERT_FALSE(consConfig.hasMessageListener());
+    Consumer consumer[numberOfPartitions];
+    Result subscribeResult;
+    for (int i = 0; i < numberOfPartitions; i++) {
+        std::stringstream partitionedTopicName;
+        partitionedTopicName << topicName << "-partition-" << i;
+
+        std::stringstream partitionedConsumerId;
+        partitionedConsumerId << consumerId << i;
+        subscribeResult = client.subscribe(partitionedTopicName.str(), 
partitionedConsumerId.str(),
+                                           consConfig, consumer[i]);
+
+        ASSERT_EQ(ResultOk, subscribeResult);
+        ASSERT_EQ(consumer[i].getTopic(), partitionedTopicName.str());
+    }
+
+    // Asynchronously send the first half of the messages
+    std::string prefix = "msg-batch-async";
+    for (int i = 0; i < numOfMessages / 2; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, simpleCallback);
+        LOG_DEBUG("async sending message " << messageContent);
+    }
+
+    LOG_INFO("sending first part messages in async, should timeout to 
receive");
+    Message m;
+    ASSERT_EQ(ResultTimeout, consumer[0].receive(m, 2000));
+
+    // After flush, should be able to consume.
+    producer.flush();
+    LOG_INFO("After flush, should be able to receive");
+    ASSERT_EQ(ResultOk, consumer[0].receive(m, 2000));
+
+    LOG_INFO("Receive all messages.");
+    // receive all the messages.
+    for (int partitionIndex = 0; partitionIndex < numberOfPartitions; 
partitionIndex++) {
+        while (consumer[partitionIndex].receive(m, 2000) == ResultOk) {
+            // ASSERT_EQ(ResultOk, consumer[partitionIndex].acknowledge(m));
+            ASSERT_EQ(ResultOk, consumer[partitionIndex].acknowledge(m));
+        }
+    }
+
+    // send message again.
+    for (int i = numOfMessages / 2; i < numOfMessages; i++) {
+        std::string messageContent = prefix + 
boost::lexical_cast<std::string>(i);
+        Message msg = MessageBuilder()
+                          .setContent(messageContent)
+                          .setProperty("msgIndex", 
boost::lexical_cast<std::string>(i))
+                          .build();
+        producer.sendAsync(msg, simpleCallback);
+        LOG_DEBUG("async sending message " << messageContent);
+    }
+
+    // After flush async, it should get the message
+    Promise<bool, Result> promise;
+    producer.flushAsync(WaitForCallback(promise));
+    Promise<bool, Result> promise1;
+    producer.flushAsync(WaitForCallback(promise1));
+    promise.getFuture().get(result);
+    ASSERT_EQ(ResultOk, result);
+    ASSERT_EQ(ResultOk, consumer[0].receive(m, 2000));
+
+    producer.close();
+    client.shutdown();
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to