[ https://issues.apache.org/jira/browse/KAFKA-15946?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17791498#comment-17791498 ]
Lucas Brutschy commented on KAFKA-15946: ---------------------------------------- Could we alternatively just pass the timeout to the background thread and stop retrying if the timeout expired? > AsyncKafkaConsumer should retry commits on the application thread instead of > autoretry > -------------------------------------------------------------------------------------- > > Key: KAFKA-15946 > URL: https://issues.apache.org/jira/browse/KAFKA-15946 > Project: Kafka > Issue Type: Sub-task > Components: consumer > Reporter: Philip Nee > Assignee: Kirk True > Priority: Major > Labels: kip-848-client-support > > The original design was that the network thread always completes the future > whether succeeds or fails. However, in the current patch, I mis-added > auto-retry functionality because commitSync wasn't retrying. What we should > be doing is, the commit sync API should catch the RetriableExceptions and > resend another commit until timesout. > > {code:java} > if (error.exception() instanceof RetriableException) { > log.warn("OffsetCommit failed on partition {} at offset {}: {}", tp, offset, > error.message()); > handleRetriableError(error, response); > retry(responseTime); <--- We probably shouldn't do this. > return; > } {code} > > {code:java} > @Override > public void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets, > Duration timeout) { > acquireAndEnsureOpen(); > long commitStart = time.nanoseconds(); > try > { CompletableFuture<Void> commitFuture = commit(offsets, true); <-- we > probably should retry here ConsumerUtils.getResult(commitFuture, > time.timer(timeout)); } > finally > { wakeupTrigger.clearTask(); > kafkaConsumerMetrics.recordCommitSync(time.nanoseconds() - commitStart); > release(); } > } {code} -- This message was sent by Atlassian Jira (v8.20.10#820010)