Repository: kafka
Updated Branches:
  refs/heads/trunk 21fea170d -> 71417552c


KAFKA-5342; Clarify producer fatal/abortable errors and fix inconsistencies

This patch improves documentation on the handling of errors for the 
idempotent/transactional producer. It also fixes a couple minor inconsistencies 
and improves test coverage. In particular:
- UnsupportedForMessageFormat should be a fatal error for TxnOffsetCommit 
responses
- UnsupportedVersion should be fatal for Produce responses and should be 
returned instead of InvalidRequest

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Apurva Mehta <apu...@confluent.io>, Ismael Juma <ism...@juma.me.uk>

Closes #3716 from hachikuji/KAFKA-5342


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/71417552
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/71417552
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/71417552

Branch: refs/heads/trunk
Commit: 71417552c7651a5b41e73460d0577615864aafe8
Parents: 21fea17
Author: Jason Gustafson <ja...@confluent.io>
Authored: Thu Aug 24 16:03:55 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Thu Aug 24 16:03:55 2017 -0700

----------------------------------------------------------------------
 .../kafka/clients/producer/KafkaProducer.java   | 99 +++++++++++++++-----
 .../clients/producer/internals/Sender.java      |  5 +-
 .../producer/internals/TransactionManager.java  |  7 +-
 .../errors/OutOfOrderSequenceException.java     |  8 ++
 .../common/errors/ProducerFencedException.java  |  6 ++
 .../UnsupportedForMessageFormatException.java   |  3 +-
 .../errors/UnsupportedVersionException.java     | 12 +++
 .../org/apache/kafka/clients/MockClient.java    | 61 ++++++++----
 .../clients/producer/internals/SenderTest.java  | 72 +++++++++++++-
 .../internals/TransactionManagerTest.java       | 77 +++++++++++++++
 10 files changed, 299 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 176d8fe..ce2efba 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -515,34 +515,42 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
      * Needs to be called before any other methods when the transactional.id 
is set in the configuration.
      *
      * This method does the following:
-     *   1. Ensures any transactions initiated by previous instances of the 
producer
-     *      are completed. If the previous instance had failed with a 
transaction in
+     *   1. Ensures any transactions initiated by previous instances of the 
producer with the same
+     *      transactional.id are completed. If the previous instance had 
failed with a transaction in
      *      progress, it will be aborted. If the last transaction had begun 
completion,
      *      but not yet finished, this method awaits its completion.
      *   2. Gets the internal producer id and epoch, used in all future 
transactional
      *      messages issued by the producer.
      *
-     * @throws IllegalStateException if the TransactionalId for the producer 
is not set
-     *         in the configuration.
+     * @throws IllegalStateException if no transactional.id has been configured
+     * @throws org.apache.kafka.common.errors.UnsupportedVersionException 
fatal error indicating the broker
+     *         does not support transactions (i.e. if its version is lower 
than 0.11.0.0)
+     * @throws org.apache.kafka.common.errors.AuthorizationException fatal 
error indicating that the configured
+     *         transactional.id is not authorized
+     * @throws KafkaException if the producer has encountered a previous fatal 
error or for any other unexpected error
      */
     public void initTransactions() {
-        if (transactionManager == null)
-            throw new IllegalStateException("Cannot call initTransactions 
without setting a transactional id.");
+        throwIfNoTransactionManager();
         TransactionalRequestResult result = 
transactionManager.initializeTransactions();
         sender.wakeup();
         result.await();
     }
 
     /**
-     * Should be called before the start of each new transaction.
+     * Should be called before the start of each new transaction. Note that 
prior to the first invocation
+     * of this method, you must invoke {@link #initTransactions()} exactly one 
time.
      *
-     * @throws ProducerFencedException if another producer is with the same
-     *         transactional.id is active.
+     * @throws IllegalStateException if no transactional.id has been 
configured or if {@link #initTransactions()}
+     *         has not yet been invoked
+     * @throws ProducerFencedException if another producer with the same 
transactional.id is active
+     * @throws org.apache.kafka.common.errors.UnsupportedVersionException 
fatal error indicating the broker
+     *         does not support transactions (i.e. if its version is lower 
than 0.11.0.0)
+     * @throws org.apache.kafka.common.errors.AuthorizationException fatal 
error indicating that the configured
+     *         transactional.id is not authorized
+     * @throws KafkaException if the producer has encountered a previous fatal 
error or for any other unexpected error
      */
     public void beginTransaction() throws ProducerFencedException {
-        // Set the transactional bit in the producer.
-        if (transactionManager == null)
-            throw new IllegalStateException("Cannot use transactional methods 
without enabling transactions");
+        throwIfNoTransactionManager();
         transactionManager.beginTransaction();
     }
 
@@ -554,13 +562,20 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
      * This method should be used when you need to batch consumed and produced 
messages
      * together, typically in a consume-transform-produce pattern.
      *
-     * @throws ProducerFencedException if another producer with the same
-     *         transactional.id is active.
+     * @throws IllegalStateException if no transactional.id has been 
configured or no transaction has been started
+     * @throws ProducerFencedException fatal error indicating another producer 
with the same transactional.id is active
+     * @throws org.apache.kafka.common.errors.UnsupportedVersionException 
fatal error indicating the broker
+     *         does not support transactions (i.e. if its version is lower 
than 0.11.0.0)
+     * @throws 
org.apache.kafka.common.errors.UnsupportedForMessageFormatException fatal 
error indicating the message
+     *         format used for the offsets topic on the broker does not 
support transactions
+     * @throws org.apache.kafka.common.errors.AuthorizationException fatal 
error indicating that the configured
+     *         transactional.id is not authorized
+     * @throws KafkaException if the producer has encountered a previous fatal 
or abortable error, or for any
+     *         other unexpected error
      */
     public void sendOffsetsToTransaction(Map<TopicPartition, 
OffsetAndMetadata> offsets,
                                          String consumerGroupId) throws 
ProducerFencedException {
-        if (transactionManager == null)
-            throw new IllegalStateException("Cannot send offsets to 
transaction since transactions are not enabled.");
+        throwIfNoTransactionManager();
         TransactionalRequestResult result = 
transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId);
         sender.wakeup();
         result.await();
@@ -573,12 +588,17 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
      * errors, this method will throw the last received exception immediately 
and the transaction will not be committed.
      * So all {@link #send(ProducerRecord)} calls in a transaction must 
succeed in order for this method to succeed.
      *
-     * @throws ProducerFencedException if another producer with the same
-     *         transactional.id is active.
+     * @throws IllegalStateException if no transactional.id has been 
configured or no transaction has been started
+     * @throws ProducerFencedException fatal error indicating another producer 
with the same transactional.id is active
+     * @throws org.apache.kafka.common.errors.UnsupportedVersionException 
fatal error indicating the broker
+     *         does not support transactions (i.e. if its version is lower 
than 0.11.0.0)
+     * @throws org.apache.kafka.common.errors.AuthorizationException fatal 
error indicating that the configured
+     *         transactional.id is not authorized
+     * @throws KafkaException if the producer has encountered a previous fatal 
or abortable error, or for any
+     *         other unexpected error
      */
     public void commitTransaction() throws ProducerFencedException {
-        if (transactionManager == null)
-            throw new IllegalStateException("Cannot commit transaction since 
transactions are not enabled");
+        throwIfNoTransactionManager();
         TransactionalRequestResult result = transactionManager.beginCommit();
         sender.wakeup();
         result.await();
@@ -589,12 +609,16 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
      * This call will throw an exception immediately if any prior {@link 
#send(ProducerRecord)} calls failed with a
      * {@link ProducerFencedException} or an instance of {@link 
org.apache.kafka.common.errors.AuthorizationException}.
      *
-     * @throws ProducerFencedException if another producer with the same
-     *         transactional.id is active.
+     * @throws IllegalStateException if no transactional.id has been 
configured or no transaction has been started
+     * @throws ProducerFencedException fatal error indicating another producer 
with the same transactional.id is active
+     * @throws org.apache.kafka.common.errors.UnsupportedVersionException 
fatal error indicating the broker
+     *         does not support transactions (i.e. if its version is lower 
than 0.11.0.0)
+     * @throws org.apache.kafka.common.errors.AuthorizationException fatal 
error indicating that the configured
+     *         transactional.id is not authorized
+     * @throws KafkaException if the producer has encountered a previous fatal 
error or for any other unexpected error
      */
     public void abortTransaction() throws ProducerFencedException {
-        if (transactionManager == null)
-            throw new IllegalStateException("Cannot abort transaction since 
transactions are not enabled.");
+        throwIfNoTransactionManager();
         TransactionalRequestResult result = transactionManager.beginAbort();
         sender.wakeup();
         result.await();
@@ -676,7 +700,25 @@ public class KafkaProducer<K, V> implements Producer<K, V> 
{
      * <p>
      * Some transactional send errors cannot be resolved with a call to {@link 
#abortTransaction()}.  In particular,
      * if a transactional send finishes with a {@link 
ProducerFencedException}, a {@link 
org.apache.kafka.common.errors.OutOfOrderSequenceException},
-     * or any {@link org.apache.kafka.common.errors.AuthorizationException}, 
then the only option left is to call {@link #close()}.
+     * a {@link org.apache.kafka.common.errors.UnsupportedVersionException}, 
or an
+     * {@link org.apache.kafka.common.errors.AuthorizationException}, then the 
only option left is to call {@link #close()}.
+     * Fatal errors cause the producer to enter a defunct state in which 
future API calls will continue to raise
+     * the same underlying error wrapped in a new {@link KafkaException}.
+     * </p>
+     * <p>
+     * It is a similar picture when idempotence is enabled, but no 
<code>transactional.id</code> has been configured.
+     * In this case, {@link 
org.apache.kafka.common.errors.UnsupportedVersionException} and
+     * {@link org.apache.kafka.common.errors.AuthorizationException} are 
considered fatal errors. However,
+     * {@link ProducerFencedException} does not need to be handled. 
Additionally, it is possible to continue
+     * sending after receiving an {@link 
org.apache.kafka.common.errors.OutOfOrderSequenceException}, but doing so
+     * can result in out of order delivery of pending messages. To ensure 
proper ordering, you should close the
+     * producer and create a new instance.
+     * </p>
+     * <p>
+     * If the message format of the destination topic is not upgraded to 
0.11.0.0, idempotent and transactional
+     * produce requests will fail with an {@link 
org.apache.kafka.common.errors.UnsupportedForMessageFormatException}
+     * error. If this is encountered during a transaction, it is possible to 
abort and continue. But note that future
+     * sends to the same topic will continue receiving the same exception 
until the topic is upgraded.
      * </p>
      * <p>
      * Note that callbacks will generally execute in the I/O thread of the 
producer and so should be reasonably fast or
@@ -688,6 +730,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
      * @param callback A user-supplied callback to execute when the record has 
been acknowledged by the server (null
      *        indicates no callback)
      *
+     * @throws IllegalStateException if a transactional.id has been configured 
and no transaction has been started
      * @throws InterruptException If the thread is interrupted while blocked
      * @throws SerializationException If the key or value are not valid 
objects given the configured serializers
      * @throws TimeoutException If the time taken for fetching metadata or 
allocating memory for the record has surpassed <code>max.block.ms</code>.
@@ -1040,6 +1083,12 @@ public class KafkaProducer<K, V> implements Producer<K, 
V> {
                         record.topic(), record.key(), serializedKey, 
record.value(), serializedValue, cluster);
     }
 
+    private void throwIfNoTransactionManager() {
+        if (transactionManager == null)
+            throw new IllegalStateException("Cannot use transactional methods 
without enabling transactions " +
+                    "by setting the " + ProducerConfig.TRANSACTIONAL_ID_CONFIG 
+ " configuration property");
+    }
+
     private static class ClusterAndWaitTime {
         final Cluster cluster;
         final long waitedOnMetadataMs;

http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 8519c4a..806cfdf 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -461,7 +461,7 @@ public class Sender implements Runnable {
             log.warn("Cancelled request {} due to a version mismatch with node 
{}",
                     response, response.destination(), 
response.versionMismatch());
             for (ProducerBatch batch : batches.values())
-                completeBatch(batch, new 
ProduceResponse.PartitionResponse(Errors.INVALID_REQUEST), correlationId, now);
+                completeBatch(batch, new 
ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, 
now);
         } else {
             log.trace("Received produce response from node {} with correlation 
id {}", response.destination(), correlationId);
             // if we have a response, parse it
@@ -590,7 +590,8 @@ public class Sender implements Runnable {
                 transactionManager.resetProducerId();
             } else if (exception instanceof ClusterAuthorizationException
                     || exception instanceof 
TransactionalIdAuthorizationException
-                    || exception instanceof ProducerFencedException) {
+                    || exception instanceof ProducerFencedException
+                    || exception instanceof UnsupportedVersionException) {
                 transactionManager.transitionToFatalError(exception);
             } else if (transactionManager.isTransactional()) {
                 transactionManager.transitionToAbortableError(exception);

http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
index c5542e4..fad0332 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java
@@ -1035,10 +1035,9 @@ public class TransactionManager {
                 } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                     abortableError(new 
GroupAuthorizationException(builder.consumerGroupId()));
                     return;
-                } else if (error == 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) {
-                    fatalError(error.exception());
-                    return;
-                } else if (error == Errors.INVALID_PRODUCER_EPOCH) {
+                } else if (error == 
Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED
+                        || error == Errors.INVALID_PRODUCER_EPOCH
+                        || error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) {
                     fatalError(error.exception());
                     return;
                 } else {

http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/main/java/org/apache/kafka/common/errors/OutOfOrderSequenceException.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/OutOfOrderSequenceException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/OutOfOrderSequenceException.java
index 1c1cc6b..462e91e 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/errors/OutOfOrderSequenceException.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/OutOfOrderSequenceException.java
@@ -16,6 +16,14 @@
  */
 package org.apache.kafka.common.errors;
 
+/**
+ * This exception indicates that the broker received an unexpected sequence 
number from the producer,
+ * which means that data may have been lost. If the producer is configured for 
idempotence only (i.e.
+ * if <code>enable.idempotence</code> is set and no 
<code>transactional.id</code> is configured), it
+ * is possible to continue sending with the same producer instance, but doing 
so risks reordering
+ * of sent records. For transactional producers, this is a fatal error and you 
should close the
+ * producer.
+ */
 public class OutOfOrderSequenceException extends ApiException {
 
     public OutOfOrderSequenceException(String msg) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/main/java/org/apache/kafka/common/errors/ProducerFencedException.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/ProducerFencedException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/ProducerFencedException.java
index c699d8e..c47dbf5 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/errors/ProducerFencedException.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/ProducerFencedException.java
@@ -16,6 +16,12 @@
  */
 package org.apache.kafka.common.errors;
 
+/**
+ * This fatal exception indicates that another producer with the same 
<code>transactional.id</code> has been
+ * started. It is only possible to have one producer instance with a 
<code>transactional.id</code> at any
+ * given time, and the latest one to be started "fences" the previous 
instances so that they can no longer
+ * make transactional requests. When you encounter this exception, you must 
close the producer instance.
+ */
 public class ProducerFencedException extends ApiException {
 
     public ProducerFencedException(String msg) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedForMessageFormatException.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedForMessageFormatException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedForMessageFormatException.java
index 7992d10..f66298e 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedForMessageFormatException.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedForMessageFormatException.java
@@ -17,7 +17,8 @@
 package org.apache.kafka.common.errors;
 
 /**
- * The message format version does not support the requested function.
+ * The message format version does not support the requested function. For 
example, if idempotence is
+ * requested and the topic is using a message format older than 0.11.0.0, then 
this error will be returned.
  */
 public class UnsupportedForMessageFormatException extends ApiException {
     private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java
 
b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java
index 17bd71e..484947b 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java
@@ -16,6 +16,18 @@
  */
 package org.apache.kafka.common.errors;
 
+import java.util.Map;
+
+/**
+ * Indicates that a request API or version needed by the client is not 
supported by the broker. This is
+ * typically a fatal error as Kafka clients will downgrade request versions as 
needed except in cases where
+ * a needed feature is not available in old versions. Fatal errors can 
generally only be handled by closing
+ * the client instance, although in some cases it may be possible to continue 
without relying on the
+ * underlying feature. For example, when the producer is used with idempotence 
enabled, this error is fatal
+ * since the producer does not support reverting to weaker semantics. On the 
other hand, if this error
+ * is raised from {@link 
org.apache.kafka.clients.consumer.KafkaConsumer#offsetsForTimes(Map)}, it would
+ * be possible to revert to alternative logic to set the consumer's position.
+ */
 public class UnsupportedVersionException extends ApiException {
     private static final long serialVersionUID = 1L;
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/test/java/org/apache/kafka/clients/MockClient.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java 
b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
index 29fae94..9960cce 100644
--- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java
+++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java
@@ -18,6 +18,7 @@ package org.apache.kafka.clients;
 
 import org.apache.kafka.common.Cluster;
 import org.apache.kafka.common.Node;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.requests.AbstractRequest;
 import org.apache.kafka.common.requests.AbstractResponse;
 import org.apache.kafka.common.utils.Time;
@@ -48,16 +49,22 @@ public class MockClient implements KafkaClient {
     };
 
     private static class FutureResponse {
-        public final AbstractResponse responseBody;
-        public final boolean disconnected;
-        public final RequestMatcher requestMatcher;
-        public Node node;
-
-        public FutureResponse(AbstractResponse responseBody, boolean 
disconnected, RequestMatcher requestMatcher, Node node) {
+        private final Node node;
+        private final RequestMatcher requestMatcher;
+        private final AbstractResponse responseBody;
+        private final boolean disconnected;
+        private final boolean isUnsupportedRequest;
+
+        public FutureResponse(Node node,
+                              RequestMatcher requestMatcher,
+                              AbstractResponse responseBody,
+                              boolean disconnected,
+                              boolean isUnsupportedRequest) {
+            this.node = node;
+            this.requestMatcher = requestMatcher;
             this.responseBody = responseBody;
             this.disconnected = disconnected;
-            this.requestMatcher = requestMatcher;
-            this.node = node;
+            this.isUnsupportedRequest = isUnsupportedRequest;
         }
 
     }
@@ -156,8 +163,15 @@ public class MockClient implements KafkaClient {
             AbstractRequest abstractRequest = 
request.requestBuilder().build(version);
             if (!futureResp.requestMatcher.matches(abstractRequest))
                 throw new IllegalStateException("Request matcher did not match 
next-in-line request " + abstractRequest);
+
+            UnsupportedVersionException unsupportedVersionException = null;
+            if (futureResp.isUnsupportedRequest)
+                unsupportedVersionException = new 
UnsupportedVersionException("Api " +
+                        request.apiKey() + " with version " + version);
+
             ClientResponse resp = new 
ClientResponse(request.makeHeader(version), request.callback(), 
request.destination(),
-                    request.createdTimeMs(), time.milliseconds(), 
futureResp.disconnected, null, futureResp.responseBody);
+                    request.createdTimeMs(), time.milliseconds(), 
futureResp.disconnected,
+                    unsupportedVersionException, futureResp.responseBody);
             responses.add(resp);
             iterator.remove();
             return;
@@ -241,7 +255,7 @@ public class MockClient implements KafkaClient {
     }
 
     public void prepareResponseFrom(AbstractResponse response, Node node) {
-        prepareResponseFrom(ALWAYS_TRUE, response, node, false);
+        prepareResponseFrom(ALWAYS_TRUE, response, node, false, false);
     }
 
     /**
@@ -255,7 +269,7 @@ public class MockClient implements KafkaClient {
     }
 
     public void prepareResponseFrom(RequestMatcher matcher, AbstractResponse 
response, Node node) {
-        prepareResponseFrom(matcher, response, node, false);
+        prepareResponseFrom(matcher, response, node, false, false);
     }
 
     public void prepareResponse(AbstractResponse response, boolean 
disconnected) {
@@ -263,22 +277,35 @@ public class MockClient implements KafkaClient {
     }
 
     public void prepareResponseFrom(AbstractResponse response, Node node, 
boolean disconnected) {
-        prepareResponseFrom(ALWAYS_TRUE, response, node, disconnected);
+        prepareResponseFrom(ALWAYS_TRUE, response, node, disconnected, false);
     }
 
     /**
      * Prepare a response for a request matching the provided matcher. If the 
matcher does not
-     * match, {@link KafkaClient#send(ClientRequest, long)} will throw 
IllegalStateException
-     * @param matcher The matcher to apply
+     * match, {@link KafkaClient#send(ClientRequest, long)} will throw 
IllegalStateException.
+     * @param matcher The request matcher to apply
      * @param response The response body
      * @param disconnected Whether the request was disconnected
      */
     public void prepareResponse(RequestMatcher matcher, AbstractResponse 
response, boolean disconnected) {
-        prepareResponseFrom(matcher, response, null, disconnected);
+        prepareResponseFrom(matcher, response, null, disconnected, false);
+    }
+
+    /**
+     * Raise an unsupported version error on the next request if it matches 
the given matcher.
+     * If the matcher does not match, {@link KafkaClient#send(ClientRequest, 
long)} will throw IllegalStateException.
+     * @param matcher The request matcher to apply
+     */
+    public void prepareUnsupportedVersionResponse(RequestMatcher matcher) {
+        prepareResponseFrom(matcher, null, null, false, true);
     }
 
-    public void prepareResponseFrom(RequestMatcher matcher, AbstractResponse 
response, Node node, boolean disconnected) {
-        futureResponses.add(new FutureResponse(response, disconnected, 
matcher, node));
+    private void prepareResponseFrom(RequestMatcher matcher,
+                                     AbstractResponse response,
+                                     Node node,
+                                     boolean disconnected,
+                                     boolean isUnsupportedVersion) {
+        futureResponses.add(new FutureResponse(node, matcher, response, 
disconnected, isUnsupportedVersion));
     }
 
     public void waitForRequests(final int minRequests, long maxWaitMs) throws 
InterruptedException {

http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
index d587de4..b6a09ae 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java
@@ -30,6 +30,8 @@ import org.apache.kafka.common.Node;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.ClusterAuthorizationException;
 import org.apache.kafka.common.errors.TimeoutException;
+import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -72,10 +74,10 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
-import static org.junit.Assert.assertNull;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -492,11 +494,77 @@ public class SenderTest {
             assertTrue(e.getCause() instanceof ClusterAuthorizationException);
         }
 
-        // cluster authorization is a fatal error for the producer
+        // cluster authorization errors are fatal, so we should continue seeing them on future sends
+        assertTrue(transactionManager.hasFatalError());
         assertSendFailure(ClusterAuthorizationException.class);
     }
 
     @Test
+    public void testUnsupportedForMessageFormatInProduceRequest() throws Exception {
+        // Checks that UNSUPPORTED_FOR_MESSAGE_FORMAT on a produce response fails the send future
+        // without leaving any error on the transaction manager.
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+
+        client.setNode(new Node(1, "localhost", 33343));
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        Future<RecordMetadata> sendFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+
+        // Only an idempotent produce request is accepted by this matcher.
+        MockClient.RequestMatcher idempotentProduceMatcher = new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                if (!(body instanceof ProduceRequest)) {
+                    return false;
+                }
+                return ((ProduceRequest) body).isIdempotent();
+            }
+        };
+        client.prepareResponse(idempotentProduceMatcher,
+                produceResponse(tp0, -1, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, 0));
+
+        sender.run(time.milliseconds());
+        assertTrue(sendFuture.isDone());
+        try {
+            sendFuture.get();
+            fail("Future should have raised UnsupportedForMessageFormat");
+        } catch (ExecutionException expected) {
+            assertTrue(expected.getCause() instanceof UnsupportedForMessageFormatException);
+        }
+
+        // unsupported for message format is not a fatal error, so no error state may be recorded
+        assertFalse(transactionManager.hasError());
+    }
+
+    @Test
+    public void testUnsupportedVersionInProduceRequest() throws Exception {
+        // Checks that an unsupported-version response to a produce request fails the send future
+        // and puts the transaction manager into a fatal error state.
+        final long producerId = 343434L;
+        TransactionManager transactionManager = new TransactionManager();
+        setupWithTransactionState(transactionManager);
+
+        client.setNode(new Node(1, "localhost", 33343));
+        prepareAndReceiveInitProducerId(producerId, Errors.NONE);
+        assertTrue(transactionManager.hasProducerId());
+
+        Future<RecordMetadata> sendFuture = accumulator.append(tp0, time.milliseconds(), "key".getBytes(),
+                "value".getBytes(), null, null, MAX_BLOCK_TIMEOUT).future;
+
+        // Only an idempotent produce request is accepted by this matcher.
+        MockClient.RequestMatcher idempotentProduceMatcher = new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                if (!(body instanceof ProduceRequest)) {
+                    return false;
+                }
+                return ((ProduceRequest) body).isIdempotent();
+            }
+        };
+        client.prepareUnsupportedVersionResponse(idempotentProduceMatcher);
+
+        sender.run(time.milliseconds());
+        assertTrue(sendFuture.isDone());
+        try {
+            sendFuture.get();
+            fail("Future should have raised UnsupportedVersionException");
+        } catch (ExecutionException expected) {
+            assertTrue(expected.getCause() instanceof UnsupportedVersionException);
+        }
+
+        // unsupported version errors are fatal, so we should continue seeing them on future sends
+        assertTrue(transactionManager.hasFatalError());
+        assertSendFailure(UnsupportedVersionException.class);
+    }
+
+    @Test
     public void testSequenceNumberIncrement() throws InterruptedException {
         final long producerId = 343434L;
         TransactionManager transactionManager = new TransactionManager();

http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
index 9bac895..4fbcd96 100644
--- 
a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java
@@ -31,6 +31,8 @@ import 
org.apache.kafka.common.errors.OutOfOrderSequenceException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.TopicAuthorizationException;
 import org.apache.kafka.common.errors.TransactionalIdAuthorizationException;
+import org.apache.kafka.common.errors.UnsupportedForMessageFormatException;
+import org.apache.kafka.common.errors.UnsupportedVersionException;
 import org.apache.kafka.common.internals.ClusterResourceListeners;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.metrics.Metrics;
@@ -639,6 +641,81 @@ public class TransactionManagerTest {
     }
 
     @Test
+    public void testUnsupportedFindCoordinator() {
+        // Checks that an unsupported-version response to the FindCoordinator request leaves the
+        // transaction manager in a fatal error state whose last error is UnsupportedVersionException.
+        transactionManager.initializeTransactions();
+        client.prepareUnsupportedVersionResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) body;
+                // assertEquals takes (expected, actual); this order keeps failure messages accurate
+                assertEquals(CoordinatorType.TRANSACTION, findCoordinatorRequest.coordinatorType());
+                assertEquals(transactionalId, findCoordinatorRequest.coordinatorKey());
+                return true;
+            }
+        });
+
+        sender.run(time.milliseconds()); // InitProducerRequest is queued
+        sender.run(time.milliseconds()); // FindCoordinator is queued after peeking InitProducerRequest
+        assertTrue(transactionManager.hasFatalError());
+        assertTrue(transactionManager.lastError() instanceof UnsupportedVersionException);
+    }
+
+    @Test
+    public void testUnsupportedInitTransactions() {
+        // Checks that, once the transaction coordinator is known, an unsupported-version response to
+        // the InitProducerId request leaves the transaction manager in a fatal error state.
+        transactionManager.initializeTransactions();
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId);
+        sender.run(time.milliseconds()); // InitProducerRequest is queued
+        sender.run(time.milliseconds()); // FindCoordinator is queued after peeking InitProducerRequest
+
+        assertFalse(transactionManager.hasError());
+        assertNotNull(transactionManager.coordinator(CoordinatorType.TRANSACTION));
+
+        client.prepareUnsupportedVersionResponse(new MockClient.RequestMatcher() {
+            @Override
+            public boolean matches(AbstractRequest body) {
+                InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body;
+                // assertEquals takes (expected, actual); this order keeps failure messages accurate
+                assertEquals(transactionalId, initProducerIdRequest.transactionalId());
+                assertEquals(transactionTimeoutMs, initProducerIdRequest.transactionTimeoutMs());
+                return true;
+            }
+        });
+
+        sender.run(time.milliseconds()); // InitProducerRequest is dequeued
+        assertTrue(transactionManager.hasFatalError());
+        assertTrue(transactionManager.lastError() instanceof UnsupportedVersionException);
+    }
+
+    @Test
+    public void testUnsupportedForMessageFormatInTxnOffsetCommit() {
+        // Drives sendOffsetsToTransaction through to a TxnOffsetCommit response that reports
+        // UNSUPPORTED_FOR_MESSAGE_FORMAT for the committed partition.
+        final String groupId = "consumer";
+        final long producerId = 13131L;
+        final short producerEpoch = 1;
+        final TopicPartition partition = new TopicPartition("foo", 0);
+
+        doInitTransactions(producerId, producerEpoch);
+
+        transactionManager.beginTransaction();
+        TransactionalRequestResult offsetCommitResult = transactionManager.sendOffsetsToTransaction(
+                singletonMap(partition, new OffsetAndMetadata(39L)), groupId);
+
+        prepareAddOffsetsToTxnResponse(Errors.NONE, groupId, producerId, producerEpoch);
+        sender.run(time.milliseconds());  // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued
+        sender.run(time.milliseconds());  // FindCoordinator Enqueued
+
+        prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, groupId);
+        sender.run(time.milliseconds());  // FindCoordinator Returned
+
+        prepareTxnOffsetCommitResponse(groupId, producerId, producerEpoch,
+                singletonMap(partition, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT));
+        sender.run(time.milliseconds());  // TxnOffsetCommit Handled
+
+        // The error must show up both on the transaction manager and on the pending result.
+        assertTrue(transactionManager.hasError());
+        assertTrue(transactionManager.lastError() instanceof UnsupportedForMessageFormatException);
+        assertTrue(offsetCommitResult.isCompleted());
+        assertFalse(offsetCommitResult.isSuccessful());
+        assertTrue(offsetCommitResult.error() instanceof UnsupportedForMessageFormatException);
+        assertFatalError(UnsupportedForMessageFormatException.class);
+    }
+
+    @Test
     public void testLookupCoordinatorOnDisconnectAfterSend() {
         // This is called from the initTransactions method in the producer as 
the first order of business.
         // It finds the coordinator and then gets a PID.

Reply via email to