Repository: kafka Updated Branches: refs/heads/trunk c8c6ab632 -> 690575ec4
MINOR: Follow-up improvements on top of KAFKA-5793. Simplified the condition in Sender#failBatch(). Added a trace log in TransactionManager#updateLastAckedOffset(). Author: tedyu <yuzhih...@gmail.com> Reviewers: Jason Gustafson <ja...@confluent.io> Closes #3935 from tedyu/trunk Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/690575ec Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/690575ec Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/690575ec Branch: refs/heads/trunk Commit: 690575ec466b5ef6099c3a731e97d45b28c98b33 Parents: c8c6ab6 Author: tedyu <yuzhih...@gmail.com> Authored: Thu Sep 21 13:57:54 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Thu Sep 21 13:57:54 2017 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/clients/producer/internals/Sender.java | 3 +-- .../kafka/clients/producer/internals/TransactionManager.java | 7 +++++-- 2 files changed, 6 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/690575ec/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index b96ea92..d71046a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -36,7 +36,6 @@ import org.apache.kafka.common.errors.ProducerFencedException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; -import 
org.apache.kafka.common.errors.UnknownProducerIdException; import org.apache.kafka.common.errors.UnknownTopicOrPartitionException; import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.metrics.Measurable; @@ -600,7 +599,7 @@ public class Sender implements Runnable { private void failBatch(ProducerBatch batch, long baseOffset, long logAppendTime, RuntimeException exception, boolean adjustSequenceNumbers) { if (transactionManager != null) { - if ((exception instanceof OutOfOrderSequenceException || exception instanceof UnknownProducerIdException) + if (exception instanceof OutOfOrderSequenceException && !transactionManager.isTransactional() && transactionManager.hasProducerId(batch.producerId())) { log.error("The broker returned {} for topic-partition " + http://git-wip-us.apache.org/repos/asf/kafka/blob/690575ec/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index 3bddded..a0b45cd 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -465,7 +465,7 @@ public class TransactionManager { synchronized long lastAckedOffset(TopicPartition topicPartition) { Long offset = lastAckedOffset.get(topicPartition); if (offset == null) - return -1; + return ProduceResponse.INVALID_OFFSET; return offset; } @@ -473,8 +473,11 @@ public class TransactionManager { if (response.baseOffset == ProduceResponse.INVALID_OFFSET) return; long lastOffset = response.baseOffset + batch.recordCount - 1; - if (lastOffset > lastAckedOffset(batch.topicPartition)) + if (lastOffset > 
lastAckedOffset(batch.topicPartition)) { lastAckedOffset.put(batch.topicPartition, lastOffset); + } else { + log.trace("Partition {} keeps lastOffset at {}", batch.topicPartition, lastOffset); + } } // If a batch is failed fatally, the sequence numbers for future batches bound for the partition must be adjusted