BewareMyPower commented on code in PR #549:
URL: https://github.com/apache/pulsar-client-cpp/pull/549#discussion_r2940086368


##########
lib/ConsumerImpl.cc:
##########
@@ -1819,7 +1819,13 @@ void ConsumerImpl::seekAsyncInternal(long requestId, 
const SharedBuffer& seek, c
             if (result == ResultOk) {
                 LockGuard lock(mutex_);
                 if (getCnx().expired() || reconnectionPending_) {
-                    // It's during reconnection, complete the seek future 
after connection is established
+                    // It's during reconnection, complete the seek future 
after connection is established.
+                    // Clear local state now so hasMessageAvailable() does not 
see stale prefetched messages.
+                    ackGroupingTrackerPtr_->flushAndClean();
+                    incomingMessages_.clear();
+                    if (lastSeekArg_.has_value() && 
std::holds_alternative<MessageId>(lastSeekArg_.value())) {
+                        startMessageId_ = 
std::get<MessageId>(lastSeekArg_.value());
+                    }

Review Comment:
   It makes the same changes in `clearReceiveQueue`. As you can see the seek 
callback is only executed after that, so when `hasMessageAvailable` is called 
after `seek`, the state is guaranteed to be cleared.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to