Repository: kafka
Updated Branches:
  refs/heads/0.11.0 886a41d62 -> ad74035d5


KAFKA-5428; Transactional producer should only abort batches in fatal error state

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <apu...@confluent.io>, Ismael Juma <ism...@juma.me.uk>

Closes #3298 from hachikuji/KAFKA-5428

(cherry picked from commit aea53108a7d9573b8976da83235639a981c09af3)
Signed-off-by: Jason Gustafson <ja...@confluent.io>
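
In effect, this change allows an application to recover from an abortable transaction error by aborting the transaction and beginning a new one; only fatal errors (such as being fenced by another producer) leave the producer unusable. A minimal sketch of the recovery flow this enables (illustrative only, not part of this commit; broker address, topic, and transactional.id are placeholders):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class TransactionalRecoverySketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");    // placeholder broker
        props.put("transactional.id", "example-txn-id");     // placeholder id
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("example-topic", "key", "value"));
            producer.commitTransaction();
        } catch (ProducerFencedException fatal) {
            producer.close();            // fatal: the producer cannot be reused
        } catch (KafkaException abortable) {
            producer.abortTransaction(); // abortable: a new transaction may begin
        }
    }
}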


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ad74035d
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ad74035d
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ad74035d

Branch: refs/heads/0.11.0
Commit: ad74035d5870e203433d40c31ec1c9b3ae2710df
Parents: 886a41d
Author: Jason Gustafson <ja...@confluent.io>
Authored: Mon Jun 12 18:07:27 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Mon Jun 12 18:09:45 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   |   3 -
 .../producer/internals/ProducerBatch.java       |  12 +-
 .../producer/internals/ProducerIdAndEpoch.java  |   2 +-
 .../producer/internals/RecordAccumulator.java   |   2 +-
 .../clients/producer/internals/Sender.java      |   9 +-
 .../producer/internals/TransactionManager.java  |  22 +-
 .../internals/TransactionManagerTest.java       | 240 ++++++++++++++++++-
 .../main/scala/kafka/cluster/Partition.scala    |  24 +-
 ...nsactionMarkerRequestCompletionHandler.scala |   3 +-
 .../scala/kafka/tools/DumpLogSegments.scala     |   3 +-
 .../kafka/api/AuthorizerIntegrationTest.scala   |  10 +-
 11 files changed, 288 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ad74035d/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index a7336d8..4f155a4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -607,9 +607,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * Implementation of asynchronously send a record to a topic.
      */
     private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {
-        if (transactionManager != null)
-            transactionManager.failIfNotReadyForSend();
-
         TopicPartition tp = null;
         try {
             // first make sure the metadata for the topic is available
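
The eager readiness check is removed from doSend(): with this patch it runs when a partition is added to the transaction (see the TransactionManager hunk below), and failures surface through the record futures instead of a synchronous throw. For context, a paraphrased, self-contained sketch of the kind of check failIfNotReadyForSend() performs (assumed simplification, not the verbatim source):

import org.apache.kafka.common.KafkaException;

class ReadinessCheckSketch {
    enum State { READY, IN_TRANSACTION, ABORTABLE_ERROR, FATAL_ERROR }

    private State currentState = State.READY;
    private RuntimeException lastError;

    synchronized void failIfNotReadyForSend() {
        if (currentState == State.ABORTABLE_ERROR || currentState == State.FATAL_ERROR)
            throw new KafkaException("Cannot perform send because at least one previous " +
                    "transactional or idempotent request has failed with errors.", lastError);
        if (currentState != State.IN_TRANSACTION)
            throw new IllegalStateException("Cannot send before the transaction has been started");
    }
}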

http://git-wip-us.apache.org/repos/asf/kafka/blob/ad74035d/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
index b33e080..9d8b82d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerBatch.java
@@ -163,9 +163,15 @@ public final class ProducerBatch {
      * @param exception The exception that occurred (or null if the request was successful)
      */
     public void done(long baseOffset, long logAppendTime, RuntimeException exception) {
-        log.trace("Produced messages to topic-partition {} with base offset offset {} and error: {}.",
-                  topicPartition, baseOffset, exception);
-        FinalState finalState = exception != null ? FinalState.FAILED : FinalState.SUCCEEDED;
+        final FinalState finalState;
+        if (exception == null) {
+            log.trace("Successfully produced messages to {} with base offset {}.", topicPartition, baseOffset);
+            finalState = FinalState.SUCCEEDED;
+        } else {
+            log.trace("Failed to produce messages to {}.", topicPartition, exception);
+            finalState = FinalState.FAILED;
+        }
+
         if (!this.finalState.compareAndSet(null, finalState)) {
             if (this.finalState.get() == FinalState.ABORTED) {
                 log.debug("ProduceResponse returned for {} after batch had 
already been aborted.", topicPartition);
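
The reworked done() settles the batch's final state at most once through compareAndSet, so a ProduceResponse arriving after an abort cannot overwrite the ABORTED outcome. A self-contained sketch of this settle-once pattern (simplified; the real ProducerBatch tracks much more state):

import java.util.concurrent.atomic.AtomicReference;

class BatchStateSketch {
    enum FinalState { SUCCEEDED, FAILED, ABORTED }

    final AtomicReference<FinalState> finalState = new AtomicReference<>(null);

    // Returns true only for the first caller; later outcomes are ignored.
    boolean tryComplete(FinalState outcome) {
        return finalState.compareAndSet(null, outcome);
    }

    public static void main(String[] args) {
        BatchStateSketch batch = new BatchStateSketch();
        batch.tryComplete(FinalState.ABORTED);                       // abort wins the race
        boolean settled = batch.tryComplete(FinalState.SUCCEEDED);   // late response loses
        System.out.println(settled + " " + batch.finalState.get());  // false ABORTED
    }
}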

http://git-wip-us.apache.org/repos/asf/kafka/blob/ad74035d/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java
index 293bb51..ebd5cc3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/ProducerIdAndEpoch.java
@@ -36,6 +36,6 @@ class ProducerIdAndEpoch {
 
     @Override
     public String toString() {
-        return "(producerId=" + producerId + ", epoch='" + epoch + ")";
+        return "(producerId=" + producerId + ", epoch=" + epoch + ")";
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ad74035d/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
index 76cc6bd..505417c 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/RecordAccumulator.java
@@ -622,7 +622,7 @@ public final class RecordAccumulator {
         }
     }
 
-    void abortUnclosedBatches(RuntimeException reason) {
+    void abortOpenBatches(RuntimeException reason) {
         for (ProducerBatch batch : incomplete.all()) {
             Deque<ProducerBatch> dq = getDeque(batch.topicPartition);
             boolean aborted = false;
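
The rename makes the contract explicit: only batches still open in the accumulator's deques are aborted, while batches already drained to the network layer are left to complete or retry. An illustrative sketch of that rule (hypothetical Batch type, not the actual accumulator code):

import java.util.Deque;

class AbortOpenBatchesSketch {
    static class Batch {
        boolean inFlight;             // true once drained and handed to the network layer
        RuntimeException abortReason; // set when the batch is failed without sending
    }

    static void abortOpenBatches(Deque<Batch> deque, RuntimeException reason) {
        for (Batch batch : deque) {
            if (!batch.inFlight) {
                batch.abortReason = reason; // unsent batches fail immediately
            }
            // in-flight batches are left alone so retries can resolve their final status
        }
    }
}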

http://git-wip-us.apache.org/repos/asf/kafka/blob/ad74035d/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index b8d4ab9..c9a1292 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -209,12 +209,14 @@ public class Sender implements Runnable {
 
             // do not continue sending if the transaction manager is in a failed state or if there
             // is no producer id (for the idempotent case).
-            if (transactionManager.hasError() || !transactionManager.hasProducerId()) {
+            if (transactionManager.hasFatalError() || !transactionManager.hasProducerId()) {
                 RuntimeException lastError = transactionManager.lastError();
                 if (lastError != null)
                     maybeAbortBatches(lastError);
                 client.poll(retryBackoffMs, now);
                 return;
+            } else if (transactionManager.hasAbortableError()) {
+                accumulator.abortOpenBatches(transactionManager.lastError());
             }
         }
 
@@ -260,7 +262,6 @@ public class Sender implements Runnable {
         }
 
         List<ProducerBatch> expiredBatches = this.accumulator.expiredBatches(this.requestTimeout, now);
-
         boolean needsTransactionStateReset = false;
         // Reset the producer id if an expired batch has previously been sent to the broker. Also update the metrics
         // for expired batches. see the documentation of @TransactionState.resetProducerId to understand why
@@ -298,7 +299,6 @@ public class Sender implements Runnable {
         sendProduceRequests(batches, now);
 
         return pollTimeout;
-
     }
 
     private boolean maybeSendTransactionalRequest(long now) {
@@ -308,7 +308,7 @@ public class Sender implements Runnable {
 
             // If the transaction is being aborted, then we can clear any unsent produce requests
             if (transactionManager.isAborting())
-                accumulator.abortUnclosedBatches(new KafkaException("Failing batch since transaction was aborted"));
+                accumulator.abortOpenBatches(new KafkaException("Failing batch since transaction was aborted"));
 
             // There may still be requests left which are being retried. Since we do not know whether they had
             // been successfully appended to the broker log, we must resend them until their final status is clear.
@@ -553,7 +553,6 @@ public class Sender implements Runnable {
 
         } else {
             completeBatch(batch, response);
-
         }
 
         // Unmute the completed partition.
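
Condensing the new branch above: a fatal error aborts every incomplete batch and stops the send loop, whereas an abortable error fails only the batches that were never sent, so retried in-flight batches can still reach a final status. A stand-in sketch of the control flow (hypothetical interfaces mirroring TransactionManager and RecordAccumulator):

class SenderLoopSketch {
    interface TxnManager {
        boolean hasFatalError();
        boolean hasAbortableError();
        RuntimeException lastError();
    }

    interface Accumulator {
        void abortIncompleteBatches(RuntimeException reason); // everything, sent or not
        void abortOpenBatches(RuntimeException reason);       // only never-sent batches
    }

    static boolean shouldContinueSending(TxnManager txn, Accumulator acc) {
        if (txn.hasFatalError()) {
            acc.abortIncompleteBatches(txn.lastError()); // nothing can complete anymore
            return false;                                // stop draining entirely
        }
        if (txn.hasAbortableError())
            acc.abortOpenBatches(txn.lastError());       // fail unsent data...
        return true;                                     // ...but keep draining retries
    }
}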

http://git-wip-us.apache.org/repos/asf/kafka/blob/ad74035d/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index a26c3b7..49738b2 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -218,8 +218,7 @@ public class TransactionManager {
     }
 
     public synchronized void maybeAddPartitionToTransaction(TopicPartition topicPartition) {
-        if (currentState != State.IN_TRANSACTION)
-            throw new IllegalStateException("Cannot add partitions to a transaction in state " + currentState);
+        failIfNotReadyForSend();
 
         if (isPartitionAdded(topicPartition) || isPartitionPendingAdd(topicPartition))
             return;
@@ -540,6 +539,8 @@ public class TransactionManager {
         transitionTo(State.READY);
         lastError = null;
         transactionStarted = false;
+        newPartitionsInTransaction.clear();
+        pendingPartitionsInTransaction.clear();
         partitionsInTransaction.clear();
     }
 
@@ -757,15 +758,22 @@ public class TransactionManager {
                 }
             }
 
+            Set<TopicPartition> partitions = errors.keySet();
+
+            // Remove the partitions from the pending set regardless of the result. We use the presence
+            // of partitions in the pending set to know when it is not safe to send batches. However, if
+            // the partitions failed to be added and we enter an error state, we expect the batches to be
+            // aborted anyway. In this case, we must be able to continue sending the batches which are in
+            // retry for partitions that were successfully added.
+            pendingPartitionsInTransaction.removeAll(partitions);
+
             if (!unauthorizedTopics.isEmpty()) {
                 abortableError(new TopicAuthorizationException(unauthorizedTopics));
             } else if (hasPartitionErrors) {
-                abortableError(new KafkaException("Could not add partitions to transaction due to partition level errors"));
+                abortableError(new KafkaException("Could not add partitions to transaction due to errors: " + errors));
             } else {
-                Set<TopicPartition> addedPartitions = errors.keySet();
-                log.debug("{}Successfully added partitions {} to transaction", logPrefix, addedPartitions);
-                partitionsInTransaction.addAll(addedPartitions);
-                pendingPartitionsInTransaction.removeAll(addedPartitions);
+                log.debug("{}Successfully added partitions {} to transaction", logPrefix, partitions);
+                partitionsInTransaction.addAll(partitions);
                 transactionStarted = true;
                 result.done();
             }
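
The key change in the AddPartitionsToTxn handler: pending partitions are cleared whether or not the add succeeded, since otherwise a retried batch for an already-added partition would stay blocked behind a failed add. A simplified, self-contained sketch of the bookkeeping (assumed names; the real handler also handles retries and coordinator errors):

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.KafkaException;

class AddPartitionsBookkeepingSketch {
    enum Errors { NONE, TOPIC_AUTHORIZATION_FAILED }

    final Set<String> pendingPartitions = new HashSet<>();
    final Set<String> partitionsInTransaction = new HashSet<>();
    RuntimeException abortableError;

    void handleAddPartitionsResponse(Map<String, Errors> errors) {
        // Always unblock: sending may continue for partitions added earlier.
        pendingPartitions.removeAll(errors.keySet());

        if (errors.containsValue(Errors.TOPIC_AUTHORIZATION_FAILED))
            abortableError = new KafkaException("Could not add partitions to transaction due to errors: " + errors);
        else
            partitionsInTransaction.addAll(errors.keySet());
    }
}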

http://git-wip-us.apache.org/repos/asf/kafka/blob/ad74035d/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index c4abd3c..bdad483 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -316,7 +316,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test(expected = KafkaException.class)
     public void testMaybeAddPartitionToTransactionAfterAbortableError() {
         long pid = 13131L;
         short epoch = 1;
@@ -326,7 +326,7 @@ public class TransactionManagerTest {
         transactionManager.maybeAddPartitionToTransaction(new TopicPartition("foo", 0));
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test(expected = KafkaException.class)
     public void testMaybeAddPartitionToTransactionAfterFatalError() {
         long pid = 13131L;
         short epoch = 1;
@@ -836,6 +836,11 @@ public class TransactionManagerTest {
 
         assertTrue(transactionManager.hasError());
         assertTrue(transactionManager.lastError() instanceof TopicAuthorizationException);
+        assertFalse(transactionManager.isPartitionPendingAdd(tp0));
+        assertFalse(transactionManager.isPartitionPendingAdd(tp1));
+        assertFalse(transactionManager.isPartitionAdded(tp0));
+        assertFalse(transactionManager.isPartitionAdded(tp1));
+        assertFalse(transactionManager.hasPartitionsToAdd());
 
         TopicAuthorizationException exception = (TopicAuthorizationException) transactionManager.lastError();
         assertEquals(singleton(tp0.topic()), exception.unauthorizedTopics());
@@ -844,6 +849,198 @@ public class TransactionManagerTest {
     }
 
     @Test
+    public void testRecoveryFromAbortableErrorTransactionNotStarted() throws Exception {
+        final long pid = 13131L;
+        final short epoch = 1;
+        final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(unauthorizedPartition, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
+        sender.run(time.milliseconds());
+
+        assertTrue(transactionManager.hasAbortableError());
+        transactionManager.beginAbortingTransaction();
+        sender.run(time.milliseconds());
+        assertTrue(responseFuture.isDone());
+        assertFutureFailed(responseFuture);
+
+        // No partitions added, so no need to prepare EndTxn response
+        sender.run(time.milliseconds());
+        assertTrue(transactionManager.isReady());
+        assertFalse(transactionManager.hasPartitionsToAdd());
+        assertFalse(accumulator.hasUnflushedBatches());
+
+        // ensure we can now start a new transaction
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
+        sender.run(time.milliseconds());
+        assertTrue(transactionManager.isPartitionAdded(tp0));
+        assertFalse(transactionManager.hasPartitionsToAdd());
+
+        transactionManager.beginCommittingTransaction();
+        prepareProduceResponse(Errors.NONE, pid, epoch);
+        sender.run(time.milliseconds());
+
+        assertTrue(responseFuture.isDone());
+        assertNotNull(responseFuture.get());
+
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
+        sender.run(time.milliseconds());
+
+        assertTrue(transactionManager.isReady());
+    }
+
+    @Test
+    public void testRecoveryFromAbortableErrorTransactionStarted() throws Exception {
+        final long pid = 13131L;
+        final short epoch = 1;
+        final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        prepareAddPartitionsToTxn(tp0, Errors.NONE);
+
+        Future<RecordMetadata> authorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
+                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertTrue(transactionManager.isPartitionAdded(tp0));
+
+        transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
+        Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
+                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+        prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
+        sender.run(time.milliseconds());
+        assertTrue(transactionManager.hasAbortableError());
+        assertTrue(transactionManager.isPartitionAdded(tp0));
+        assertFalse(transactionManager.isPartitionAdded(unauthorizedPartition));
+        assertFalse(authorizedTopicProduceFuture.isDone());
+        assertFalse(unauthorizedTopicProduceFuture.isDone());
+
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
+        transactionManager.beginAbortingTransaction();
+        sender.run(time.milliseconds());
+        // neither produce request has been sent, so they should both be failed immediately
+        assertFutureFailed(authorizedTopicProduceFuture);
+        assertFutureFailed(unauthorizedTopicProduceFuture);
+        assertTrue(transactionManager.isReady());
+        assertFalse(transactionManager.hasPartitionsToAdd());
+        assertFalse(accumulator.hasUnflushedBatches());
+
+        // ensure we can now start a new transaction
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        FutureRecordMetadata nextTransactionFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
+        sender.run(time.milliseconds());
+        assertTrue(transactionManager.isPartitionAdded(tp0));
+        assertFalse(transactionManager.hasPartitionsToAdd());
+
+        transactionManager.beginCommittingTransaction();
+        prepareProduceResponse(Errors.NONE, pid, epoch);
+        sender.run(time.milliseconds());
+
+        assertTrue(nextTransactionFuture.isDone());
+        assertNotNull(nextTransactionFuture.get());
+
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
+        sender.run(time.milliseconds());
+
+        assertTrue(transactionManager.isReady());
+    }
+
+    @Test
+    public void testRecoveryFromAbortableErrorProduceRequestInRetry() throws Exception {
+        final long pid = 13131L;
+        final short epoch = 1;
+        final TopicPartition unauthorizedPartition = new TopicPartition("foo", 0);
+
+        doInitTransactions(pid, epoch);
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+        prepareAddPartitionsToTxn(tp0, Errors.NONE);
+
+        Future<RecordMetadata> authorizedTopicProduceFuture = accumulator.append(tp0, time.milliseconds(),
+                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+        sender.run(time.milliseconds());
+        assertTrue(transactionManager.isPartitionAdded(tp0));
+
+        accumulator.beginFlush();
+        prepareProduceResponse(Errors.REQUEST_TIMED_OUT, pid, epoch);
+        sender.run(time.milliseconds());
+        assertFalse(authorizedTopicProduceFuture.isDone());
+        assertTrue(accumulator.hasUnflushedBatches());
+
+        transactionManager.maybeAddPartitionToTransaction(unauthorizedPartition);
+        Future<RecordMetadata> unauthorizedTopicProduceFuture = accumulator.append(unauthorizedPartition, time.milliseconds(),
+                "key".getBytes(), "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+        prepareAddPartitionsToTxn(singletonMap(unauthorizedPartition, Errors.TOPIC_AUTHORIZATION_FAILED));
+        sender.run(time.milliseconds());
+        assertTrue(transactionManager.hasAbortableError());
+        assertTrue(transactionManager.isPartitionAdded(tp0));
+        assertFalse(transactionManager.isPartitionAdded(unauthorizedPartition));
+        assertFalse(authorizedTopicProduceFuture.isDone());
+
+        prepareProduceResponse(Errors.NONE, pid, epoch);
+        sender.run(time.milliseconds());
+        assertFutureFailed(unauthorizedTopicProduceFuture);
+        assertTrue(authorizedTopicProduceFuture.isDone());
+        assertNotNull(authorizedTopicProduceFuture.get());
+
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.ABORT, pid, epoch);
+        transactionManager.beginAbortingTransaction();
+        sender.run(time.milliseconds());
+        // neither produce request has been sent, so they should both be failed immediately
+        assertTrue(transactionManager.isReady());
+        assertFalse(transactionManager.hasPartitionsToAdd());
+        assertFalse(accumulator.hasUnflushedBatches());
+
+        // ensure we can now start a new transaction
+
+        transactionManager.beginTransaction();
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        FutureRecordMetadata nextTransactionFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        prepareAddPartitionsToTxn(singletonMap(tp0, Errors.NONE));
+        sender.run(time.milliseconds());
+        assertTrue(transactionManager.isPartitionAdded(tp0));
+        assertFalse(transactionManager.hasPartitionsToAdd());
+
+        transactionManager.beginCommittingTransaction();
+        prepareProduceResponse(Errors.NONE, pid, epoch);
+        sender.run(time.milliseconds());
+
+        assertTrue(nextTransactionFuture.isDone());
+        assertNotNull(nextTransactionFuture.get());
+
+        prepareEndTxnResponse(Errors.NONE, TransactionResult.COMMIT, pid, epoch);
+        sender.run(time.milliseconds());
+
+        assertTrue(transactionManager.isReady());
+    }
+
+    @Test
     public void testTransactionalIdAuthorizationFailureInAddPartitions() {
         final long pid = 13131L;
         final short epoch = 1;
@@ -1480,6 +1677,33 @@ public class TransactionManagerTest {
     }
 
     @Test
+    public void resendFailedProduceRequestAfterAbortableError() throws Exception {
+        final long pid = 13131L;
+        final short epoch = 1;
+        doInitTransactions(pid, epoch);
+        transactionManager.beginTransaction();
+
+        transactionManager.maybeAddPartitionToTransaction(tp0);
+
+        Future<RecordMetadata> responseFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), Record.EMPTY_HEADERS, null, MAX_BLOCK_TIMEOUT).future;
+
+        prepareAddPartitionsToTxnResponse(Errors.NONE, tp0, epoch, pid);
+        prepareProduceResponse(Errors.NOT_LEADER_FOR_PARTITION, pid, epoch);
+        sender.run(time.milliseconds()); // AddPartitions
+        sender.run(time.milliseconds()); // Produce
+
+        assertFalse(responseFuture.isDone());
+
+        transactionManager.transitionToAbortableError(new KafkaException());
+        prepareProduceResponse(Errors.NONE, pid, epoch);
+
+        sender.run(time.milliseconds());
+        assertTrue(responseFuture.isDone());
+        assertNotNull(responseFuture.get());
+    }
+
+    @Test
     public void testTransitionToAbortableErrorOnBatchExpiry() throws InterruptedException, ExecutionException {
         final long pid = 13131L;
         final short epoch = 1;
@@ -1866,4 +2090,16 @@ public class TransactionManagerTest {
             assertTrue(transactionManager.hasError());
         }
     }
+
+    private void assertFutureFailed(Future<RecordMetadata> future) throws InterruptedException {
+        assertTrue(future.isDone());
+
+        try {
+            future.get();
+            fail("Expected produce future to throw");
+        } catch (ExecutionException e) {
+            // expected
+        }
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ad74035d/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index aa11ba1..32b6865 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -268,8 +268,8 @@ class Partition(val topic: String,
         if (leaderLWIncremented || leaderHWIncremented)
           tryCompleteDelayedRequests()
 
-        debug("Recorded replica %d log end offset (LEO) position %d for 
partition %s."
-          .format(replicaId, 
logReadResult.info.fetchOffsetMetadata.messageOffset, topicPartition))
+        debug("Recorded replica %d log end offset (LEO) position %d."
+          .format(replicaId, 
logReadResult.info.fetchOffsetMetadata.messageOffset))
       case None =>
         throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" +
           " is not recognized to be one of the assigned replicas %s for partition %s.")
@@ -302,7 +302,7 @@ class Partition(val topic: String,
              assignedReplicas.map(_.brokerId).contains(replicaId) &&
              replica.logEndOffset.offsetDiff(leaderHW) >= 0) {
             val newInSyncReplicas = inSyncReplicas + replica
-            info(s"Expanding ISR for partition $topicPartition from 
${inSyncReplicas.map(_.brokerId).mkString(",")} " +
+            info(s"Expanding ISR from 
${inSyncReplicas.map(_.brokerId).mkString(",")} " +
               s"to ${newInSyncReplicas.map(_.brokerId).mkString(",")}")
             // update ISR in ZK and cache
             updateIsr(newInSyncReplicas)
@@ -333,7 +333,7 @@ class Partition(val topic: String,
         def numAcks = curInSyncReplicas.count { r =>
           if (!r.isLocal)
             if (r.logEndOffset.messageOffset >= requiredOffset) {
-              trace(s"Replica ${r.brokerId} of ${topic}-${partitionId} 
received offset $requiredOffset")
+              trace(s"Replica ${r.brokerId} received offset $requiredOffset")
               true
             }
             else
@@ -342,7 +342,7 @@ class Partition(val topic: String,
             true /* also count the local (leader) replica */
         }
 
-        trace(s"$numAcks acks satisfied for ${topic}-${partitionId} with acks 
= -1")
+        trace(s"$numAcks acks satisfied with acks = -1")
 
         val minIsr = leaderReplica.log.get.config.minInSyncReplicas
 
@@ -388,11 +388,11 @@ class Partition(val topic: String,
     val oldHighWatermark = leaderReplica.highWatermark
     if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset || oldHighWatermark.onOlderSegment(newHighWatermark)) {
       leaderReplica.highWatermark = newHighWatermark
-      debug("High watermark for partition [%s,%d] updated to %s".format(topic, 
partitionId, newHighWatermark))
+      debug(s"High watermark updated to $newHighWatermark")
       true
-    } else {
-      debug("Skipping update high watermark since Old hw %s is larger than new hw %s for partition [%s,%d]. All leo's are %s"
-        .format(oldHighWatermark, newHighWatermark, topic, partitionId, allLogEndOffsets.mkString(",")))
+    } else  {
+      debug(s"Skipping update high watermark since new hw $newHighWatermark is not larger than old hw $oldHighWatermark." +
+        s"All LEOs are ${allLogEndOffsets.mkString(",")}")
       false
     }
   }
@@ -427,8 +427,8 @@ class Partition(val topic: String,
           if(outOfSyncReplicas.nonEmpty) {
             val newInSyncReplicas = inSyncReplicas -- outOfSyncReplicas
             assert(newInSyncReplicas.nonEmpty)
-            info("Shrinking ISR for partition [%s,%d] from %s to 
%s".format(topic, partitionId,
-              inSyncReplicas.map(_.brokerId).mkString(","), 
newInSyncReplicas.map(_.brokerId).mkString(",")))
+            info("Shrinking ISR from %s to 
%s".format(inSyncReplicas.map(_.brokerId).mkString(","),
+              newInSyncReplicas.map(_.brokerId).mkString(",")))
             // update ISR in zk and in cache
             updateIsr(newInSyncReplicas)
             // we may need to increment high watermark since ISR could be down 
to 1
@@ -464,7 +464,7 @@ class Partition(val topic: String,
 
     val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
     if (laggingReplicas.nonEmpty)
-      debug("Lagging replicas for partition %s are %s".format(topicPartition, 
laggingReplicas.map(_.brokerId).mkString(",")))
+      debug("Lagging replicas are 
%s".format(laggingReplicas.map(_.brokerId).mkString(",")))
 
     laggingReplicas
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ad74035d/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
index c34ea72..4abaada 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandler.scala
@@ -87,7 +87,7 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
         }
       }
     } else {
-      trace(s"Received response $response from node ${response.destination} 
with correlation id $correlationId")
+      debug(s"Received WriteTxnMarker response $response from node 
${response.destination} with correlation id $correlationId")
 
       val writeTxnMarkerResponse = response.responseBody.asInstanceOf[WriteTxnMarkersResponse]
 
@@ -135,7 +135,6 @@ class TransactionMarkerRequestCompletionHandler(brokerId: Int,
                 for ((topicPartition: TopicPartition, error: Errors) <- errors) {
                   error match {
                     case Errors.NONE =>
-
                       txnMetadata.removePartition(topicPartition)
 
                     case Errors.CORRUPT_MESSAGE |

http://git-wip-us.apache.org/repos/asf/kafka/blob/ad74035d/core/src/main/scala/kafka/tools/DumpLogSegments.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 3680d10..9179fec 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -283,7 +283,8 @@ object DumpLogSegments {
         s"producerEpoch:${txnMetadata.producerEpoch}," +
         s"state=${txnMetadata.state}," +
         s"partitions=${txnMetadata.topicPartitions}," +
-        s"lastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}"
+        s"txnLastUpdateTimestamp=${txnMetadata.txnLastUpdateTimestamp}," +
+        s"txnTimeoutMs=${txnMetadata.txnTimeoutMs}"
 
       (Some(keyString), Some(valueString))
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ad74035d/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 22699a9..bd04c7b 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -926,10 +926,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     try {
       // the second time, the call to send itself should fail (the producer becomes unusable
       // if no producerId can be obtained)
-      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes))
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get()
       fail("Should have raised ClusterAuthorizationException")
     } catch {
-      case e: KafkaException =>
+      case e: ExecutionException =>
         assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException])
     }
   }
@@ -959,10 +959,10 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     try {
       // the second time, the call to send itself should fail (the producer becomes unusable
       // if no producerId can be obtained)
-      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes))
+      producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic, "hi".getBytes)).get()
       fail("Should have raised ClusterAuthorizationException")
     } catch {
-      case e: KafkaException =>
+      case e: ExecutionException =>
         assertTrue(e.getCause.isInstanceOf[ClusterAuthorizationException])
     }
   }
@@ -1061,7 +1061,7 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
     try {
       producer.send(new ProducerRecord(deleteTopic, 0, "1".getBytes, "1".getBytes)).get
     } catch {
-      case e : ExecutionException =>
+      case e: ExecutionException =>
         assertTrue(e.getCause.isInstanceOf[TopicAuthorizationException])
     }
     // now rollback
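
Because the readiness check no longer runs inside send(), the authorization failure now surfaces when the returned future is resolved, which is why these tests call get() and expect ExecutionException. The client-side pattern, as a sketch (the producer and topic are assumed to exist):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.errors.ClusterAuthorizationException;

class UnwrapSendFailureSketch {
    static boolean sendWasUnauthorized(Producer<byte[], byte[]> producer, String topic)
            throws InterruptedException {
        Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, "hi".getBytes()));
        try {
            future.get(); // blocks until the broker responds or the batch is failed
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof ClusterAuthorizationException;
        }
    }
}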
