This is an automated email from the ASF dual-hosted git repository. xyz pushed a commit to branch branch-2.8 in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit 02b208cc581b3de13c72d2db1210893f3139cb57 Author: Yunze Xu <[email protected]> AuthorDate: Fri Apr 1 19:09:14 2022 +0800 [C++] Fix flaky tests about reference count (#14854) Fixes #14848 Fixes #14719 ### Motivation #7793 introduced a `testReferenceLeak` to avoid cyclic references of the reader. However, it adds an unused field `readerImplWeakPtr_` only for tests. The access to this field is not thread safe: the write operation happens in `handleConsumerCreated` while the read operation can happen anywhere via the getter. So there is a small chance that `readerPtr` in `testReferenceLeak` doesn't point to the right object. In addition, we should only guarantee that the reference count becomes 0 after the producer, consumer or reader goes out of its scope. #14797 adds a `ClientTest.testReferenceCount` but it's also flaky. This is because the shared pointer of `ProducerImpl` is published to another thread via `shared_from_this()`, but the test has a strong expectation that the reference count is exactly 1. ### Modifications - Remove `readerImplWeakPtr_` from `ReaderImpl` and get the weak pointer from `Reader` directly by adding a method to `PulsarFriend`. - Add the check of reader's reference count to `testReferenceCount` and remove the redundant `testReferenceLeak`. - Instead of asserting the reference count of producer/consumer/reader is 1, just assert that it's greater than 0. 
(cherry picked from commit f84ff571df95f99efa4596e65324def1084fc11b) --- pulsar-client-cpp/lib/ReaderImpl.cc | 5 ---- pulsar-client-cpp/lib/ReaderImpl.h | 5 +--- pulsar-client-cpp/tests/ClientTest.cc | 26 +++++++++++++++-- pulsar-client-cpp/tests/PulsarFriend.h | 7 +++++ pulsar-client-cpp/tests/ReaderTest.cc | 51 +++------------------------------- pulsar-client-cpp/tests/ReaderTest.h | 32 --------------------- 6 files changed, 35 insertions(+), 91 deletions(-) diff --git a/pulsar-client-cpp/lib/ReaderImpl.cc b/pulsar-client-cpp/lib/ReaderImpl.cc index 0a7b321..9401c12 100644 --- a/pulsar-client-cpp/lib/ReaderImpl.cc +++ b/pulsar-client-cpp/lib/ReaderImpl.cc @@ -90,11 +90,8 @@ const std::string& ReaderImpl::getTopic() const { return consumer_->getTopic(); void ReaderImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumer) { auto self = shared_from_this(); readerCreatedCallback_(result, Reader(self)); - readerImplWeakPtr_ = self; } -ConsumerImplPtr ReaderImpl::getConsumer() { return consumer_; } - Result ReaderImpl::readNext(Message& msg) { Result res = consumer_->receive(msg); acknowledgeIfNecessary(res, msg); @@ -144,8 +141,6 @@ void ReaderImpl::getLastMessageIdAsync(GetLastMessageIdCallback callback) { }); } -ReaderImplWeakPtr ReaderImpl::getReaderImplWeakPtr() { return readerImplWeakPtr_; } - bool ReaderImpl::isConnected() const { return consumer_->isConnected(); } } // namespace pulsar diff --git a/pulsar-client-cpp/lib/ReaderImpl.h b/pulsar-client-cpp/lib/ReaderImpl.h index a546ae8..6de6c02 100644 --- a/pulsar-client-cpp/lib/ReaderImpl.h +++ b/pulsar-client-cpp/lib/ReaderImpl.h @@ -53,7 +53,7 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl> Future<Result, ReaderImplWeakPtr> getReaderCreatedFuture(); - ConsumerImplPtr getConsumer(); + ConsumerImplBaseWeakPtr getConsumer() const noexcept { return consumer_; } void hasMessageAvailableAsync(HasMessageAvailableCallback callback); @@ -62,8 +62,6 @@ class 
PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl> void getLastMessageIdAsync(GetLastMessageIdCallback callback); - ReaderImplWeakPtr getReaderImplWeakPtr(); - bool isConnected() const; private: @@ -79,7 +77,6 @@ class PULSAR_PUBLIC ReaderImpl : public std::enable_shared_from_this<ReaderImpl> ConsumerImplPtr consumer_; ReaderCallback readerCreatedCallback_; ReaderListener readerListener_; - ReaderImplWeakPtr readerImplWeakPtr_; }; } // namespace pulsar diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc index 920430d..1ba0164 100644 --- a/pulsar-client-cpp/tests/ClientTest.cc +++ b/pulsar-client-cpp/tests/ClientTest.cc @@ -24,6 +24,9 @@ #include <future> #include <pulsar/Client.h> #include "../lib/checksum/ChecksumProvider.h" +#include "lib/LogUtils.h" + +DECLARE_LOG_OBJECT() using namespace pulsar; @@ -184,22 +187,39 @@ TEST(ClientTest, testReferenceCount) { auto &producers = PulsarFriend::getProducers(client); auto &consumers = PulsarFriend::getConsumers(client); + ReaderImplWeakPtr readerWeakPtr; { Producer producer; ASSERT_EQ(ResultOk, client.createProducer(topic, producer)); ASSERT_EQ(producers.size(), 1); - ASSERT_EQ(producers[0].use_count(), 1); + ASSERT_TRUE(producers[0].use_count() > 0); + LOG_INFO("Reference count of the producer: " << producers[0].use_count()); Consumer consumer; ASSERT_EQ(ResultOk, client.subscribe(topic, "my-sub", consumer)); ASSERT_EQ(consumers.size(), 1); - ASSERT_EQ(consumers[0].use_count(), 1); + ASSERT_TRUE(consumers[0].use_count() > 0); + LOG_INFO("Reference count of the consumer: " << consumers[0].use_count()); + + ReaderConfiguration readerConf; + Reader reader; + ASSERT_EQ(ResultOk, + client.createReader(topic + "-reader", MessageId::earliest(), readerConf, reader)); + ASSERT_EQ(consumers.size(), 2); + ASSERT_TRUE(consumers[1].use_count() > 0); + LOG_INFO("Reference count of the reader's underlying consumer: " << consumers[1].use_count()); + + readerWeakPtr = 
PulsarFriend::getReaderImplWeakPtr(reader); + ASSERT_EQ(readerWeakPtr.use_count(), 1); + LOG_INFO("Reference count of the reader: " << readerWeakPtr.use_count()); } ASSERT_EQ(producers.size(), 1); ASSERT_EQ(producers[0].use_count(), 0); - ASSERT_EQ(consumers.size(), 1); + ASSERT_EQ(consumers.size(), 2); ASSERT_EQ(consumers[0].use_count(), 0); + ASSERT_EQ(consumers[1].use_count(), 0); + ASSERT_EQ(readerWeakPtr.use_count(), 0); client.close(); } diff --git a/pulsar-client-cpp/tests/PulsarFriend.h b/pulsar-client-cpp/tests/PulsarFriend.h index 74aa1f7..2d9b558 100644 --- a/pulsar-client-cpp/tests/PulsarFriend.h +++ b/pulsar-client-cpp/tests/PulsarFriend.h @@ -25,6 +25,7 @@ #include "lib/ConsumerImpl.h" #include "lib/PartitionedConsumerImpl.h" #include "lib/MultiTopicsConsumerImpl.h" +#include "lib/ReaderImpl.h" using std::string; @@ -79,6 +80,12 @@ class PulsarFriend { return std::static_pointer_cast<ConsumerImpl>(consumer.impl_); } + static ConsumerImplPtr getConsumer(Reader reader) { + return std::static_pointer_cast<ConsumerImpl>(reader.impl_->getConsumer().lock()); + } + + static ReaderImplWeakPtr getReaderImplWeakPtr(Reader reader) { return reader.impl_; } + static std::shared_ptr<PartitionedConsumerImpl> getPartitionedConsumerImplPtr(Consumer consumer) { return std::static_pointer_cast<PartitionedConsumerImpl>(consumer.impl_); } diff --git a/pulsar-client-cpp/tests/ReaderTest.cc b/pulsar-client-cpp/tests/ReaderTest.cc index 8cd535c..bf15692 100644 --- a/pulsar-client-cpp/tests/ReaderTest.cc +++ b/pulsar-client-cpp/tests/ReaderTest.cc @@ -18,8 +18,8 @@ */ #include <pulsar/Client.h> #include <pulsar/Reader.h> -#include "ReaderTest.h" #include "HttpHelper.h" +#include "PulsarFriend.h" #include <gtest/gtest.h> @@ -28,6 +28,7 @@ #include <lib/Latch.h> #include <lib/LogUtils.h> +#include <lib/ReaderImpl.h> DECLARE_LOG_OBJECT() using namespace pulsar; @@ -423,50 +424,6 @@ TEST(ReaderTest, testReaderReachEndOfTopicMessageWithoutBatches) { client.close(); } 
-TEST(ReaderTest, testReferenceLeak) { - Client client(serviceUrl); - - std::string topicName = "persistent://public/default/testReferenceLeak"; - - Producer producer; - ASSERT_EQ(ResultOk, client.createProducer(topicName, producer)); - - for (int i = 0; i < 10; i++) { - std::string content = "my-message-" + std::to_string(i); - Message msg = MessageBuilder().setContent(content).build(); - ASSERT_EQ(ResultOk, producer.send(msg)); - } - - ReaderConfiguration readerConf; - Reader reader; - ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader)); - - ConsumerImplBaseWeakPtr consumerPtr = ReaderTest::getConsumer(reader); - ReaderImplWeakPtr readerPtr = ReaderTest::getReaderImplWeakPtr(reader); - - LOG_INFO("1 consumer use count " << consumerPtr.use_count()); - LOG_INFO("1 reader use count " << readerPtr.use_count()); - - for (int i = 0; i < 10; i++) { - Message msg; - ASSERT_EQ(ResultOk, reader.readNext(msg)); - - std::string content = msg.getDataAsString(); - std::string expected = "my-message-" + std::to_string(i); - ASSERT_EQ(expected, content); - } - - producer.close(); - reader.close(); - // will be released after exit this method. - ASSERT_EQ(1, consumerPtr.use_count()); - ASSERT_EQ(1, readerPtr.use_count()); - client.close(); - // will be released after exit this method. 
- ASSERT_EQ(1, consumerPtr.use_count()); - ASSERT_EQ(1, readerPtr.use_count()); -} - TEST(ReaderTest, testPartitionIndex) { Client client(serviceUrl); @@ -519,7 +476,7 @@ TEST(ReaderTest, testSubscriptionNameSetting) { Reader reader; ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader)); - ASSERT_EQ(subName, ReaderTest::getConsumer(reader)->getSubscriptionName()); + ASSERT_EQ(subName, PulsarFriend::getConsumer(reader)->getSubscriptionName()); reader.close(); client.close(); @@ -537,7 +494,7 @@ TEST(ReaderTest, testSetSubscriptionNameAndPrefix) { Reader reader; ASSERT_EQ(ResultOk, client.createReader(topicName, MessageId::earliest(), readerConf, reader)); - ASSERT_EQ(subName, ReaderTest::getConsumer(reader)->getSubscriptionName()); + ASSERT_EQ(subName, PulsarFriend::getConsumer(reader)->getSubscriptionName()); reader.close(); client.close(); diff --git a/pulsar-client-cpp/tests/ReaderTest.h b/pulsar-client-cpp/tests/ReaderTest.h deleted file mode 100644 index fd0387f..0000000 --- a/pulsar-client-cpp/tests/ReaderTest.h +++ /dev/null @@ -1,32 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -#include "lib/ReaderImpl.h" -#include <string> - -using std::string; - -namespace pulsar { -class ReaderTest { - public: - static ConsumerImplPtr getConsumer(const Reader& reader) { return reader.impl_->getConsumer(); } - static ReaderImplWeakPtr getReaderImplWeakPtr(const Reader& reader) { - return reader.impl_->getReaderImplWeakPtr(); - } -}; -} // namespace pulsar
