This is an automated email from the ASF dual-hosted git repository.
zike pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 74ef1a0 [feat] Support expiration for chunked messages (#71)
74ef1a0 is described below
commit 74ef1a01f5c7a4604d251de6d040c433f9bbf56b
Author: Zike Yang <[email protected]>
AuthorDate: Mon Nov 7 12:09:22 2022 +0800
[feat] Support expiration for chunked messages (#71)
### Motivation
Add support for checking expiration for incomplete chunked messages.
### Modifications
* Add configuration `expireTimeOfIncompleteChunkedMessageMs` to the
consumer.
* Add a timer to check the expiration of incomplete chunked messages
Co-authored-by: Yunze Xu <[email protected]>
---
include/pulsar/ConsumerConfiguration.h | 20 +++++++++++++
include/pulsar/Message.h | 1 +
lib/ConsumerConfiguration.cc | 10 +++++++
lib/ConsumerConfigurationImpl.h | 1 +
lib/ConsumerImpl.cc | 53 +++++++++++++++++++++++++++++++++-
lib/ConsumerImpl.h | 11 +++++++
lib/MapCache.h | 17 +++++++++++
tests/MapCacheTest.cc | 30 +++++++++++++++++++
tests/MessageChunkingTest.cc | 48 ++++++++++++++++++++++++++++++
tests/PulsarFriend.h | 7 +++++
tests/WaitUtils.h | 5 ++--
11 files changed, 200 insertions(+), 3 deletions(-)
diff --git a/include/pulsar/ConsumerConfiguration.h
b/include/pulsar/ConsumerConfiguration.h
index 0418cfa..520901c 100644
--- a/include/pulsar/ConsumerConfiguration.h
+++ b/include/pulsar/ConsumerConfiguration.h
@@ -519,6 +519,26 @@ class PULSAR_PUBLIC ConsumerConfiguration {
*/
bool isAutoAckOldestChunkedMessageOnQueueFull() const;
+ /**
+ * If the producer fails to publish all the chunks of a message, the consumer can expire the
+ * incomplete chunks when it has not received all of them within the expire time. Use value 0
+ * to disable this feature.
+ *
+ * Default: 60000, which means 1 minute
+ *
+ * @param expireTimeOfIncompleteChunkedMessageMs expire time in
milliseconds
+ * @return Consumer Configuration
+ */
+ ConsumerConfiguration& setExpireTimeOfIncompleteChunkedMessageMs(
+ long expireTimeOfIncompleteChunkedMessageMs);
+
+ /**
+ *
+ * Get the expire time of incomplete chunked message in milliseconds
+ *
+ * @return the expire time of incomplete chunked message in milliseconds
+ */
+ long getExpireTimeOfIncompleteChunkedMessageMs() const;
+
/**
* Set the consumer to include the given position of any reset operation
like Consumer::seek.
*
diff --git a/include/pulsar/Message.h b/include/pulsar/Message.h
index 74427a2..a778660 100644
--- a/include/pulsar/Message.h
+++ b/include/pulsar/Message.h
@@ -200,6 +200,7 @@ class PULSAR_PUBLIC Message {
friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const
StringMap& map);
friend PULSAR_PUBLIC std::ostream& operator<<(std::ostream& s, const
Message& msg);
+ friend class PulsarFriend;
};
} // namespace pulsar
diff --git a/lib/ConsumerConfiguration.cc b/lib/ConsumerConfiguration.cc
index 6298245..f37e042 100644
--- a/lib/ConsumerConfiguration.cc
+++ b/lib/ConsumerConfiguration.cc
@@ -262,6 +262,16 @@ bool
ConsumerConfiguration::isAutoAckOldestChunkedMessageOnQueueFull() const {
return impl_->autoAckOldestChunkedMessageOnQueueFull;
}
+// Stores the expire time (in milliseconds) of incomplete chunked messages in the configuration
+// implementation object and returns this configuration so that setters can be chained.
+ConsumerConfiguration& ConsumerConfiguration::setExpireTimeOfIncompleteChunkedMessageMs(
+    long expireTimeOfIncompleteChunkedMessageMs) {
+    auto& config = *impl_;
+    config.expireTimeOfIncompleteChunkedMessageMs = expireTimeOfIncompleteChunkedMessageMs;
+    return *this;
+}
+
+// Returns the configured expire time (in milliseconds) of incomplete chunked messages.
+long ConsumerConfiguration::getExpireTimeOfIncompleteChunkedMessageMs() const {
+    const auto& config = *impl_;
+    return config.expireTimeOfIncompleteChunkedMessageMs;
+}
+
ConsumerConfiguration& ConsumerConfiguration::setStartMessageIdInclusive(bool
startMessageIdInclusive) {
impl_->startMessageIdInclusive = startMessageIdInclusive;
return *this;
diff --git a/lib/ConsumerConfigurationImpl.h b/lib/ConsumerConfigurationImpl.h
index 444fedf..259b935 100644
--- a/lib/ConsumerConfigurationImpl.h
+++ b/lib/ConsumerConfigurationImpl.h
@@ -55,6 +55,7 @@ struct ConsumerConfigurationImpl {
size_t maxPendingChunkedMessage{10};
bool autoAckOldestChunkedMessageOnQueueFull{false};
bool startMessageIdInclusive{false};
+ long expireTimeOfIncompleteChunkedMessageMs{60000};
};
} // namespace pulsar
#endif /* LIB_CONSUMERCONFIGURATIONIMPL_H_ */
diff --git a/lib/ConsumerImpl.cc b/lib/ConsumerImpl.cc
index 5698b46..0966729 100644
--- a/lib/ConsumerImpl.cc
+++ b/lib/ConsumerImpl.cc
@@ -78,7 +78,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client, const
std::string& topic,
readCompacted_(conf.isReadCompacted()),
startMessageId_(startMessageId),
maxPendingChunkedMessage_(conf.getMaxPendingChunkedMessage()),
-
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull())
{
+
autoAckOldestChunkedMessageOnQueueFull_(conf.isAutoAckOldestChunkedMessageOnQueueFull()),
+
expireTimeOfIncompleteChunkedMessageMs_(conf.getExpireTimeOfIncompleteChunkedMessageMs())
{
std::stringstream consumerStrStream;
consumerStrStream << "[" << topic_ << ", " << subscription_ << ", " <<
consumerId_ << "] ";
consumerStr_ = consumerStrStream.str();
@@ -109,6 +110,8 @@ ConsumerImpl::ConsumerImpl(const ClientImplPtr client,
const std::string& topic,
if (conf.isEncryptionEnabled()) {
msgCrypto_ = std::make_shared<MessageCrypto>(consumerStr_, false);
}
+
+ checkExpiredChunkedTimer_ = executor_->createDeadlineTimer();
}
ConsumerImpl::~ConsumerImpl() {
@@ -319,6 +322,45 @@ void ConsumerImpl::unsubscribeAsync(ResultCallback
originalCallback) {
}
}
+// Arms checkExpiredChunkedTimer_ to fire after expireTimeOfIncompleteChunkedMessageMs_ milliseconds.
+// When the timer fires without error, the oldest entries of chunkedMessageCache_ whose most recent
+// chunk was received longer ago than the expire time are acknowledged chunk by chunk and removed,
+// and the timer is armed again. A failed or cancelled wait only logs and does not re-arm the timer.
+void ConsumerImpl::triggerCheckExpiredChunkedTimer() {
+    const auto delay = boost::posix_time::milliseconds(expireTimeOfIncompleteChunkedMessageMs_);
+    checkExpiredChunkedTimer_->expires_from_now(delay);
+    std::weak_ptr<ConsumerImplBase> weakSelf{shared_from_this()};
+    checkExpiredChunkedTimer_->async_wait([this, weakSelf](const boost::system::error_code& ec) -> void {
+        // Hold a strong reference while the callback runs; do nothing if the consumer is gone.
+        auto self = weakSelf.lock();
+        if (!self) {
+            return;
+        }
+        if (ec) {
+            LOG_DEBUG(getName() << " Check expired chunked messages was failed or cancelled, code[" << ec
+                                << "].");
+            return;
+        }
+
+        Lock lock(chunkProcessMutex_);
+        const long nowMs = TimeUtils::currentTimeMillis();
+        // Returns true (after acknowledging every buffered chunk) when the context has expired.
+        auto discardIfExpired = [this, nowMs](const std::string& uuid,
+                                              const ChunkedMessageCtx& ctx) -> bool {
+            const long deadlineMs = ctx.getReceivedTimeMs() + expireTimeOfIncompleteChunkedMessageMs_;
+            if (nowMs <= deadlineMs) {
+                return false;
+            }
+            for (const MessageId& msgId : ctx.getChunkedMessageIds()) {
+                LOG_INFO("Removing expired chunk messages: uuid: " << uuid << ", messageId: " << msgId);
+                doAcknowledgeIndividual(msgId, [uuid, msgId](Result result) {
+                    if (result != ResultOk) {
+                        LOG_WARN("Failed to acknowledge discarded chunk, uuid: "
+                                 << uuid << ", messageId: " << msgId);
+                    }
+                });
+            }
+            return true;
+        };
+        chunkedMessageCache_.removeOldestValuesIf(discardIfExpired);
+
+        // Schedule the next expiration check.
+        triggerCheckExpiredChunkedTimer();
+    });
+}
+
Optional<SharedBuffer> ConsumerImpl::processMessageChunk(const SharedBuffer&
payload,
const
proto::MessageMetadata& metadata,
const MessageId&
messageId,
@@ -331,6 +373,14 @@ Optional<SharedBuffer>
ConsumerImpl::processMessageChunk(const SharedBuffer& pay
<< payload.readableBytes() <<
" bytes");
Lock lock(chunkProcessMutex_);
+
+ // Lazy task scheduling to expire incomplete chunk message
+ bool expected = false;
+ if (expireTimeOfIncompleteChunkedMessageMs_ > 0 &&
+ expireChunkMessageTaskScheduled_.compare_exchange_strong(expected,
true)) {
+ triggerCheckExpiredChunkedTimer();
+ }
+
auto it = chunkedMessageCache_.find(uuid);
if (chunkId == 0) {
@@ -1448,6 +1498,7 @@ std::shared_ptr<ConsumerImpl>
ConsumerImpl::get_shared_this_ptr() {
void ConsumerImpl::cancelTimers() noexcept {
boost::system::error_code ec;
batchReceiveTimer_->cancel(ec);
+ checkExpiredChunkedTimer_->cancel(ec);
}
} /* namespace pulsar */
diff --git a/lib/ConsumerImpl.h b/lib/ConsumerImpl.h
index 693180f..9ba6577 100644
--- a/lib/ConsumerImpl.h
+++ b/lib/ConsumerImpl.h
@@ -33,6 +33,7 @@
#include "NegativeAcksTracker.h"
#include "Synchronized.h"
#include "TestUtil.h"
+#include "TimeUtils.h"
#include "UnboundedBlockingQueue.h"
namespace pulsar {
@@ -259,6 +260,7 @@ class ConsumerImpl : public ConsumerImplBase {
void appendChunk(const MessageId& messageId, const SharedBuffer&
payload) {
chunkedMessageIds_.emplace_back(messageId);
chunkedMsgBuffer_.write(payload.data(), payload.readableBytes());
+ receivedTimeMs_ = TimeUtils::currentTimeMillis();
}
bool isCompleted() const noexcept { return totalChunks_ ==
numChunks(); }
@@ -267,6 +269,8 @@ class ConsumerImpl : public ConsumerImplBase {
const std::vector<MessageId>& getChunkedMessageIds() const noexcept {
return chunkedMessageIds_; }
+ long getReceivedTimeMs() const noexcept { return receivedTimeMs_; }
+
friend std::ostream& operator<<(std::ostream& os, const
ChunkedMessageCtx& ctx) {
return os << "ChunkedMessageCtx " <<
ctx.chunkedMsgBuffer_.readableBytes() << " of "
<< ctx.chunkedMsgBuffer_.writerIndex() << " bytes, " <<
ctx.numChunks() << " of "
@@ -277,6 +281,7 @@ class ConsumerImpl : public ConsumerImplBase {
const int totalChunks_;
SharedBuffer chunkedMsgBuffer_;
std::vector<MessageId> chunkedMessageIds_;
+ long receivedTimeMs_;
int numChunks() const noexcept { return
static_cast<int>(chunkedMessageIds_.size()); }
};
@@ -297,6 +302,12 @@ class ConsumerImpl : public ConsumerImplBase {
MapCache<std::string, ChunkedMessageCtx> chunkedMessageCache_;
mutable std::mutex chunkProcessMutex_;
+ const long expireTimeOfIncompleteChunkedMessageMs_;
+ DeadlineTimerPtr checkExpiredChunkedTimer_;
+ std::atomic_bool expireChunkMessageTaskScheduled_{false};
+
+ void triggerCheckExpiredChunkedTimer();
+
/**
* Process a chunk. If the chunk is the last chunk of a message,
concatenate all buffered chunks into the
* payload and return it.
diff --git a/lib/MapCache.h b/lib/MapCache.h
index b9a0069..55d58f6 100644
--- a/lib/MapCache.h
+++ b/lib/MapCache.h
@@ -73,6 +73,23 @@ class MapCache {
}
}
+    /**
+     * Remove entries, starting from the front of `keys_`, for as long as `condition` returns true.
+     *
+     * The scan stops at the first entry for which `condition` returns false, so entries behind it
+     * are left untouched even if they would match.
+     *
+     * @param condition predicate invoked with the key and value of each candidate entry; an empty
+     * function makes this call a no-op
+     */
+    void removeOldestValuesIf(const std::function<bool(const Key&, const Value&)>& condition) {
+        if (!condition) return;
+        while (!keys_.empty()) {
+            auto it = map_.find(keys_.front());
+            if (it == map_.end()) {
+                // A key without a mapped value must be dropped as well; leaving it at the front
+                // would make this loop spin forever.
+                keys_.pop_front();
+                continue;
+            }
+            if (!condition(it->first, it->second)) {
+                return;
+            }
+            map_.erase(it);
+            keys_.pop_front();
+        }
+    }
+
void remove(const Key& key) {
auto it = map_.find(key);
if (it != map_.end()) {
diff --git a/tests/MapCacheTest.cc b/tests/MapCacheTest.cc
index 2140937..69496c4 100644
--- a/tests/MapCacheTest.cc
+++ b/tests/MapCacheTest.cc
@@ -77,3 +77,33 @@ TEST(MapCacheTest, testRemoveAllValues) {
ASSERT_TRUE(cache.getKeys().empty());
ASSERT_EQ(cache.size(), 0);
}
+
+// Verifies that removeOldestValuesIf() ignores an empty predicate and otherwise removes entries
+// starting with the one inserted first, stopping at the first entry the predicate rejects.
+TEST(MapCacheTest, testRemoveOldestValuesIf) {
+    MapCache<int, MoveOnlyInt> cache;
+    cache.putIfAbsent(1, {100});
+    cache.putIfAbsent(2, {200});
+    cache.putIfAbsent(3, {300});
+    int expireTime = 100;
+
+    // The key is irrelevant here: a value matches once expireTime is greater than it.
+    auto checkCondition = [&expireTime](const int&, const MoveOnlyInt& value) -> bool {
+        return expireTime > value.x;
+    };
+
+    // An empty predicate must not remove anything.
+    cache.removeOldestValuesIf(nullptr);
+    ASSERT_EQ(cache.size(), 3);
+
+    // 100 > 100 is false, so even the first entry is kept.
+    cache.removeOldestValuesIf(checkCondition);
+    ASSERT_EQ(cache.size(), 3);
+
+    // Only the first entry (100) is below 200.
+    expireTime = 200;
+    cache.removeOldestValuesIf(checkCondition);
+    ASSERT_EQ(cache.size(), 2);
+    ASSERT_EQ(cache.find(2)->second.x, 200);
+    ASSERT_EQ(cache.find(3)->second.x, 300);
+
+    // Every remaining value is below 400.
+    expireTime = 400;
+    cache.removeOldestValuesIf(checkCondition);
+    ASSERT_EQ(cache.size(), 0);
+}
diff --git a/tests/MessageChunkingTest.cc b/tests/MessageChunkingTest.cc
index 61a9714..8675886 100644
--- a/tests/MessageChunkingTest.cc
+++ b/tests/MessageChunkingTest.cc
@@ -23,6 +23,7 @@
#include <random>
#include "PulsarFriend.h"
+#include "WaitUtils.h"
#include "lib/LogUtils.h"
DECLARE_LOG_OBJECT()
@@ -81,6 +82,10 @@ class MessageChunkingTest : public
::testing::TestWithParam<CompressionType> {
ASSERT_EQ(ResultOk, client_.subscribe(topic, "my-sub", consumer));
}
+    // Subscribes `consumer` to `topic` under the "my-sub" subscription with the given configuration
+    // and fails the current test if the subscription does not return ResultOk.
+    void createConsumer(const std::string& topic, Consumer& consumer, ConsumerConfiguration& conf) {
+        const auto subscribeResult = client_.subscribe(topic, "my-sub", conf, consumer);
+        ASSERT_EQ(ResultOk, subscribeResult);
+    }
+
private:
Client client_{lookupUrl};
};
@@ -130,6 +135,49 @@ TEST_P(MessageChunkingTest, testEndToEnd) {
// Verify the cache has been cleared
auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);
ASSERT_EQ(chunkedMessageCache.size(), 0);
+
+ producer.close();
+ consumer.close();
+}
+
+// Sends only chunk 0 of a message declared to have two chunks, checks that the consumer caches it,
+// and then checks that the cache becomes empty again after the configured expire time.
+TEST_P(MessageChunkingTest, testExpireIncompleteChunkMessage) {
+    // This test is time-consuming and is not related to the compression type, so it only runs for
+    // the "None" parameter.
+    const auto compressionName = toString(GetParam());
+    if (compressionName != "None") {
+        return;
+    }
+    const std::string topic = "MessageChunkingTest-testExpireIncompleteChunkMessage-" + compressionName +
+                              std::to_string(time(nullptr));
+
+    Consumer consumer;
+    ConsumerConfiguration consumerConf;
+    consumerConf.setExpireTimeOfIncompleteChunkedMessageMs(5000);
+    consumerConf.setAutoAckOldestChunkedMessageOnQueueFull(true);
+    createConsumer(topic, consumer, consumerConf);
+    Producer producer;
+    createProducer(topic, producer);
+
+    // Rewrite the metadata so the message looks like the first of two chunks; the second is never sent.
+    auto msg = MessageBuilder().setContent("test-data").build();
+    auto& metadata = PulsarFriend::getMessageMetadata(msg);
+    metadata.set_num_chunks_from_msg(2);
+    metadata.set_chunk_id(0);
+    metadata.set_total_chunk_msg_size(100);
+
+    producer.send(msg);
+
+    auto& chunkedMessageCache = PulsarFriend::getChunkedMessageCache(consumer);
+    const auto cacheIsFilled = [&] { return chunkedMessageCache.size() > 0; };
+    const auto cacheIsEmpty = [&] { return chunkedMessageCache.size() == 0; };
+
+    waitUntil(std::chrono::seconds(2), cacheIsFilled, 1000);
+    ASSERT_EQ(chunkedMessageCache.size(), 1);
+
+    // Wait for the expiration check to run.
+    // The timeout is 2 * expireTime because there may be a gap before the check fires.
+    waitUntil(std::chrono::seconds(10), cacheIsEmpty, 1000);
+    ASSERT_EQ(chunkedMessageCache.size(), 0);
+
+    producer.close();
+    consumer.close();
}
// The CI env is Ubuntu 16.04, the gtest-dev version is 1.8.0 that doesn't
have INSTANTIATE_TEST_SUITE_P
diff --git a/tests/PulsarFriend.h b/tests/PulsarFriend.h
index 938b284..3272bce 100644
--- a/tests/PulsarFriend.h
+++ b/tests/PulsarFriend.h
@@ -16,12 +16,15 @@
* specific language governing permissions and limitations
* under the License.
*/
+#ifndef PULSAR_FRIEND_HPP_
+#define PULSAR_FRIEND_HPP_
#include <string>
#include "lib/ClientConnection.h"
#include "lib/ClientImpl.h"
#include "lib/ConsumerImpl.h"
+#include "lib/MessageImpl.h"
#include "lib/MultiTopicsConsumerImpl.h"
#include "lib/NamespaceName.h"
#include "lib/PartitionedProducerImpl.h"
@@ -180,5 +183,9 @@ class PulsarFriend {
static size_t getNumberOfPendingTasks(const RetryableLookupService&
lookupService) {
return lookupService.backoffTimers_.size();
}
+
+    // Gives tests mutable access to the metadata held by the implementation object of `message`.
+    static proto::MessageMetadata& getMessageMetadata(Message& message) {
+        auto& impl = *message.impl_;
+        return impl.metadata;
+    }
};
} // namespace pulsar
+
+#endif /* PULSAR_FRIEND_HPP_ */
diff --git a/tests/WaitUtils.h b/tests/WaitUtils.h
index abe3efc..d7db82e 100644
--- a/tests/WaitUtils.h
+++ b/tests/WaitUtils.h
@@ -25,14 +25,15 @@
namespace pulsar {
template <typename Rep, typename Period>
-inline void waitUntil(std::chrono::duration<Rep, Period> timeout,
std::function<bool()> condition) {
+inline void waitUntil(std::chrono::duration<Rep, Period> timeout, const
std::function<bool()>& condition,
+ long durationMs = 10) {
auto timeoutMs =
std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count();
while (timeoutMs > 0) {
auto now = std::chrono::high_resolution_clock::now();
if (condition()) {
break;
}
- std::this_thread::sleep_for(std::chrono::milliseconds(10));
+ std::this_thread::sleep_for(std::chrono::milliseconds(durationMs));
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::high_resolution_clock::now() - now)
.count();