cadonna commented on code in PR #16031:
URL: https://github.com/apache/kafka/pull/16031#discussion_r1625726216
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -319,11 +326,11 @@ private void autoCommitSyncBeforeRevocationWithRetries(OffsetCommitRequestState
if (error == null) {
result.complete(null);
} else {
- if (error instanceof RetriableException || isStaleEpochErrorAndValidEpochAvailable(error)) {
- if (error instanceof TimeoutException && requestAttempt.isExpired()) {
- log.debug("Auto-commit sync before revocation timed out and won't be retried anymore");
- result.completeExceptionally(error);
- } else if (error instanceof UnknownTopicOrPartitionException) {
+ if (requestAttempt.isExpired()) {
Review Comment:
Follow-up to
https://github.com/apache/kafka/pull/16031#discussion_r1624620726
I am not sure this is a good idea. Let's assume I get a non-retriable error AND
the timeout has been exceeded. Do I really want a timeout exception wrapped
around the non-retriable error? A timeout exception is useful when I can improve
the situation by increasing the timeout. If I get a non-retriable error,
increasing the timeout does not help.
So, I would only throw a timeout exception when the error is retriable.
WDYT?
This also affects the other location where the expiration is checked.
Either way, tests are required that cover the combinations of (non-)retriable
error and expiration.
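To make the suggestion concrete, here is a minimal sketch of the ordering I have in mind. It is not the code in this PR: `ExpirationSketch`, `RetriableError`, `Outcome` and `classify` are hypothetical names, and `RetriableError` only stands in for Kafka's `RetriableException`. The point is that the retriable check comes first and the deadline is consulted only for retriable errors.

```java
// Minimal sketch; every name here is hypothetical and only stands in for Kafka classes.
final class ExpirationSketch {

    /** Hypothetical stand-in for org.apache.kafka.common.errors.RetriableException. */
    static class RetriableError extends RuntimeException {
        RetriableError(String message) {
            super(message);
        }
    }

    /** What the caller should do with the outcome of one request attempt. */
    enum Outcome { SUCCESS, RETRY, FAIL_WITH_TIMEOUT, FAIL_WITH_ERROR }

    /**
     * Decide how to complete a request attempt.
     *
     * @param error   the error returned by the attempt, or null on success
     * @param expired whether the request deadline has already passed
     */
    static Outcome classify(Throwable error, boolean expired) {
        if (error == null) {
            return Outcome.SUCCESS;
        }
        if (!(error instanceof RetriableError)) {
            // A longer timeout cannot fix this, so surface the error unchanged,
            // regardless of whether the deadline has passed.
            return Outcome.FAIL_WITH_ERROR;
        }
        // Retriable error: only now does the deadline matter.
        return expired ? Outcome.FAIL_WITH_TIMEOUT : Outcome.RETRY;
    }
}
```

The four combinations of (non-)retriable error and expiration map directly onto the tests requested above.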
##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerPollTest.scala:
##########
@@ -238,6 +238,20 @@ class PlaintextConsumerPollTest extends AbstractConsumerTest {
runMultiConsumerSessionTimeoutTest(true)
}
+ // Ensure TestUtils polls with ZERO. This fails for the new consumer only.
Review Comment:
I do not understand this comment. Could you please elaborate?
If I look at where the poll is done, a duration of 100 ms is used.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -465,39 +474,39 @@ private Throwable commitAsyncExceptionForError(Throwable error) {
* Enqueue a request to fetch committed offsets, that will be sent on the next call to {@link #poll(long)}.
*
* @param partitions Partitions to fetch offsets for.
- * @param expirationTimeMs Time until which the request should be retried if it fails
+ * @param deadlineMs Time until which the request should be retried if it fails
Review Comment:
nit: the indentation is not consistent
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -391,7 +398,7 @@ public CompletableFuture<Void> commitAsync(final
Map<TopicPartition, OffsetAndMe
* Commit offsets, retrying on expected retriable errors while the retry
timeout hasn't expired.
*
* @param offsets Offsets to commit
- * @param deadlineMs Time until which the request will be
retried if it fails with
+ * @param deadlineMs Time until which the request will be retried if
it fails with
Review Comment:
nit: The indentation is not consistent.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -469,7 +474,7 @@ private Throwable commitAsyncExceptionForError(Throwable error) {
* Enqueue a request to fetch committed offsets, that will be sent on the next call to {@link #poll(long)}.
*
* @param partitions Partitions to fetch offsets for.
- * @param deadlineMs Time until which the request should be retried if it fails
Review Comment:
nit: revert
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {
Review Comment:
Could you add unit tests for this class?
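As a rough idea of what such tests could check, here is a self-contained sketch. It does not use Kafka's `Timer` or `MockTime`; `DeadlineTracker` is a hypothetical stand-in driven by a hand-controlled clock, and I am assuming the behaviour worth pinning down is "not expired before the deadline, expired at or after it, remaining time never negative".

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for the expiration logic; not the class under review.
final class TimedStateSketch {

    static final class DeadlineTracker {
        private final LongSupplier clock;   // supplies the current time in ms
        private final long deadlineMs;      // absolute deadline in ms

        DeadlineTracker(LongSupplier clock, long deadlineMs) {
            this.clock = clock;
            this.deadlineMs = deadlineMs;
        }

        /** True once the clock has reached or passed the deadline. */
        boolean isExpired() {
            return clock.getAsLong() >= deadlineMs;
        }

        /** Time left until the deadline, clamped at zero. */
        long remainingMs() {
            return Math.max(0L, deadlineMs - clock.getAsLong());
        }
    }
}
```

A test would advance the clock step by step and assert on `isExpired()` and `remainingMs()` before, at, and after the deadline.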
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##########
@@ -0,0 +1,72 @@
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {
+
+ private final Timer timer;
+
+ public TimedRequestState(final LogContext logContext,
+ final String owner,
+ final long retryBackoffMs,
+ final long retryBackoffMaxMs,
+ final Timer timer) {
+ super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
+ this.timer = timer;
+ }
+
+ TimedRequestState(final LogContext logContext,
+ final String owner,
+ final long retryBackoffMs,
+ final int retryBackoffExpBase,
+ final long retryBackoffMaxMs,
+ final double jitter,
+ final Timer timer) {
+ super(logContext, owner, retryBackoffMs, retryBackoffExpBase, retryBackoffMaxMs, jitter);
+ this.timer = timer;
+ }
+
+ public boolean isExpired() {
+ timer.update();
+ return timer.isExpired();
+ }
+
+ public long remainingMs() {
+ timer.update();
+ return timer.remainingMs();
+ }
+
+ public static Timer deadlineTimer(final Time time, final long deadlineMs) {
+ // Prevent the timer from being negative.
Review Comment:
I do not think that you need to explain this.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/TimedRequestState.java:
##########
@@ -0,0 +1,72 @@
+package org.apache.kafka.clients.consumer.internals;
+
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Timer;
+
+/**
+ * {@code TimedRequestState} adds to a {@link RequestState} a {@link Timer} with which to keep track
+ * of the request's expiration.
+ */
+public class TimedRequestState extends RequestState {
+
+ private final Timer timer;
+
+ public TimedRequestState(final LogContext logContext,
+ final String owner,
+ final long retryBackoffMs,
+ final long retryBackoffMaxMs,
+ final Timer timer) {
+ super(logContext, owner, retryBackoffMs, retryBackoffMaxMs);
+ this.timer = timer;
+ }
+
+ TimedRequestState(final LogContext logContext,
Review Comment:
Both of these constructors are only called in child constructors. Why is one
package-private and the other public?
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/CommitRequestManager.java:
##########
@@ -827,26 +808,15 @@ abstract class RetriableRequestState extends RequestState {
*/
abstract CompletableFuture<?> future();
- /**
- * Complete the request future with a TimeoutException if the request timeout has been
- * reached, based on the provided current time.
- */
- void maybeExpire(long currentTimeMs) {
- if (retryTimeoutExpired(currentTimeMs)) {
- removeRequest();
- isExpired = true;
- future().completeExceptionally(new TimeoutException(requestDescription() +
- " could not complete before timeout expired."));
- }
- }
-
/**
* Build request with the given builder, including response handling logic.
*/
NetworkClientDelegate.UnsentRequest buildRequestWithResponseHandling(final AbstractRequest.Builder<?> builder) {
NetworkClientDelegate.UnsentRequest request = new NetworkClientDelegate.UnsentRequest(
builder,
- coordinatorRequestManager.coordinator());
+ coordinatorRequestManager.coordinator(),
+ time.timer(requestTimeoutMs)
+ );
request.whenComplete(
(response, throwable) -> {
long currentTimeMs = request.handler().completionTimeMs();
Review Comment:
I will let you decide what to do. It just caught my attention.