BewareMyPower commented on code in PR #17209:
URL: https://github.com/apache/pulsar/pull/17209#discussion_r965559130
##########
pulsar-client-cpp/lib/ConsumerImpl.cc:
##########
@@ -170,23 +172,25 @@ void ConsumerImpl::connectionOpened(const
ClientConnectionPtr& cnx) {
cnx->registerConsumer(consumerId_, shared_from_this());
Lock lockForMessageId(mutexForMessageId_);
- Optional<MessageId> firstMessageInQueue = clearReceiveQueue();
- if (subscriptionMode_ == Commands::SubscriptionModeNonDurable) {
- // Update startMessageId so that we can discard messages after delivery
- // restarts
- startMessageId_ = firstMessageInQueue;
- }
- const auto startMessageId = startMessageId_;
+ // Update startMessageId so that we can discard messages after delivery
restarts
+ startMessageId_ = clearReceiveQueue();
+ const auto subscribeMessageId = (subscriptionMode_ ==
Commands::SubscriptionModeNonDurable)
+ ? startMessageId_.get()
+ : Optional<MessageId>::empty();
lockForMessageId.unlock();
Review Comment:
```suggestion
// Update startMessageId so that we can discard messages after delivery
restarts
const auto startMessageId = clearReceiveQueue();
lockForMessageId.unlock();
const auto subscribeMessageId = (subscriptionMode_ ==
Commands::SubscriptionModeNonDurable)
? startMessageId
: Optional<MessageId>::empty();
startMessageId_ = startMessageId;
```
The combination of `Synchronized::operator=` and `Synchronized::get` is not
atomic. We can use a local variable to save the value to update.
##########
pulsar-client-cpp/lib/ConsumerImpl.cc:
##########
@@ -1380,4 +1365,52 @@ bool ConsumerImpl::isConnected() const { return
!getCnx().expired() && state_ ==
uint64_t ConsumerImpl::getNumberOfConnectedConsumer() { return isConnected() ?
1 : 0; }
+void ConsumerImpl::seekAsyncInternal(long requestId, SharedBuffer seek, const
MessageId& seekId,
+ long timestamp, ResultCallback callback) {
+ ClientConnectionPtr cnx = getCnx().lock();
+ if (!cnx) {
+ LOG_ERROR(getName() << " Client Connection not ready for Consumer");
+ callback(ResultNotConnected);
+ return;
+ }
+
+ const auto originalSeekMessageId = seekMessageId_.get();
+ seekMessageId_ = seekId;
+ duringSeek_ = true;
+ if (timestamp > 0) {
+ LOG_INFO(getName() << " Seeking subscription to " << timestamp);
+ } else {
+ LOG_INFO(getName() << " Seeking subscription to " << seekId);
+ }
+
+ std::weak_ptr<ConsumerImpl> weakSelf{shared_from_this()};
+
+ cnx->sendRequestWithId(seek, requestId)
+ .addListener([this, weakSelf, callback, originalSeekMessageId](Result
result,
+ const
ResponseData& responseData) {
Review Comment:
We must check if `this` is still valid by validating the reference count of
`weakSelf`.
```c++
auto self = weakSelf.lock();
if (!self) { // this is invalid now
```
##########
pulsar-client-cpp/lib/ConsumerImpl.h:
##########
@@ -46,6 +46,8 @@
#include <lib/stats/ConsumerStatsDisabled.h>
#include <queue>
#include <atomic>
+#include "SharedBuffer.h"
Review Comment:
```suggestion
```
The new code doesn't introduce a dependency on `SharedBuffer`.
##########
pulsar-client-cpp/lib/ConsumerImpl.cc:
##########
@@ -170,23 +172,25 @@ void ConsumerImpl::connectionOpened(const
ClientConnectionPtr& cnx) {
cnx->registerConsumer(consumerId_, shared_from_this());
Lock lockForMessageId(mutexForMessageId_);
- Optional<MessageId> firstMessageInQueue = clearReceiveQueue();
- if (subscriptionMode_ == Commands::SubscriptionModeNonDurable) {
- // Update startMessageId so that we can discard messages after delivery
- // restarts
- startMessageId_ = firstMessageInQueue;
- }
- const auto startMessageId = startMessageId_;
+ // Update startMessageId so that we can discard messages after delivery
restarts
+ startMessageId_ = clearReceiveQueue();
+ const auto subscribeMessageId = (subscriptionMode_ ==
Commands::SubscriptionModeNonDurable)
+ ? startMessageId_.get()
+ : Optional<MessageId>::empty();
lockForMessageId.unlock();
unAckedMessageTrackerPtr_->clear();
batchAcknowledgementTracker_.clear();
ClientImplPtr client = client_.lock();
uint64_t requestId = client->newRequestId();
+ if (duringSeek_) {
+ ackGroupingTrackerPtr_->flushAndClean();
+ }
Review Comment:
It looks like it should be put before `clearReceiveQueue` is called, see
https://github.com/apache/pulsar/blob/2848fa0da09e035951220c3d04138041e1477e60/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java#L820-L831
##########
pulsar-client-cpp/lib/ConsumerImpl.h:
##########
@@ -169,6 +170,8 @@ class ConsumerImpl : public ConsumerImplBase,
void drainIncomingMessageQueue(size_t count);
uint32_t receiveIndividualMessagesFromBatch(const ClientConnectionPtr&
cnx, Message& batchedMessage,
int redeliveryCount);
+ bool isPriorBatchIndex(long idx);
+ bool isPriorEntryIndex(long idx);
Review Comment:
These two methods could be const. Actually the type should not be `long`,
though Java client uses `long`.
The type of batch index is `int32_t`, and the type of entry index is
`int64_t`, see the `batchIndex()` and `entryId()` methods of `MessageId`.
##########
pulsar-client-cpp/tests/ConsumerTest.cc:
##########
@@ -797,4 +797,68 @@ TEST(ConsumerTest,
testGetLastMessageIdBlockWhenConnectionDisconnected) {
ASSERT_GE(elapsed.seconds(), operationTimeout);
}
+class ConsumerSeekTest : public ::testing::TestWithParam<bool> {
+ public:
+ void SetUp() override { producerConf_ =
ProducerConfiguration().setBatchingEnabled(GetParam()); }
+
+ void TearDown() override { client_.close(); }
+
+ protected:
+ Client client_{lookupUrl};
+ ProducerConfiguration producerConf_;
+};
+
+TEST_P(ConsumerSeekTest, testSeekForMessageId) {
+ Client client(lookupUrl);
+ auto n = std::chrono::system_clock::now();
+ auto now =
std::chrono::duration_cast<std::chrono::nanoseconds>(n.time_since_epoch());
+
+ const std::string topic = "test-seek-for-message-id-" +
std::to_string(now.count());
Review Comment:
We can simply use `std::to_string(time(nullptr))` as the topic suffix like
other tests.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]