This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git

The following commit(s) were added to refs/heads/master by this push:
     new 4ef8dc5abcb [fix][cpp] Fix flaky testReferenceCount (#17645)
4ef8dc5abcb is described below

commit 4ef8dc5abcb2b7a2e1d10449c59331b974d10854
Author: Yunze Xu <xyzinfern...@163.com>
AuthorDate: Thu Sep 15 00:05:38 2022 +0800

[fix][cpp] Fix flaky testReferenceCount (#17645)

Fixes #14848

### Motivation

There have been several fixes to `ClientTest.testReferenceCount`, but it is still very flaky. The root cause is that even after a `Reader` goes out of scope and is destructed, another `Reader` object still exists in the event loop thread. See https://github.com/apache/pulsar/blob/845daf5cac23a4dda4a209d91c85804a0bcaf28a/pulsar-client-cpp/lib/ReaderImpl.cc#L88

To verify this point, I added some logs and saw:

```
2022-09-14 03:52:28.427 INFO [140046042864960] Reader:39 | Reader ctor 0x7fffd2a7c110
# ...
2022-09-14 03:52:28.444 INFO [140046039774976] Reader:42 | Reader ctor 0x7f5f0273d720 ReaderImpl(0x7f5efc00a9d0, 3)
# ...
2022-09-14 03:52:28.445 INFO [140046042864960] ClientTest:217 | Reference count of the reader: 4
# ...
2022-09-14 03:52:28.445 INFO [140046042864960] ClientImpl:490 | Closing Pulsar client with 1 producers and 2 consumers
2022-09-14 03:52:28.445 INFO [140046039774976] Reader:55 | Reader dtor 0x7f5f0273d720 ReaderImpl(0x7f5efc00a9d0, 3)
```

The first `Reader` object 0x7fffd2a7c110 was constructed in the main thread 140046042864960. However, it was destructed before another `Reader` object 0x7f5f0273d720 that was constructed in the event loop thread 140046039774976. When the callback passed to `createReaderAsync` completes the promise, `createReader` returns immediately, while the `Reader` object in the callback is still in scope and not yet destructed.

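The race described above can be reproduced in isolation. The following is a minimal sketch, not Pulsar code: `FakeImpl`, `observeCountWhileCallbackAlive`, and the second promise used to keep the callback scope open are hypothetical names introduced only for illustration. It shows that a `weak_ptr` still reports a positive use count after the caller's handle is gone, as long as the thread that completed the promise has not yet dropped its own copy.

```cpp
#include <cassert>
#include <future>
#include <memory>
#include <thread>

// Hypothetical stand-in for ReaderImpl; not the real Pulsar class.
struct FakeImpl {};

// Returns the use count seen by the calling thread right after its own handle
// is destroyed, while the "event loop" thread still holds a copy.
long observeCountWhileCallbackAlive() {
    std::promise<std::shared_ptr<FakeImpl>> created;
    std::promise<void> release;  // assumption: only used to hold the callback open
    auto createdFuture = created.get_future();
    auto releaseFuture = release.get_future();

    std::thread eventLoop([&created, &releaseFuture] {
        auto callbackCopy = std::make_shared<FakeImpl>();
        created.set_value(callbackCopy);  // the caller is unblocked here...
        releaseFuture.wait();             // ...but callbackCopy is still in scope
    });

    std::weak_ptr<FakeImpl> weak;
    {
        // future::get() moves the stored pointer out of the shared state.
        std::shared_ptr<FakeImpl> handle = createdFuture.get();
        weak = handle;
    }  // the caller's handle is destroyed here

    // Only the callback's copy keeps the object alive at this point.
    const long observed = weak.use_count();

    release.set_value();  // let the callback scope end
    eventLoop.join();
    assert(weak.expired());  // after the callback returns, the count is 0
    return observed;
}
```

In this sketch the count only drops to zero once the background thread leaves the callback, which is the point in time the test cannot observe directly.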
Since `Reader` holds a `shared_ptr<ReaderImpl>` and `ReaderImpl` holds a `shared_ptr<ConsumerImpl>`, if we check the reference count too early, the reference count of the underlying consumer is still positive because that `Reader` has not been destructed yet.

### Modifications

Since that `Reader` object lives in the event loop thread, we cannot determine exactly when it is destructed, so we have to wait for a while. This PR adds a `waitUntil` utility function that waits for at most a given timeout until a condition is met. The test then waits until the reference count becomes 0 after the `Reader` object goes out of scope.

Replace `ASSERT_EQ` with `EXPECT_EQ` to let the test continue if it fails.

### Verifying this change

Follow the steps here to reproduce: https://github.com/apache/pulsar/issues/14848#issuecomment-1246375370

The test never failed even with `--gtest_repeat=100`.
---
 pulsar-client-cpp/tests/ClientTest.cc | 11 ++++++---
 pulsar-client-cpp/tests/WaitUtils.h   | 43 +++++++++++++++++++++++++++++++++++
 2 files changed, 51 insertions(+), 3 deletions(-)

diff --git a/pulsar-client-cpp/tests/ClientTest.cc b/pulsar-client-cpp/tests/ClientTest.cc
index cac6a81a561..90ece8d4bcc 100644
--- a/pulsar-client-cpp/tests/ClientTest.cc
+++ b/pulsar-client-cpp/tests/ClientTest.cc
@@ -20,6 +20,7 @@
 #include "HttpHelper.h"
 #include "PulsarFriend.h"
+#include "WaitUtils.h"
 #include <future>
 #include <pulsar/Client.h>
@@ -219,9 +220,13 @@ TEST(ClientTest, testReferenceCount) {
     ASSERT_EQ(producers.size(), 1);
     ASSERT_EQ(producers[0].use_count(), 0);
     ASSERT_EQ(consumers.size(), 2);
-    ASSERT_EQ(consumers[0].use_count(), 0);
-    ASSERT_EQ(consumers[1].use_count(), 0);
-    ASSERT_EQ(readerWeakPtr.use_count(), 0);
+
+    waitUntil(std::chrono::seconds(1), [&consumers, &readerWeakPtr] {
+        return consumers[0].use_count() == 0 && consumers[1].use_count() == 0 && readerWeakPtr.expired();
+    });
+    EXPECT_EQ(consumers[0].use_count(), 0);
+    EXPECT_EQ(consumers[1].use_count(), 0);
+    EXPECT_EQ(readerWeakPtr.use_count(), 0);
     client.close();
 }
diff --git a/pulsar-client-cpp/tests/WaitUtils.h b/pulsar-client-cpp/tests/WaitUtils.h
new file mode 100644
index 00000000000..abe3efccff4
--- /dev/null
+++ b/pulsar-client-cpp/tests/WaitUtils.h
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#pragma once
+
+#include <chrono>
+#include <functional>
+#include <thread>
+
+namespace pulsar {
+
+template <typename Rep, typename Period>
+inline void waitUntil(std::chrono::duration<Rep, Period> timeout, std::function<bool()> condition) {
+    auto timeoutMs = std::chrono::duration_cast<std::chrono::milliseconds>(timeout).count();
+    while (timeoutMs > 0) {
+        auto now = std::chrono::high_resolution_clock::now();
+        if (condition()) {
+            break;
+        }
+        std::this_thread::sleep_for(std::chrono::milliseconds(10));
+        auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
+                           std::chrono::high_resolution_clock::now() - now)
+                           .count();
+        timeoutMs -= elapsed;
+    }
+}
+
+}  // namespace pulsar
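
For illustration only, the polling idea behind `waitUntil` can also be written against a fixed deadline so that the caller learns whether the condition was met. This is a sketch under stated assumptions, not the code committed above: `waitUntilWithResult`, its `interval` parameter, and `demoFlagSetByOtherThread` are hypothetical names that do not exist in the Pulsar repository.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <thread>

// Hypothetical variant of the helper: polls `condition` every `interval` until
// it holds or `timeout` has elapsed, and reports which of the two happened.
template <typename Rep, typename Period, typename Predicate>
bool waitUntilWithResult(std::chrono::duration<Rep, Period> timeout, Predicate condition,
                         std::chrono::milliseconds interval = std::chrono::milliseconds(10)) {
    // steady_clock is monotonic, so the deadline is not affected by wall-clock changes.
    const auto deadline = std::chrono::steady_clock::now() + timeout;
    while (!condition()) {
        if (std::chrono::steady_clock::now() >= deadline) {
            return false;  // timed out without the condition becoming true
        }
        std::this_thread::sleep_for(interval);
    }
    return true;
}

// Example: a flag set by another thread after about 50 ms is seen within 1 s.
bool demoFlagSetByOtherThread() {
    std::atomic<bool> done{false};
    std::thread worker([&done] {
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        done = true;
    });
    const bool met = waitUntilWithResult(std::chrono::seconds(1), [&done] { return done.load(); });
    worker.join();
    assert(done.load());  // the worker has finished, so the flag is set
    return met;
}
```

With a boolean result, a test could assert on the wait itself, for example `EXPECT_TRUE(waitUntilWithResult(...))`, which would make a timeout visible in the test output.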