Repository: kafka
Updated Branches:
  refs/heads/trunk c8c6ab632 -> 690575ec4


MINOR: Follow-up improvements on top of KAFKA-5793

Simplified the condition in Sender#failBatch()
Added log in TransactionManager#updateLastAckedOffset()

Author: tedyu <yuzhih...@gmail.com>

Reviewers: Jason Gustafson <ja...@confluent.io>

Closes #3935 from tedyu/trunk


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/690575ec
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/690575ec
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/690575ec

Branch: refs/heads/trunk
Commit: 690575ec466b5ef6099c3a731e97d45b28c98b33
Parents: c8c6ab6
Author: tedyu <yuzhih...@gmail.com>
Authored: Thu Sep 21 13:57:54 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Sep 21 13:57:54 2017 -0700

----------------------------------------------------------------------
 .../org/apache/kafka/clients/producer/internals/Sender.java   | 3 +--
 .../kafka/clients/producer/internals/TransactionManager.java  | 7 +++++--
 2 files changed, 6 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/690575ec/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index b96ea92..d71046a 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -36,7 +36,6 @@ import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RetriableException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
-import org.apache.kafka.common.errors.UnknownProducerIdException;
 import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;
 import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.metrics.Measurable;
@@ -600,7 +599,7 @@ public class Sender implements Runnable {
 
     private void failBatch(ProducerBatch batch, long baseOffset, long 
logAppendTime, RuntimeException exception, boolean adjustSequenceNumbers) {
         if (transactionManager != null) {
-            if ((exception instanceof OutOfOrderSequenceException || exception 
instanceof UnknownProducerIdException)
+            if (exception instanceof OutOfOrderSequenceException
                     && !transactionManager.isTransactional()
                     && transactionManager.hasProducerId(batch.producerId())) {
                 log.error("The broker returned {} for topic-partition " +

http://git-wip-us.apache.org/repos/asf/kafka/blob/690575ec/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index 3bddded..a0b45cd 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -465,7 +465,7 @@ public class TransactionManager {
     synchronized long lastAckedOffset(TopicPartition topicPartition) {
         Long offset = lastAckedOffset.get(topicPartition);
         if (offset == null)
-            return -1;
+            return ProduceResponse.INVALID_OFFSET;
         return offset;
     }
 
@@ -473,8 +473,11 @@ public class TransactionManager {
         if (response.baseOffset == ProduceResponse.INVALID_OFFSET)
             return;
         long lastOffset = response.baseOffset + batch.recordCount - 1;
-        if (lastOffset > lastAckedOffset(batch.topicPartition))
+        if (lastOffset > lastAckedOffset(batch.topicPartition)) {
             lastAckedOffset.put(batch.topicPartition, lastOffset);
+        } else {
+            log.trace("Partition {} keeps lastOffset at {}", 
batch.topicPartition, lastOffset);
+        }
     }
 
     // If a batch is failed fatally, the sequence numbers for future batches 
bound for the partition must be adjusted

Reply via email to