This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pulsar-client-cpp.git
The following commit(s) were added to refs/heads/main by this push: new 37bdf5b Support seek operation on a multi-topics consumer (#426) 37bdf5b is described below commit 37bdf5b3856190eefc7a703753361da0e3a6e2c0 Author: Yunze Xu <xyzinfern...@163.com> AuthorDate: Tue Jun 4 09:51:55 2024 +0800 Support seek operation on a multi-topics consumer (#426) ### Motivation See https://github.com/apache/pulsar-client-python/issues/213 ### Modifications Add a new `forEachValue` overload that allows users to count the number of rest running tasks through `SharedFuture` to `SynchronizedHashMap`. Leverage this overload in seek operations when the argument is a timestamp, or a MessageId that represents earliest or latest. When the argument is a MessageId whose `getTopicName()` method returns a correct topic name, seek on the internal consumer of that topic. Add `testMultiTopicsSeekAll` and `testMultiTopicsSeekSingle` to `ConsumerSeekTest` to cover these cases. --- lib/MultiTopicsConsumerImpl.cc | 110 +++++++++------------ lib/MultiTopicsConsumerImpl.h | 52 +++++++++- lib/SynchronizedHashMap.h | 67 ++++++++++++- tests/ConsumerSeekTest.cc | 205 +++++++++++++++++++++++++++++++++++++++ tests/ConsumerTest.cc | 65 ------------- tests/SynchronizedHashMapTest.cc | 22 +++++ 6 files changed, 385 insertions(+), 136 deletions(-) diff --git a/lib/MultiTopicsConsumerImpl.cc b/lib/MultiTopicsConsumerImpl.cc index 80566c8..e95a9ac 100644 --- a/lib/MultiTopicsConsumerImpl.cc +++ b/lib/MultiTopicsConsumerImpl.cc @@ -338,41 +338,23 @@ void MultiTopicsConsumerImpl::unsubscribeAsync(ResultCallback originalCallback) } state_ = Closing; - std::shared_ptr<std::atomic<int>> consumerUnsubed = std::make_shared<std::atomic<int>>(0); auto self = get_shared_this_ptr(); - int numConsumers = 0; consumers_.forEachValue( - [&numConsumers, &consumerUnsubed, &self, callback](const ConsumerImplPtr& consumer) { - numConsumers++; - consumer->unsubscribeAsync([self, consumerUnsubed, callback](Result result) { - self->handleUnsubscribedAsync(result, consumerUnsubed, callback); + [this, self, callback](const ConsumerImplPtr& consumer, SharedFuture future) { + consumer->unsubscribeAsync([this, self, callback, future](Result result) { + if (result != ResultOk) { + state_ = Failed; + LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: " + << result << " subscription - " << subscriptionName_); + } + if (future.tryComplete()) { + LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer. - " + << consumerStr_); + callback((state_ != Failed) ? ResultOk : ResultUnknownError); + } }); - }); - if (numConsumers == 0) { - // No need to unsubscribe, since the list matching the regex was empty - callback(ResultOk); - } -} - -void MultiTopicsConsumerImpl::handleUnsubscribedAsync(Result result, - std::shared_ptr<std::atomic<int>> consumerUnsubed, - ResultCallback callback) { - (*consumerUnsubed)++; - - if (result != ResultOk) { - state_ = Failed; - LOG_ERROR("Error Closing one of the consumers in TopicsConsumer, result: " - << result << " subscription - " << subscriptionName_); - } - - if (consumerUnsubed->load() == numberTopicPartitions_->load()) { - LOG_DEBUG("Unsubscribed all of the partition consumer for TopicsConsumer. - " << consumerStr_); - Result result1 = (state_ != Failed) ? ResultOk : ResultUnknownError; - // The `callback` is a wrapper of user provided callback, it's not null and will call `shutdown()` if - // unsubscribe succeeds. - callback(result1); - return; - } + }, + [callback] { callback(ResultOk); }); } void MultiTopicsConsumerImpl::unsubscribeOneTopicAsync(const std::string& topic, ResultCallback callback) { @@ -899,50 +881,52 @@ std::shared_ptr<TopicName> MultiTopicsConsumerImpl::topicNamesValid(const std::v return topicNamePtr; } -void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) { - callback(ResultOperationNotSupported); -} - -void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { - if (state_ != Ready) { - callback(ResultAlreadyClosed); - return; - } - +void MultiTopicsConsumerImpl::beforeSeek() { duringSeek_.store(true, std::memory_order_release); consumers_.forEachValue([](const ConsumerImplPtr& consumer) { consumer->pauseMessageListener(); }); unAckedMessageTrackerPtr_->clear(); incomingMessages_.clear(); incomingMessagesSize_ = 0L; +} + +void MultiTopicsConsumerImpl::afterSeek() { + duringSeek_.store(false, std::memory_order_release); + auto self = get_shared_this_ptr(); + listenerExecutor_->postWork([this, self] { + consumers_.forEachValue([](const ConsumerImplPtr& consumer) { consumer->resumeMessageListener(); }); + }); +} + +void MultiTopicsConsumerImpl::seekAsync(const MessageId& msgId, ResultCallback callback) { + if (msgId == MessageId::earliest() || msgId == MessageId::latest()) { + return seekAllAsync(msgId, callback); + } + auto optConsumer = consumers_.find(msgId.getTopicName()); + if (!optConsumer) { + LOG_ERROR(getName() << "cannot seek a message id whose topic \"" + msgId.getTopicName() + + "\" is not subscribed"); + callback(ResultOperationNotSupported); + return; + } + + beforeSeek(); auto weakSelf = weak_from_this(); - auto numConsumersLeft = std::make_shared<std::atomic<int64_t>>(consumers_.size()); - auto wrappedCallback = [this, weakSelf, callback, numConsumersLeft](Result result) { + optConsumer.get()->seekAsync(msgId, [this, weakSelf, callback](Result result) { auto self = weakSelf.lock(); - if (PULSAR_UNLIKELY(!self)) { - callback(result); - return; - } - if (result != ResultOk) { - *numConsumersLeft = 0; // skip the following callbacks + if (self) { + afterSeek(); callback(result); - return; - } - if (--*numConsumersLeft > 0) { - return; + } else { + callback(ResultAlreadyClosed); } - duringSeek_.store(false, std::memory_order_release); - listenerExecutor_->postWork([this, self] { - consumers_.forEachValue( - [](const ConsumerImplPtr& consumer) { consumer->resumeMessageListener(); }); - }); - callback(ResultOk); - }; - consumers_.forEachValue([timestamp, &wrappedCallback](const ConsumerImplPtr& consumer) { - consumer->seekAsync(timestamp, wrappedCallback); }); } +void MultiTopicsConsumerImpl::seekAsync(uint64_t timestamp, ResultCallback callback) { + seekAllAsync(timestamp, callback); +} + void MultiTopicsConsumerImpl::setNegativeAcknowledgeEnabledForTesting(bool enabled) { consumers_.forEachValue([enabled](const ConsumerImplPtr& consumer) { consumer->setNegativeAcknowledgeEnabledForTesting(enabled); diff --git a/lib/MultiTopicsConsumerImpl.h b/lib/MultiTopicsConsumerImpl.h index b5c51ec..6763942 100644 --- a/lib/MultiTopicsConsumerImpl.h +++ b/lib/MultiTopicsConsumerImpl.h @@ -25,7 +25,7 @@ #include <vector> #include "Commands.h" -#include "ConsumerImplBase.h" +#include "ConsumerImpl.h" #include "ConsumerInterceptors.h" #include "Future.h" #include "Latch.h" @@ -38,7 +38,6 @@ namespace pulsar { typedef std::shared_ptr<Promise<Result, Consumer>> ConsumerSubResultPromisePtr; -class ConsumerImpl; using ConsumerImplPtr = std::shared_ptr<ConsumerImpl>; class ClientImpl; using ClientImplPtr = std::shared_ptr<ClientImpl>; @@ -152,8 +151,6 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { void handleSingleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr, std::shared_ptr<std::atomic<int>> partitionsNeedCreate, ConsumerSubResultPromisePtr topicSubResultPromise); - void handleUnsubscribedAsync(Result result, std::shared_ptr<std::atomic<int>> consumerUnsubed, - ResultCallback callback); void handleOneTopicUnsubscribedAsync(Result result, std::shared_ptr<std::atomic<int>> consumerUnsubed, int numberPartitions, TopicNamePtr topicNamePtr, std::string& topicPartitionName, ResultCallback callback); @@ -179,6 +176,16 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { return std::static_pointer_cast<MultiTopicsConsumerImpl>(shared_from_this()); } + template <typename SeekArg> +#if __cplusplus >= 202002L + requires std::convertible_to<SeekArg, uint64_t> || + std::same_as<std::remove_cv_t<std::remove_reference_t<SeekArg>>, MessageId> +#endif + void seekAllAsync(const SeekArg& seekArg, ResultCallback callback); + + void beforeSeek(); + void afterSeek(); + FRIEND_TEST(ConsumerTest, testMultiTopicsConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testPartitionedConsumerUnAckedMessageRedelivery); FRIEND_TEST(ConsumerTest, testAcknowledgeCumulativeWithPartition); @@ -187,5 +194,42 @@ class MultiTopicsConsumerImpl : public ConsumerImplBase { }; typedef std::shared_ptr<MultiTopicsConsumerImpl> MultiTopicsConsumerImplPtr; + +template <typename SeekArg> +#if __cplusplus >= 202002L + requires std::convertible_to<SeekArg, uint64_t> || + std::same_as<std::remove_cv_t<std::remove_reference_t<SeekArg>>, MessageId> +#endif + inline void MultiTopicsConsumerImpl::seekAllAsync(const SeekArg& seekArg, ResultCallback callback) { + if (state_ != Ready) { + callback(ResultAlreadyClosed); + return; + } + beforeSeek(); + auto weakSelf = weak_from_this(); + auto failed = std::make_shared<std::atomic_bool>(false); + consumers_.forEachValue( + [this, weakSelf, &seekArg, callback, failed](const ConsumerImplPtr& consumer, SharedFuture future) { + consumer->seekAsync(seekArg, [this, weakSelf, callback, failed, future](Result result) { + auto self = weakSelf.lock(); + if (!self || failed->load(std::memory_order_acquire)) { + callback(result); + return; + } + if (result != ResultOk) { + failed->store(true, std::memory_order_release); // skip the following callbacks + afterSeek(); + callback(result); + return; + } + if (future.tryComplete()) { + afterSeek(); + callback(ResultOk); + } + }); + }, + [callback] { callback(ResultOk); }); +} + } // namespace pulsar #endif // PULSAR_MULTI_TOPICS_CONSUMER_HEADER diff --git a/lib/SynchronizedHashMap.h b/lib/SynchronizedHashMap.h index 082aeaf..e224913 100644 --- a/lib/SynchronizedHashMap.h +++ b/lib/SynchronizedHashMap.h @@ -18,8 +18,10 @@ */ #pragma once +#include <atomic> #include <boost/optional.hpp> #include <functional> +#include <memory> #include <mutex> #include <unordered_map> #include <utility> @@ -27,6 +29,16 @@ namespace pulsar { +class SharedFuture { + public: + SharedFuture(size_t size) : count_(std::make_shared<std::atomic_size_t>(size)) {} + + bool tryComplete() const { return --*count_ == 0; } + + private: + std::shared_ptr<std::atomic_size_t> count_; +}; + // V must be default constructible and copyable template <typename K, typename V> class SynchronizedHashMap { @@ -60,10 +72,57 @@ class SynchronizedHashMap { } } - void forEachValue(std::function<void(const V&)> f) const { - Lock lock(mutex_); - for (const auto& kv : data_) { - f(kv.second); + template <typename ValueFunc> +#if __cplusplus >= 202002L + requires requires(ValueFunc&& each, const V& value) { + each(value); + } +#endif + void forEachValue(ValueFunc&& each) { + Lock lock{mutex_}; + for (auto&& kv : data_) { + each(kv.second); + } + } + + // This override provides a convenient approach to execute tasks on each consumer concurrently and + // supports checking if all tasks are done in the `each` callback. + // + // All map values will be passed as the 1st argument to the `each` function. The 2nd argument is a shared + // future whose `tryComplete` method marks this task as completed. If users want to check if all task are + // completed in the `each` function, this method must be called. + // + // For example, given a `SynchronizedHashMap<int, std::string>` object `m` and the following call: + // + // ```c++ + // m.forEachValue([](const std::string& s, SharedFuture future) { + // std::cout << s << std::endl; + // if (future.tryComplete()) { + // std::cout << "done" << std::endl; + // } + // }, [] { std::cout << "empty map" << std::endl; }); + // ``` + // + // If the map is empty, only "empty map" will be printed. Otherwise, all values will be printed + // and "done" will be printed after that. + template <typename ValueFunc, typename EmptyFunc> +#if __cplusplus >= 202002L + requires requires(ValueFunc&& each, const V& value, SharedFuture count, EmptyFunc emptyFunc) { + each(value, count); + emptyFunc(); + } +#endif + void forEachValue(ValueFunc&& each, EmptyFunc&& emptyFunc) { + std::unique_lock<MutexType> lock{mutex_}; + if (data_.empty()) { + lock.unlock(); + emptyFunc(); + return; + } + SharedFuture future{data_.size()}; + for (auto&& kv : data_) { + const auto& value = kv.second; + each(value, future); } } diff --git a/tests/ConsumerSeekTest.cc b/tests/ConsumerSeekTest.cc new file mode 100644 index 0000000..f03ea5e --- /dev/null +++ b/tests/ConsumerSeekTest.cc @@ -0,0 +1,205 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include <gtest/gtest.h> +#include <pulsar/Client.h> + +#include <set> +#include <stdexcept> +#include <string> + +#include "HttpHelper.h" +#include "lib/LogUtils.h" + +DECLARE_LOG_OBJECT() + +static const std::string lookupUrl = "pulsar://localhost:6650"; +static const std::string adminUrl = "http://localhost:8080/"; + +extern std::string unique_str(); + +namespace pulsar { + +class ConsumerSeekTest : public ::testing::TestWithParam<bool> { + public: + void SetUp() override { client_ = Client{lookupUrl}; } + + void TearDown() override { client_.close(); } + + protected: + Client client_{lookupUrl}; + ProducerConfiguration producerConf_; + + std::vector<Producer> initProducersForPartitionedTopic(const std::string& topic) { + constexpr int numPartitions = 3; + int res = makePutRequest(adminUrl + "admin/v2/persistent/public/default/" + topic + "/partitions", + std::to_string(numPartitions)); + if (res != 204 && res != 409) { + throw std::runtime_error("Failed to create partitioned topic: " + std::to_string(res)); + } + + std::vector<Producer> producers(numPartitions); + for (int i = 0; i < numPartitions; i++) { + auto result = client_.createProducer(topic + "-partition-" + std::to_string(i), producers[i]); + if (result != ResultOk) { + throw std::runtime_error(std::string{"Failed to create producer: "} + strResult(result)); + } + } + return producers; + } + + Consumer createConsumer(const std::string& topic) { + Consumer consumer; + ConsumerConfiguration conf; + conf.setStartMessageIdInclusive(GetParam()); + auto result = client_.subscribe(topic, "sub", conf, consumer); + if (result != ResultOk) { + throw std::runtime_error(std::string{"Failed to subscribe: "} + strResult(result)); + } + return consumer; + } +}; + +TEST_P(ConsumerSeekTest, testSeekForMessageId) { + Client client(lookupUrl); + + const std::string topic = "test-seek-for-message-id-" + std::string((GetParam() ? "batch-" : "")) + + std::to_string(time(nullptr)); + + Producer producer; + ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf_, producer)); + + Consumer consumerExclusive; + ASSERT_EQ(ResultOk, client.subscribe(topic, "sub-0", consumerExclusive)); + + Consumer consumerInclusive; + ASSERT_EQ(ResultOk, + client.subscribe(topic, "sub-1", ConsumerConfiguration().setStartMessageIdInclusive(true), + consumerInclusive)); + + const auto numMessages = 100; + MessageId seekMessageId; + + int r = (rand() % (numMessages - 1)); + for (int i = 0; i < numMessages; i++) { + MessageId id; + ASSERT_EQ(ResultOk, + producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), id)); + + if (i == r) { + seekMessageId = id; + } + } + + LOG_INFO("The seekMessageId is: " << seekMessageId << ", r : " << r); + + consumerExclusive.seek(seekMessageId); + Message msg0; + ASSERT_EQ(ResultOk, consumerExclusive.receive(msg0, 3000)); + + consumerInclusive.seek(seekMessageId); + Message msg1; + ASSERT_EQ(ResultOk, consumerInclusive.receive(msg1, 3000)); + + LOG_INFO("consumerExclusive received " << msg0.getDataAsString() << " from " << msg0.getMessageId()); + LOG_INFO("consumerInclusive received " << msg1.getDataAsString() << " from " << msg1.getMessageId()); + + ASSERT_EQ(msg0.getDataAsString(), "msg-" + std::to_string(r + 1)); + ASSERT_EQ(msg1.getDataAsString(), "msg-" + std::to_string(r)); + + consumerInclusive.close(); + consumerExclusive.close(); + producer.close(); +} + +TEST_P(ConsumerSeekTest, testMultiTopicsSeekAll) { + std::string topic = "consumer-seek-test-multi-topics-seek-all-" + unique_str(); + auto producers = initProducersForPartitionedTopic(topic); + auto consumer = createConsumer(topic); + const auto numPartitions = producers.size(); + + auto receive = [&consumer, numPartitions] { + std::set<std::string> values; + for (int i = 0; i < numPartitions; i++) { + Message msg; + auto result = consumer.receive(msg, 3000); + if (result != ResultOk) { + throw std::runtime_error(std::string{"Receive failed: "} + strResult(result)); + } + values.emplace(msg.getDataAsString()); + } + return values; + }; + + for (int i = 0; i < numPartitions; i++) { + producers[i].send(MessageBuilder().setContent("msg-" + std::to_string(i) + "-0").build()); + } + ASSERT_EQ(receive(), (std::set<std::string>{"msg-0-0", "msg-1-0", "msg-2-0"})); + + // Seek to earliest + ASSERT_EQ(ResultOk, consumer.seek(MessageId::earliest())); + ASSERT_EQ(receive(), (std::set<std::string>{"msg-0-0", "msg-1-0", "msg-2-0"})); + + // Seek to latest + for (int i = 0; i < numPartitions; i++) { + producers[i].send(MessageBuilder().setContent("msg-" + std::to_string(i) + "-1").build()); + } + ASSERT_EQ(ResultOk, consumer.seek(MessageId::latest())); + + for (int i = 0; i < numPartitions; i++) { + producers[i].send(MessageBuilder().setContent("msg-" + std::to_string(i) + "-2").build()); + } + ASSERT_EQ(receive(), (std::set<std::string>{"msg-0-2", "msg-1-2", "msg-2-2"})); +} + +TEST_P(ConsumerSeekTest, testMultiTopicsSeekSingle) { + std::string topic = "consumer-seek-test-multi-topics-seek-single-" + unique_str(); + auto producers = initProducersForPartitionedTopic(topic); + auto consumer = createConsumer(topic); + + MessageId msgId; + producers[0].send(MessageBuilder().setContent("msg-0").build(), msgId); + ASSERT_EQ(ResultOperationNotSupported, consumer.seek(msgId)); + producers[0].send(MessageBuilder().setContent("msg-1").build(), msgId); + ASSERT_EQ(ResultOperationNotSupported, consumer.seek(msgId)); + + std::vector<MessageId> msgIds; + Message msg; + for (int i = 0; i < 2; i++) { + ASSERT_EQ(ResultOk, consumer.receive(msg, 3000)); + msgIds.emplace_back(msg.getMessageId()); + } + + ASSERT_EQ(ResultOk, consumer.seek(msgIds[0])); + ASSERT_EQ(ResultOk, consumer.receive(msg, 3000)); + if (GetParam()) { + ASSERT_EQ(msg.getMessageId(), msgIds[0]); + } else { + ASSERT_EQ(msg.getMessageId(), msgIds[1]); + } +} + +TEST_F(ConsumerSeekTest, testNoInternalConsumer) { + Consumer consumer; + ASSERT_EQ(ResultOk, client_.subscribeWithRegex("testNoInternalConsumer.*", "sub", consumer)); + ASSERT_EQ(ResultOk, consumer.seek(MessageId::earliest())); +} + +INSTANTIATE_TEST_SUITE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false)); + +} // namespace pulsar diff --git a/tests/ConsumerTest.cc b/tests/ConsumerTest.cc index 2aab722..f9840f9 100644 --- a/tests/ConsumerTest.cc +++ b/tests/ConsumerTest.cc @@ -1136,69 +1136,6 @@ TEST(ConsumerTest, testPatternSubscribeTopic) { client.close(); } -class ConsumerSeekTest : public ::testing::TestWithParam<bool> { - public: - void SetUp() override { producerConf_ = ProducerConfiguration().setBatchingEnabled(GetParam()); } - - void TearDown() override { client_.close(); } - - protected: - Client client_{lookupUrl}; - ProducerConfiguration producerConf_; -}; - -TEST_P(ConsumerSeekTest, testSeekForMessageId) { - Client client(lookupUrl); - - const std::string topic = "test-seek-for-message-id-" + std::string((GetParam() ? "batch-" : "")) + - std::to_string(time(nullptr)); - - Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topic, producerConf_, producer)); - - Consumer consumerExclusive; - ASSERT_EQ(ResultOk, client.subscribe(topic, "sub-0", consumerExclusive)); - - Consumer consumerInclusive; - ASSERT_EQ(ResultOk, - client.subscribe(topic, "sub-1", ConsumerConfiguration().setStartMessageIdInclusive(true), - consumerInclusive)); - - const auto numMessages = 100; - MessageId seekMessageId; - - int r = (rand() % (numMessages - 1)); - for (int i = 0; i < numMessages; i++) { - MessageId id; - ASSERT_EQ(ResultOk, - producer.send(MessageBuilder().setContent("msg-" + std::to_string(i)).build(), id)); - - if (i == r) { - seekMessageId = id; - } - } - - LOG_INFO("The seekMessageId is: " << seekMessageId << ", r : " << r); - - consumerExclusive.seek(seekMessageId); - Message msg0; - ASSERT_EQ(ResultOk, consumerExclusive.receive(msg0, 3000)); - - consumerInclusive.seek(seekMessageId); - Message msg1; - ASSERT_EQ(ResultOk, consumerInclusive.receive(msg1, 3000)); - - LOG_INFO("consumerExclusive received " << msg0.getDataAsString() << " from " << msg0.getMessageId()); - LOG_INFO("consumerInclusive received " << msg1.getDataAsString() << " from " << msg1.getMessageId()); - - ASSERT_EQ(msg0.getDataAsString(), "msg-" + std::to_string(r + 1)); - ASSERT_EQ(msg1.getDataAsString(), "msg-" + std::to_string(r)); - - consumerInclusive.close(); - consumerExclusive.close(); - producer.close(); -} - TEST(ConsumerTest, testNegativeAcksTrackerClose) { Client client(lookupUrl); auto topicName = "testNegativeAcksTrackerClose"; @@ -1252,8 +1189,6 @@ TEST(ConsumerTest, testAckNotPersistentTopic) { client.close(); } -INSTANTIATE_TEST_CASE_P(Pulsar, ConsumerSeekTest, ::testing::Values(true, false)); - class InterceptorForNegAckDeadlock : public ConsumerInterceptor { public: Message beforeConsume(const Consumer& consumer, const Message& message) override { return message; } diff --git a/tests/SynchronizedHashMapTest.cc b/tests/SynchronizedHashMapTest.cc index 85378e0..cf184d9 100644 --- a/tests/SynchronizedHashMapTest.cc +++ b/tests/SynchronizedHashMapTest.cc @@ -91,6 +91,28 @@ TEST(SynchronizedHashMapTest, testForEach) { m.forEach([&pairs](const int& key, const int& value) { pairs.emplace_back(key, value); }); PairVector expectedPairs({{1, 100}, {2, 200}, {3, 300}}); ASSERT_EQ(sort(pairs), expectedPairs); + + m.clear(); + int result = 0; + values.clear(); + m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); }, + [&result] { result = 1; }); + ASSERT_TRUE(values.empty()); + ASSERT_EQ(result, 1); + + m.emplace(1, 100); + m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); }, + [&result] { result = 2; }); + ASSERT_EQ(values, (std::vector<int>({100}))); + ASSERT_EQ(result, 1); + + values.clear(); + m.emplace(2, 200); + m.forEachValue([&values](int value, SharedFuture) { values.emplace_back(value); }, + [&result] { result = 2; }); + std::sort(values.begin(), values.end()); + ASSERT_EQ(values, (std::vector<int>({100, 200}))); + ASSERT_EQ(result, 1); } TEST(SynchronizedHashMap, testRecursiveMutex) {