This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push: new b982b5d add producer flush method in cpp client (#3020) b982b5d is described below commit b982b5dc9b631b28b017c257174211e7058361ab Author: Jia Zhai <jiaz...@users.noreply.github.com> AuthorDate: Tue Nov 20 16:11:06 2018 +0800 add producer flush method in cpp client (#3020) ### Motivation We already have a flush() method in the Java API: http://pulsar.apache.org/api/client/org/apache/pulsar/client/api/Producer.html#flush-- It would be better to provide the same method for the C++ client. ### Modifications Add related methods in cpp client. Add related tests. ### Result Cpp unit tests pass. --- pulsar-client-cpp/include/pulsar/Producer.h | 15 ++ pulsar-client-cpp/lib/BatchMessageContainer.cc | 22 ++- pulsar-client-cpp/lib/BatchMessageContainer.h | 4 +- pulsar-client-cpp/lib/PartitionedProducerImpl.cc | 33 ++++ pulsar-client-cpp/lib/PartitionedProducerImpl.h | 5 + pulsar-client-cpp/lib/Producer.cc | 18 +++ pulsar-client-cpp/lib/ProducerImpl.cc | 41 ++++- pulsar-client-cpp/lib/ProducerImpl.h | 4 + pulsar-client-cpp/lib/ProducerImplBase.h | 1 + pulsar-client-cpp/tests/BasicEndToEndTest.cc | 189 +++++++++++++++++++++++ 10 files changed, 320 insertions(+), 12 deletions(-) diff --git a/pulsar-client-cpp/include/pulsar/Producer.h b/pulsar-client-cpp/include/pulsar/Producer.h index 1955c7f..407d937 100644 --- a/pulsar-client-cpp/include/pulsar/Producer.h +++ b/pulsar-client-cpp/include/pulsar/Producer.h @@ -29,6 +29,9 @@ namespace pulsar { class ProducerImplBase; class PulsarWrapper; class PulsarFriend; + +typedef boost::function<void(Result)> FlushCallback; + class Producer { public: /** @@ -81,6 +84,18 @@ class Producer { void sendAsync(const Message& msg, SendCallback callback); /** + * Flush all the messages buffered in the client and wait until all messages have been successfully + * persisted. 
+ */ + Result flush(); + + /** + * Flush all the messages buffered in the client and wait until all messages have been successfully + * persisted. + */ + void flushAsync(FlushCallback callback); + + /** * Get the last sequence id that was published by this producer. * * This represent either the automatically assigned or custom sequence id (set on the MessageBuilder) that diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.cc b/pulsar-client-cpp/lib/BatchMessageContainer.cc index 0986887..ec8b489 100644 --- a/pulsar-client-cpp/lib/BatchMessageContainer.cc +++ b/pulsar-client-cpp/lib/BatchMessageContainer.cc @@ -47,7 +47,7 @@ void BatchMessageContainer::add(const Message& msg, SendCallback sendCallback, b << "]"); if (!(disableCheck || hasSpaceInBatch(msg))) { LOG_DEBUG(*this << " Batch is full"); - sendMessage(); + sendMessage(NULL); add(msg, sendCallback, true); return; } @@ -71,7 +71,7 @@ void BatchMessageContainer::add(const Message& msg, SendCallback sendCallback, b LOG_DEBUG(*this << " Batch Payload Size In Bytes = " << batchSizeInBytes_); if (isFull()) { LOG_DEBUG(*this << " Batch is full."); - sendMessage(); + sendMessage(NULL); } } @@ -83,11 +83,14 @@ void BatchMessageContainer::startTimer() { boost::asio::placeholders::error)); } -void BatchMessageContainer::sendMessage() { +void BatchMessageContainer::sendMessage(FlushCallback flushCallback) { // Call this function after acquiring the ProducerImpl lock LOG_DEBUG(*this << "Sending the batch message container"); if (isEmpty()) { LOG_DEBUG(*this << " Batch is empty - returning."); + if (flushCallback) { + flushCallback(ResultOk); + } return; } impl_->metadata.set_num_messages_in_batch(messagesContainerListPtr_->size()); @@ -101,8 +104,8 @@ void BatchMessageContainer::sendMessage() { msg.impl_ = impl_; // bind keeps a copy of the parameters - SendCallback callback = - boost::bind(&BatchMessageContainer::batchMessageCallBack, _1, messagesContainerListPtr_); + SendCallback callback = 
boost::bind(&BatchMessageContainer::batchMessageCallBack, _1, + messagesContainerListPtr_, flushCallback); producer_.sendMessage(msg, callback); clear(); @@ -131,8 +134,12 @@ void BatchMessageContainer::clear() { batchSizeInBytes_ = 0; } -void BatchMessageContainer::batchMessageCallBack(Result r, MessageContainerListPtr messagesContainerListPtr) { +void BatchMessageContainer::batchMessageCallBack(Result r, MessageContainerListPtr messagesContainerListPtr, + FlushCallback flushCallback) { if (!messagesContainerListPtr) { + if (flushCallback) { + flushCallback(ResultOk); + } return; } LOG_DEBUG("BatchMessageContainer::batchMessageCallBack called with [Result = " @@ -142,6 +149,9 @@ void BatchMessageContainer::batchMessageCallBack(Result r, MessageContainerListP // callback(result, message) iter->sendCallback_(r, iter->message_); } + if (flushCallback) { + flushCallback(ResultOk); + } } BatchMessageContainer::~BatchMessageContainer() { diff --git a/pulsar-client-cpp/lib/BatchMessageContainer.h b/pulsar-client-cpp/lib/BatchMessageContainer.h index 0b7a0d3..37b52ef 100644 --- a/pulsar-client-cpp/lib/BatchMessageContainer.h +++ b/pulsar-client-cpp/lib/BatchMessageContainer.h @@ -64,7 +64,7 @@ class BatchMessageContainer { void clear(); - static void batchMessageCallBack(Result r, MessageContainerListPtr messages); + static void batchMessageCallBack(Result r, MessageContainerListPtr messages, FlushCallback callback); friend inline std::ostream& operator<<(std::ostream& os, const BatchMessageContainer& batchMessageContainer); @@ -108,7 +108,7 @@ class BatchMessageContainer { void startTimer(); - void sendMessage(); + void sendMessage(FlushCallback callback); }; bool BatchMessageContainer::hasSpaceInBatch(const Message& msg) const { diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc index 4e75e0e..0f2780e 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.cc +++ 
b/pulsar-client-cpp/lib/PartitionedProducerImpl.cc @@ -241,4 +241,37 @@ void PartitionedProducerImpl::triggerFlush() { } } +void PartitionedProducerImpl::flushAsync(FlushCallback callback) { + if (!flushPromise_ || flushPromise_->isComplete()) { + flushPromise_ = boost::make_shared<Promise<Result, bool_type>>(); + } else { + // already in flushing, register a listener callback + boost::function<void(Result, bool)> listenerCallback = [this, callback](Result result, bool_type v) { + if (v) { + callback(ResultOk); + } else { + callback(ResultUnknownError); + } + return; + }; + + flushPromise_->getFuture().addListener(listenerCallback); + return; + } + + FlushCallback subFlushCallback = [this, callback](Result result) { + int previous = flushedPartitions_.fetch_add(1); + if (previous == producers_.size() - 1) { + flushedPartitions_.store(0); + flushPromise_->setValue(true); + callback(result); + } + return; + }; + + for (ProducerList::const_iterator prod = producers_.begin(); prod != producers_.end(); prod++) { + (*prod)->flushAsync(subFlushCallback); + } +} + } // namespace pulsar diff --git a/pulsar-client-cpp/lib/PartitionedProducerImpl.h b/pulsar-client-cpp/lib/PartitionedProducerImpl.h index cf6ddd9..0a3d949 100644 --- a/pulsar-client-cpp/lib/PartitionedProducerImpl.h +++ b/pulsar-client-cpp/lib/PartitionedProducerImpl.h @@ -71,6 +71,8 @@ class PartitionedProducerImpl : public ProducerImplBase, virtual void triggerFlush(); + virtual void flushAsync(FlushCallback callback); + void handleSinglePartitionProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr, const unsigned int partitionIndex); @@ -115,6 +117,9 @@ class PartitionedProducerImpl : public ProducerImplBase, Promise<Result, ProducerImplBaseWeakPtr> partitionedProducerCreatedPromise_; MessageRoutingPolicyPtr getMessageRouter(); + + std::atomic<int> flushedPartitions_; + boost::shared_ptr<Promise<Result, bool_type>> flushPromise_; }; } // namespace pulsar diff --git 
a/pulsar-client-cpp/lib/Producer.cc b/pulsar-client-cpp/lib/Producer.cc index 49adbfc..bee213e 100644 --- a/pulsar-client-cpp/lib/Producer.cc +++ b/pulsar-client-cpp/lib/Producer.cc @@ -77,4 +77,22 @@ void Producer::closeAsync(CloseCallback callback) { impl_->closeAsync(callback); } + +Result Producer::flush() { + Promise<bool, Result> promise; + flushAsync(WaitForCallback(promise)); + + Result result; + promise.getFuture().get(result); + return result; +} + +void Producer::flushAsync(FlushCallback callback) { + if (!impl_) { + callback(ResultProducerNotInitialized); + return; + } + + impl_->flushAsync(callback); +} } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ProducerImpl.cc b/pulsar-client-cpp/lib/ProducerImpl.cc index ddcf671..32f178c 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.cc +++ b/pulsar-client-cpp/lib/ProducerImpl.cc @@ -245,7 +245,7 @@ void ProducerImpl::failPendingMessages(Result result) { } // this function can handle null pointer - BatchMessageContainer::batchMessageCallBack(ResultTimeout, messageContainerListPtr); + BatchMessageContainer::batchMessageCallBack(ResultTimeout, messageContainerListPtr, NULL); } void ProducerImpl::resendMessages(ClientConnectionPtr cnx) { @@ -285,10 +285,43 @@ void ProducerImpl::statsCallBackHandler(Result res, const Message& msg, SendCall } } +void ProducerImpl::flushAsync(FlushCallback callback) { + if (batchMessageContainer) { + if (!flushPromise_ || flushPromise_->isComplete()) { + flushPromise_ = boost::make_shared<Promise<Result, bool_type>>(); + } else { + // already in flushing, register a listener callback + boost::function<void(Result, bool)> listenerCallback = [this, callback](Result result, + bool_type v) { + if (v) { + callback(ResultOk); + } else { + callback(ResultUnknownError); + } + return; + }; + + flushPromise_->getFuture().addListener(listenerCallback); + return; + } + + FlushCallback innerCallback = [this, callback](Result result) { + flushPromise_->setValue(true); + callback(result); 
+ return; + }; + + Lock lock(mutex_); + batchMessageContainer->sendMessage(innerCallback); + } else { + callback(ResultOk); + } +} + void ProducerImpl::triggerFlush() { if (batchMessageContainer) { Lock lock(mutex_); - batchMessageContainer->sendMessage(); + batchMessageContainer->sendMessage(NULL); } } @@ -364,7 +397,7 @@ void ProducerImpl::sendAsync(const Message& msg, SendCallback callback) { // If queue is full sending the batch immediately, no point waiting till batchMessagetimeout if (batchMessageContainer) { LOG_DEBUG(getName() << " - sending batch message immediately"); - batchMessageContainer->sendMessage(); + batchMessageContainer->sendMessage(NULL); } lock.unlock(); cb(ResultProducerQueueIsFull, msg); @@ -412,7 +445,7 @@ void ProducerImpl::batchMessageTimeoutHandler(const boost::system::error_code& e } LOG_DEBUG(getName() << " - Batch Message Timer expired"); Lock lock(mutex_); - batchMessageContainer->sendMessage(); + batchMessageContainer->sendMessage(NULL); } void ProducerImpl::printStats() { diff --git a/pulsar-client-cpp/lib/ProducerImpl.h b/pulsar-client-cpp/lib/ProducerImpl.h index 615eafb..aa22abc 100644 --- a/pulsar-client-cpp/lib/ProducerImpl.h +++ b/pulsar-client-cpp/lib/ProducerImpl.h @@ -34,6 +34,7 @@ using namespace pulsar; namespace pulsar { +typedef bool bool_type; class BatchMessageContainer; @@ -90,6 +91,8 @@ class ProducerImpl : public HandlerBase, virtual void triggerFlush(); + virtual void flushAsync(FlushCallback callback); + protected: ProducerStatsBasePtr producerStatsBasePtr_; @@ -156,6 +159,7 @@ class ProducerImpl : public HandlerBase, MessageCryptoPtr msgCrypto_; DeadlineTimerPtr dataKeyGenTImer_; uint32_t dataKeyGenIntervalSec_; + boost::shared_ptr<Promise<Result, bool_type>> flushPromise_; }; struct ProducerImplCmp { diff --git a/pulsar-client-cpp/lib/ProducerImplBase.h b/pulsar-client-cpp/lib/ProducerImplBase.h index 92f886f..3dd92a4 100644 --- a/pulsar-client-cpp/lib/ProducerImplBase.h +++ 
b/pulsar-client-cpp/lib/ProducerImplBase.h @@ -43,6 +43,7 @@ class ProducerImplBase { virtual const std::string& getTopic() const = 0; virtual Future<Result, ProducerImplBaseWeakPtr> getProducerCreatedFuture() = 0; virtual void triggerFlush() = 0; + virtual void flushAsync(FlushCallback callback) = 0; }; } // namespace pulsar #endif // PULSAR_PRODUCER_IMPL_BASE_HEADER diff --git a/pulsar-client-cpp/tests/BasicEndToEndTest.cc b/pulsar-client-cpp/tests/BasicEndToEndTest.cc index d264ab1..c5f4420 100644 --- a/pulsar-client-cpp/tests/BasicEndToEndTest.cc +++ b/pulsar-client-cpp/tests/BasicEndToEndTest.cc @@ -2190,3 +2190,192 @@ TEST(BasicEndToEndTest, testGetTopicPartitions) { client.shutdown(); } + +TEST(BasicEndToEndTest, testFlushInProducer) { + ClientConfiguration config; + Client client(lookupUrl); + std::string topicName = "persistent://property/cluster/namespace/test-flush-in-producer"; + std::string subName = "subscription-name"; + Producer producer; + int numOfMessages = 10; + + ProducerConfiguration conf; + conf.setBatchingEnabled(true); + // set batch message number numOfMessages, and max delay 60s + conf.setBatchingMaxMessages(numOfMessages); + conf.setBatchingMaxPublishDelayMs(60000); + + conf.setBlockIfQueueFull(true); + conf.setProperty("producer-name", "test-producer-name"); + conf.setProperty("producer-id", "test-producer-id"); + + Promise<Result, Producer> producerPromise; + client.createProducerAsync(topicName, conf, WaitForCallbackValue<Producer>(producerPromise)); + Future<Result, Producer> producerFuture = producerPromise.getFuture(); + Result result = producerFuture.get(producer); + ASSERT_EQ(ResultOk, result); + + Consumer consumer; + ConsumerConfiguration consumerConfig; + consumerConfig.setProperty("consumer-name", "test-consumer-name"); + consumerConfig.setProperty("consumer-id", "test-consumer-id"); + Promise<Result, Consumer> consumerPromise; + client.subscribeAsync(topicName, subName, consumerConfig, + 
WaitForCallbackValue<Consumer>(consumerPromise)); + Future<Result, Consumer> consumerFuture = consumerPromise.getFuture(); + result = consumerFuture.get(consumer); + ASSERT_EQ(ResultOk, result); + + // Send Asynchronously of half the messages + std::string prefix = "msg-batch-async"; + for (int i = 0; i < numOfMessages / 2; i++) { + std::string messageContent = prefix + boost::lexical_cast<std::string>(i); + Message msg = MessageBuilder() + .setContent(messageContent) + .setProperty("msgIndex", boost::lexical_cast<std::string>(i)) + .build(); + producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix)); + LOG_DEBUG("async sending message " << messageContent); + } + LOG_INFO("sending half of messages in async, should timeout to receive"); + + // message not reached max batch number, should not receive any data. + Message receivedMsg; + ASSERT_EQ(ResultTimeout, consumer.receive(receivedMsg, 2000)); + + // After flush, it should get the message + producer.flush(); + ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 2000)); + + // receive all the messages. 
+ while (consumer.receive(receivedMsg, 2000) == ResultOk) { + ASSERT_EQ(ResultOk, consumer.acknowledge(receivedMsg)); + } + + // Send Asynchronously of another round of the messages + for (int i = numOfMessages / 2; i < numOfMessages; i++) { + std::string messageContent = prefix + boost::lexical_cast<std::string>(i); + Message msg = MessageBuilder() + .setContent(messageContent) + .setProperty("msgIndex", boost::lexical_cast<std::string>(i)) + .build(); + producer.sendAsync(msg, boost::bind(&sendCallBack, _1, _2, prefix)); + LOG_DEBUG("async sending message " << messageContent); + } + LOG_INFO( + "sending the other half messages in async, should still timeout, since first half already flushed"); + ASSERT_EQ(ResultTimeout, consumer.receive(receivedMsg, 2000)); + + // After flush async, it should get the message + Promise<bool, Result> promise; + producer.flushAsync(WaitForCallback(promise)); + Promise<bool, Result> promise1; + producer.flushAsync(WaitForCallback(promise1)); + promise.getFuture().get(result); + ASSERT_EQ(ResultOk, result); + ASSERT_EQ(ResultOk, consumer.receive(receivedMsg, 2000)); + + producer.close(); + client.shutdown(); +} + +TEST(BasicEndToEndTest, testFlushInPartitionedProducer) { + Client client(lookupUrl); + std::string topicName = "persistent://prop/unit/ns/partition-testFlushInPartitionedProducer"; + // call admin api to make it partitioned + std::string url = + adminUrl + "admin/persistent/prop/unit/ns/partition-testFlushInPartitionedProducer/partitions"; + int res = makePutRequest(url, "5"); + int numberOfPartitions = 5; + + LOG_INFO("res = " << res); + ASSERT_FALSE(res != 204 && res != 409); + + Producer producer; + int numOfMessages = 10; + ProducerConfiguration tempProducerConfiguration; + tempProducerConfiguration.setPartitionsRoutingMode(ProducerConfiguration::RoundRobinDistribution); + ProducerConfiguration producerConfiguration = tempProducerConfiguration; + producerConfiguration.setBatchingEnabled(true); + // set batch message 
number numOfMessages, and max delay 60s + producerConfiguration.setBatchingMaxMessages(numOfMessages / numberOfPartitions); + producerConfiguration.setBatchingMaxPublishDelayMs(60000); + + Result result = client.createProducer(topicName, producerConfiguration, producer); + ASSERT_EQ(ResultOk, result); + ASSERT_EQ(producer.getTopic(), topicName); + + LOG_INFO("Creating Subscriber"); + std::string consumerId = "CONSUMER"; + ConsumerConfiguration consConfig; + consConfig.setConsumerType(ConsumerExclusive); + consConfig.setReceiverQueueSize(2); + ASSERT_FALSE(consConfig.hasMessageListener()); + Consumer consumer[numberOfPartitions]; + Result subscribeResult; + for (int i = 0; i < numberOfPartitions; i++) { + std::stringstream partitionedTopicName; + partitionedTopicName << topicName << "-partition-" << i; + + std::stringstream partitionedConsumerId; + partitionedConsumerId << consumerId << i; + subscribeResult = client.subscribe(partitionedTopicName.str(), partitionedConsumerId.str(), + consConfig, consumer[i]); + + ASSERT_EQ(ResultOk, subscribeResult); + ASSERT_EQ(consumer[i].getTopic(), partitionedTopicName.str()); + } + + // Send asynchronously of first part the messages + std::string prefix = "msg-batch-async"; + for (int i = 0; i < numOfMessages / 2; i++) { + std::string messageContent = prefix + boost::lexical_cast<std::string>(i); + Message msg = MessageBuilder() + .setContent(messageContent) + .setProperty("msgIndex", boost::lexical_cast<std::string>(i)) + .build(); + producer.sendAsync(msg, simpleCallback); + LOG_DEBUG("async sending message " << messageContent); + } + + LOG_INFO("sending first part messages in async, should timeout to receive"); + Message m; + ASSERT_EQ(ResultTimeout, consumer[0].receive(m, 2000)); + + // After flush, should be able to consume. + producer.flush(); + LOG_INFO("After flush, should be able to receive"); + ASSERT_EQ(ResultOk, consumer[0].receive(m, 2000)); + + LOG_INFO("Receive all messages."); + // receive all the messages. 
+ for (int partitionIndex = 0; partitionIndex < numberOfPartitions; partitionIndex++) { + while (consumer[partitionIndex].receive(m, 2000) == ResultOk) { + // ASSERT_EQ(ResultOk, consumer[partitionIndex].acknowledge(m)); + ASSERT_EQ(ResultOk, consumer[partitionIndex].acknowledge(m)); + } + } + + // send message again. + for (int i = numOfMessages / 2; i < numOfMessages; i++) { + std::string messageContent = prefix + boost::lexical_cast<std::string>(i); + Message msg = MessageBuilder() + .setContent(messageContent) + .setProperty("msgIndex", boost::lexical_cast<std::string>(i)) + .build(); + producer.sendAsync(msg, simpleCallback); + LOG_DEBUG("async sending message " << messageContent); + } + + // After flush async, it should get the message + Promise<bool, Result> promise; + producer.flushAsync(WaitForCallback(promise)); + Promise<bool, Result> promise1; + producer.flushAsync(WaitForCallback(promise1)); + promise.getFuture().get(result); + ASSERT_EQ(ResultOk, result); + ASSERT_EQ(ResultOk, consumer[0].receive(m, 2000)); + + producer.close(); + client.shutdown(); +}