kirktrue commented on code in PR #14557:
URL: https://github.com/apache/kafka/pull/14557#discussion_r1406678397
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -161,16 +172,16 @@ public NetworkClientDelegate.PollResult poll(final long currentTimeMs) {
List requests =
pendingRequests.drain(currentTimeMs);
// min of the remainingBackoffMs of all the request that are still backing off
final long timeUntilNextPoll = Math.min(
-findMinTime(unsentOffsetCommitRequests(), currentTimeMs),
-findMinTime(unsentOffsetFetchRequests(), currentTimeMs));
+findMinTime(unsentOffsetCommitRequests(), currentTimeMs),
+findMinTime(unsentOffsetFetchRequests(), currentTimeMs));
Review Comment:
Nit: we can leave the indentation as is.
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/MemberStateListener.java:
##
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.clients.consumer.internals;
+
+/**
+ * Listener to register for getting notified when the member state changes, or new member ID or
+ * epoch are received.
+ */
+public interface MemberStateListener {
+
+/**
+ * Called when the member transitions to a new state.
+ *
+ * @param state New state.
+ */
+void onStateChange(MemberState state);
+
+/**
+ * Called when the member receives a new member ID.
+ *
+ * @param memberId New member ID.
+ * @param epoch Latest member epoch received.
+ */
+void onMemberIdUpdated(String memberId, int epoch);
+
+/**
+ * Called when a member receives a new member epoch.
+ *
+ * @param epoch New member epoch.
+ * @param memberId Current member ID.
+ */
+void onMemberEpochUpdated(int epoch, String memberId);
Review Comment:
I'm wondering why this interface can't be a single
`onUpdate(MemberState state)` and leave it up to the callbacks to
determine what's changed? 🤔
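
To make the single-callback idea concrete, here is a rough sketch only. It assumes a hypothetical immutable snapshot type (`MemberSnapshot`) that carries the state, member ID, and epoch together, plus a hypothetical `MemberUpdateListener` interface; neither name exists in this PR, and the state is modelled as a plain string to keep the example self-contained. Each callback remembers the previous snapshot and works out what changed on its own:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

class SingleCallbackSketch {

    // Hypothetical snapshot of everything a listener might care about.
    // Not a type from the PR; the state is a String purely for illustration.
    static final class MemberSnapshot {
        final String state;
        final String memberId;
        final int epoch;

        MemberSnapshot(String state, String memberId, int epoch) {
            this.state = state;
            this.memberId = memberId;
            this.epoch = epoch;
        }
    }

    // Hypothetical single-method listener, standing in for the three callbacks.
    interface MemberUpdateListener {
        void onUpdate(MemberSnapshot current);
    }

    // Example listener that diffs against the last snapshot it saw and
    // records which fields changed.
    static final class DiffingListener implements MemberUpdateListener {
        private MemberSnapshot previous;
        final List<String> changes = new ArrayList<>();

        @Override
        public void onUpdate(MemberSnapshot current) {
            if (previous == null || !Objects.equals(previous.state, current.state)) {
                changes.add("state=" + current.state);
            }
            if (previous == null || !Objects.equals(previous.memberId, current.memberId)) {
                changes.add("memberId=" + current.memberId);
            }
            if (previous == null || previous.epoch != current.epoch) {
                changes.add("epoch=" + current.epoch);
            }
            previous = current;
        }
    }

    public static void main(String[] args) {
        DiffingListener listener = new DiffingListener();
        listener.onUpdate(new MemberSnapshot("JOINING", "member-1", 0));
        listener.onUpdate(new MemberSnapshot("STABLE", "member-1", 1));
        System.out.println(listener.changes);
    }
}
```

The trade-off is that every listener has to keep its own copy of the previous snapshot, which may or may not be preferable to the three explicit callbacks.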
##
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##
@@ -467,66 +594,153 @@ private void handleFatalError(final Errors error) {
future.completeExceptionally(new CommitFailedException());
break;
case UNKNOWN_MEMBER_ID:
-log.info("OffsetCommit failed due to unknown member id memberId {}: {}", null, error.message());
-future.completeExceptionally(error.exception());
+log.info("OffsetCommit failed due to unknown member id: {}", error.message());
+handleUnknownMemberIdError(this);
+break;
+case STALE_MEMBER_EPOCH:
+log.info("OffsetCommit failed due to stale member epoch: {}", error.message());
+handleStaleMemberEpochError(this);
break;
default:
-future.completeExceptionally(new KafkaException("Unexpected error in commit: " + error.message()));
+future.completeExceptionally(new KafkaException("Unexpected error in commit:" +
+" " + error.message()));
break;
}
}
+
+@Override
+void abortRetry(String cause) {
+future.completeExceptionally(new KafkaException("Offset commit waiting for new member" +
+" ID or epoch cannot be retried. " + cause));
+}
+
+/**
+ * Reset timers and add request to the list of pending requests, to make sure it is sent
+ * out on the next poll iteration, without applying any backoff.
+ */
+@Override
+public void retryOnMemberIdOrEpochUpdate(Optional memberId,
+ Optional memberEpoch) {
+this.memberId = memberId;
+this.memberEpoch = memberEpoch;
+reset();
+pendingRequests.addOffsetCommitRequest(this);
+}
+
+@Override
+public String requestName()