[ https://issues.apache.org/jira/browse/KAFKA-15946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Philip Nee updated KAFKA-15946:
-------------------------------
Description:
The original design was that the network thread always completes the future, whether the request succeeds or fails. However, in the current patch I mistakenly added auto-retry functionality because commitSync wasn't retrying. Instead, the commitSync API should catch RetriableExceptions and resend the commit until the timeout expires.

{code:java}
// OffsetCommit response handling: a retriable error currently triggers an automatic retry.
if (error.exception() instanceof RetriableException) {
    log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, error.message());
    handleRetriableError(error, response);
    retry(responseTime); // <-- We probably shouldn't do this.
    return;
}
{code}

{code:java}
// commitSync sends the commit once and never resends it after a retriable error.
@Override
public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, Duration timeout) {
    acquireAndEnsureOpen();
    long commitStart = time.nanoseconds();
    try {
        CompletableFuture<Void> commitFuture = commit(offsets, true); // <-- we probably should retry here
        ConsumerUtils.getResult(commitFuture, time.timer(timeout));
    } finally {
        wakeupTrigger.clearTask();
        kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart);
        release();
    }
}
{code}

> AsyncKafkaConsumer should retry commits on the application thread instead of autoretry
> --------------------------------------------------------------------------------------
>
>                 Key: KAFKA-15946
>                 URL: https://issues.apache.org/jira/browse/KAFKA-15946
>             Project: Kafka
>          Issue Type: Bug
>          Components: consumer
>            Reporter: Philip Nee
>            Priority: Major
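The proposal in this issue (the network thread only completes each commit future, and commitSync on the application thread resends the commit after a RetriableException until the timeout expires) can be illustrated with a minimal, self-contained Java sketch. It is not the actual AsyncKafkaConsumer change and uses no Kafka classes: `RetriableCommitException` is a hypothetical stand-in for Kafka's `RetriableException`, the `sendCommit` supplier stands in for `commit(offsets, true)`, and `retryBackoff` is an assumed parameter (similar in spirit to the consumer's `retry.backoff.ms` setting).

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class CommitSyncRetrySketch {

    /** Hypothetical stand-in for org.apache.kafka.common.errors.RetriableException. */
    static class RetriableCommitException extends RuntimeException {
        RetriableCommitException(String message) {
            super(message);
        }
    }

    /**
     * Application-thread retry loop (sketch). Each call to sendCommit returns a future that the
     * simulated network thread completes exactly once, successfully or exceptionally, without
     * retrying. A retriable failure causes a fresh commit to be sent until the timeout expires.
     */
    static void commitSyncWithRetry(Supplier<CompletableFuture<Void>> sendCommit,
                                    Duration timeout,
                                    Duration retryBackoff)
            throws TimeoutException, InterruptedException {
        long deadlineNs = System.nanoTime() + timeout.toNanos();
        while (true) {
            long remainingNs = deadlineNs - System.nanoTime();
            if (remainingNs <= 0) {
                throw new TimeoutException("commitSync timed out after " + timeout.toMillis() + " ms");
            }
            CompletableFuture<Void> commitFuture = sendCommit.get();
            try {
                // Wait at most for the remaining time; a TimeoutException here propagates to the caller.
                commitFuture.get(remainingNs, TimeUnit.NANOSECONDS);
                return; // Commit succeeded.
            } catch (ExecutionException e) {
                Throwable cause = e.getCause();
                if (!(cause instanceof RetriableCommitException)) {
                    // Non-retriable failure: rethrow unchanged when it is unchecked, else wrap it.
                    if (cause instanceof RuntimeException) {
                        throw (RuntimeException) cause;
                    }
                    throw new RuntimeException(cause);
                }
                // Retriable failure: back off (never past the deadline), then loop to resend.
                long backoffNs = Math.min(retryBackoff.toNanos(), deadlineNs - System.nanoTime());
                if (backoffNs > 0) {
                    TimeUnit.NANOSECONDS.sleep(backoffNs);
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger attempts = new AtomicInteger();
        // Simulated network thread: completes each future once; the first two attempts fail retriably.
        Supplier<CompletableFuture<Void>> sendCommit = () -> {
            int attempt = attempts.incrementAndGet();
            CompletableFuture<Void> future = new CompletableFuture<>();
            if (attempt <= 2) {
                future.completeExceptionally(new RetriableCommitException("attempt " + attempt));
            } else {
                future.complete(null);
            }
            return future;
        };
        commitSyncWithRetry(sendCommit, Duration.ofSeconds(5), Duration.ofMillis(10));
        System.out.println("committed after " + attempts.get() + " attempts");
    }
}
```

Running `main` simulates two retriable failures followed by a success, so it prints `committed after 3 attempts`. Keeping the retry decision in the caller means each future has a single, final outcome, and the timeout passed to commitSync bounds the whole sequence of attempts rather than a single request.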