Repository: kafka Updated Branches: refs/heads/trunk 21fea170d -> 71417552c
KAFKA-5342; Clarify producer fatal/abortable errors and fix inconsistencies This patch improves documentation on the handling of errors for the idempotent/transactional producer. It also fixes a couple minor inconsistencies and improves test coverage. In particular: - UnsupportedForMessageFormat should be a fatal error for TxnOffsetCommit responses - UnsupportedVersion should be fatal for Produce responses and should be returned instead of InvalidRequest Author: Jason Gustafson <ja...@confluent.io> Reviewers: Apurva Mehta <apu...@confluent.io>, Ismael Juma <ism...@juma.me.uk> Closes #3716 from hachikuji/KAFKA-5342 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/71417552 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/71417552 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/71417552 Branch: refs/heads/trunk Commit: 71417552c7651a5b41e73460d0577615864aafe8 Parents: 21fea17 Author: Jason Gustafson <ja...@confluent.io> Authored: Thu Aug 24 16:03:55 2017 -0700 Committer: Jason Gustafson <ja...@confluent.io> Committed: Thu Aug 24 16:03:55 2017 -0700 ---------------------------------------------------------------------- .../kafka/clients/producer/KafkaProducer.java | 99 +++++++++++++++----- .../clients/producer/internals/Sender.java | 5 +- .../producer/internals/TransactionManager.java | 7 +- .../errors/OutOfOrderSequenceException.java | 8 ++ .../common/errors/ProducerFencedException.java | 6 ++ .../UnsupportedForMessageFormatException.java | 3 +- .../errors/UnsupportedVersionException.java | 12 +++ .../org/apache/kafka/clients/MockClient.java | 61 ++++++++---- .../clients/producer/internals/SenderTest.java | 72 +++++++++++++- .../internals/TransactionManagerTest.java | 77 +++++++++++++++ 10 files changed, 299 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 176d8fe..ce2efba 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -515,34 +515,42 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * Needs to be called before any other methods when the transactional.id is set in the configuration. * * This method does the following: - * 1. Ensures any transactions initiated by previous instances of the producer - * are completed. If the previous instance had failed with a transaction in + * 1. Ensures any transactions initiated by previous instances of the producer with the same + * transactional.id are completed. If the previous instance had failed with a transaction in * progress, it will be aborted. If the last transaction had begun completion, * but not yet finished, this method awaits its completion. * 2. Gets the internal producer id and epoch, used in all future transactional * messages issued by the producer. * - * @throws IllegalStateException if the TransactionalId for the producer is not set - * in the configuration. + * @throws IllegalStateException if no transactional.id has been configured + * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker + * does not support transactions (i.e. 
if its version is lower than 0.11.0.0) + * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured + * transactional.id is not authorized + * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error */ public void initTransactions() { - if (transactionManager == null) - throw new IllegalStateException("Cannot call initTransactions without setting a transactional id."); + throwIfNoTransactionManager(); TransactionalRequestResult result = transactionManager.initializeTransactions(); sender.wakeup(); result.await(); } /** - * Should be called before the start of each new transaction. + * Should be called before the start of each new transaction. Note that prior to the first invocation + * of this method, you must invoke {@link #initTransactions()} exactly one time. * - * @throws ProducerFencedException if another producer is with the same - * transactional.id is active. + * @throws IllegalStateException if no transactional.id has been configured or if {@link #initTransactions()} + * has not yet been invoked + * @throws ProducerFencedException if another producer with the same transactional.id is active + * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker + * does not support transactions (i.e. if its version is lower than 0.11.0.0) + * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured + * transactional.id is not authorized + * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error */ public void beginTransaction() throws ProducerFencedException { - // Set the transactional bit in the producer. 
- if (transactionManager == null) - throw new IllegalStateException("Cannot use transactional methods without enabling transactions"); + throwIfNoTransactionManager(); transactionManager.beginTransaction(); } @@ -554,13 +562,20 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * This method should be used when you need to batch consumed and produced messages * together, typically in a consume-transform-produce pattern. * - * @throws ProducerFencedException if another producer with the same - * transactional.id is active. + * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started + * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active + * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker + * does not support transactions (i.e. if its version is lower than 0.11.0.0) + * @throws org.apache.kafka.common.errors.UnsupportedForMessageFormatException fatal error indicating the message + * format used for the offsets topic on the broker does not support transactions + * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured + * transactional.id is not authorized + * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any + * other unexpected error */ public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets, String consumerGroupId) throws ProducerFencedException { - if (transactionManager == null) - throw new IllegalStateException("Cannot send offsets to transaction since transactions are not enabled."); + throwIfNoTransactionManager(); TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, consumerGroupId); sender.wakeup(); result.await(); @@ -573,12 +588,17 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * errors, this method 
will throw the last received exception immediately and the transaction will not be committed. * So all {@link #send(ProducerRecord)} calls in a transaction must succeed in order for this method to succeed. * - * @throws ProducerFencedException if another producer with the same - * transactional.id is active. + * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started + * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active + * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker + * does not support transactions (i.e. if its version is lower than 0.11.0.0) + * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured + * transactional.id is not authorized + * @throws KafkaException if the producer has encountered a previous fatal or abortable error, or for any + * other unexpected error */ public void commitTransaction() throws ProducerFencedException { - if (transactionManager == null) - throw new IllegalStateException("Cannot commit transaction since transactions are not enabled"); + throwIfNoTransactionManager(); TransactionalRequestResult result = transactionManager.beginCommit(); sender.wakeup(); result.await(); @@ -589,12 +609,16 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * This call will throw an exception immediately if any prior {@link #send(ProducerRecord)} calls failed with a * {@link ProducerFencedException} or an instance of {@link org.apache.kafka.common.errors.AuthorizationException}. * - * @throws ProducerFencedException if another producer with the same - * transactional.id is active. 
+ * @throws IllegalStateException if no transactional.id has been configured or no transaction has been started + * @throws ProducerFencedException fatal error indicating another producer with the same transactional.id is active + * @throws org.apache.kafka.common.errors.UnsupportedVersionException fatal error indicating the broker + * does not support transactions (i.e. if its version is lower than 0.11.0.0) + * @throws org.apache.kafka.common.errors.AuthorizationException fatal error indicating that the configured + * transactional.id is not authorized + * @throws KafkaException if the producer has encountered a previous fatal error or for any other unexpected error */ public void abortTransaction() throws ProducerFencedException { - if (transactionManager == null) - throw new IllegalStateException("Cannot abort transaction since transactions are not enabled."); + throwIfNoTransactionManager(); TransactionalRequestResult result = transactionManager.beginAbort(); sender.wakeup(); result.await(); @@ -676,7 +700,25 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * <p> * Some transactional send errors cannot be resolved with a call to {@link #abortTransaction()}. In particular, * if a transactional send finishes with a {@link ProducerFencedException}, a {@link org.apache.kafka.common.errors.OutOfOrderSequenceException}, - * or any {@link org.apache.kafka.common.errors.AuthorizationException}, then the only option left is to call {@link #close()}. + * a {@link org.apache.kafka.common.errors.UnsupportedVersionException}, or an + * {@link org.apache.kafka.common.errors.AuthorizationException}, then the only option left is to call {@link #close()}. + * Fatal errors cause the producer to enter a defunct state in which future API calls will continue to raise + * the same underlying error wrapped in a new {@link KafkaException}. + * </p> + * <p> + * It is a similar picture when idempotence is enabled, but no <code>transactional.id</code> has been configured. 
+ * In this case, {@link org.apache.kafka.common.errors.UnsupportedVersionException} and + * {@link org.apache.kafka.common.errors.AuthorizationException} are considered fatal errors. However, + * {@link ProducerFencedException} does not need to be handled. Additionally, it is possible to continue + * sending after receiving an {@link org.apache.kafka.common.errors.OutOfOrderSequenceException}, but doing so + * can result in out of order delivery of pending messages. To ensure proper ordering, you should close the + * producer and create a new instance. + * </p> + * <p> + * If the message format of the destination topic is not upgraded to 0.11.0.0, idempotent and transactional + * produce requests will fail with an {@link org.apache.kafka.common.errors.UnsupportedForMessageFormatException} + * error. If this is encountered during a transaction, it is possible to abort and continue. But note that future + * sends to the same topic will continue receiving the same exception until the topic is upgraded. * </p> * <p> * Note that callbacks will generally execute in the I/O thread of the producer and so should be reasonably fast or @@ -688,6 +730,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { * @param callback A user-supplied callback to execute when the record has been acknowledged by the server (null * indicates no callback) * + * @throws IllegalStateException if a transactional.id has been configured and no transaction has been started * @throws InterruptException If the thread is interrupted while blocked * @throws SerializationException If the key or value are not valid objects given the configured serializers * @throws TimeoutException If the time taken for fetching metadata or allocating memory for the record has surpassed <code>max.block.ms</code>. 
@@ -1040,6 +1083,12 @@ public class KafkaProducer<K, V> implements Producer<K, V> { record.topic(), record.key(), serializedKey, record.value(), serializedValue, cluster); } + private void throwIfNoTransactionManager() { + if (transactionManager == null) + throw new IllegalStateException("Cannot use transactional methods without enabling transactions " + + "by setting the " + ProducerConfig.TRANSACTIONAL_ID_CONFIG + " configuration property"); + } + private static class ClusterAndWaitTime { final Cluster cluster; final long waitedOnMetadataMs; http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java index 8519c4a..806cfdf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java @@ -461,7 +461,7 @@ public class Sender implements Runnable { log.warn("Cancelled request {} due to a version mismatch with node {}", response, response.destination(), response.versionMismatch()); for (ProducerBatch batch : batches.values()) - completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.INVALID_REQUEST), correlationId, now); + completeBatch(batch, new ProduceResponse.PartitionResponse(Errors.UNSUPPORTED_VERSION), correlationId, now); } else { log.trace("Received produce response from node {} with correlation id {}", response.destination(), correlationId); // if we have a response, parse it @@ -590,7 +590,8 @@ public class Sender implements Runnable { transactionManager.resetProducerId(); } else if (exception instanceof ClusterAuthorizationException || exception instanceof TransactionalIdAuthorizationException - || exception instanceof 
ProducerFencedException) { + || exception instanceof ProducerFencedException + || exception instanceof UnsupportedVersionException) { transactionManager.transitionToFatalError(exception); } else if (transactionManager.isTransactional()) { transactionManager.transitionToAbortableError(exception); http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java index c5542e4..fad0332 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/TransactionManager.java @@ -1035,10 +1035,9 @@ public class TransactionManager { } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) { abortableError(new GroupAuthorizationException(builder.consumerGroupId())); return; - } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED) { - fatalError(error.exception()); - return; - } else if (error == Errors.INVALID_PRODUCER_EPOCH) { + } else if (error == Errors.TRANSACTIONAL_ID_AUTHORIZATION_FAILED + || error == Errors.INVALID_PRODUCER_EPOCH + || error == Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT) { fatalError(error.exception()); return; } else { http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/main/java/org/apache/kafka/common/errors/OutOfOrderSequenceException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/OutOfOrderSequenceException.java b/clients/src/main/java/org/apache/kafka/common/errors/OutOfOrderSequenceException.java index 1c1cc6b..462e91e 100644 --- 
a/clients/src/main/java/org/apache/kafka/common/errors/OutOfOrderSequenceException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/OutOfOrderSequenceException.java @@ -16,6 +16,14 @@ */ package org.apache.kafka.common.errors; +/** + * This exception indicates that the broker received an unexpected sequence number from the producer, + * which means that data may have been lost. If the producer is configured for idempotence only (i.e. + * if <code>enable.idempotence</code> is set and no <code>transactional.id</code> is configured), it + * is possible to continue sending with the same producer instance, but doing so risks reordering + * of sent records. For transactional producers, this is a fatal error and you should close the + * producer. + */ public class OutOfOrderSequenceException extends ApiException { public OutOfOrderSequenceException(String msg) { http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/main/java/org/apache/kafka/common/errors/ProducerFencedException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/ProducerFencedException.java b/clients/src/main/java/org/apache/kafka/common/errors/ProducerFencedException.java index c699d8e..c47dbf5 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/ProducerFencedException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/ProducerFencedException.java @@ -16,6 +16,12 @@ */ package org.apache.kafka.common.errors; +/** + * This fatal exception indicates that another producer with the same <code>transactional.id</code> has been + * started. It is only possible to have one producer instance with a <code>transactional.id</code> at any + * given time, and the latest one to be started "fences" the previous instances so that they can no longer + * make transactional requests. When you encounter this exception, you must close the producer instance. 
+ */ public class ProducerFencedException extends ApiException { public ProducerFencedException(String msg) { http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedForMessageFormatException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedForMessageFormatException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedForMessageFormatException.java index 7992d10..f66298e 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedForMessageFormatException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedForMessageFormatException.java @@ -17,7 +17,8 @@ package org.apache.kafka.common.errors; /** - * The message format version does not support the requested function. + * The message format version does not support the requested function. For example, if idempotence is + * requested and the topic is using a message format older than 0.11.0.0, then this error will be returned. 
*/ public class UnsupportedForMessageFormatException extends ApiException { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java index 17bd71e..484947b 100644 --- a/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java +++ b/clients/src/main/java/org/apache/kafka/common/errors/UnsupportedVersionException.java @@ -16,6 +16,18 @@ */ package org.apache.kafka.common.errors; +import java.util.Map; + +/** + * Indicates that a request API or version needed by the client is not supported by the broker. This is + * typically a fatal error as Kafka clients will downgrade request versions as needed except in cases where + * a needed feature is not available in old versions. Fatal errors can generally only be handled by closing + * the client instance, although in some cases it may be possible to continue without relying on the + * underlying feature. For example, when the producer is used with idempotence enabled, this error is fatal + * since the producer does not support reverting to weaker semantics. On the other hand, if this error + * is raised from {@link org.apache.kafka.clients.consumer.KafkaConsumer#offsetsForTimes(Map)}, it would + * be possible to revert to alternative logic to set the consumer's position. 
+ */ public class UnsupportedVersionException extends ApiException { private static final long serialVersionUID = 1L; http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/test/java/org/apache/kafka/clients/MockClient.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 29fae94..9960cce 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -18,6 +18,7 @@ package org.apache.kafka.clients; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.AbstractResponse; import org.apache.kafka.common.utils.Time; @@ -48,16 +49,22 @@ public class MockClient implements KafkaClient { }; private static class FutureResponse { - public final AbstractResponse responseBody; - public final boolean disconnected; - public final RequestMatcher requestMatcher; - public Node node; - - public FutureResponse(AbstractResponse responseBody, boolean disconnected, RequestMatcher requestMatcher, Node node) { + private final Node node; + private final RequestMatcher requestMatcher; + private final AbstractResponse responseBody; + private final boolean disconnected; + private final boolean isUnsupportedRequest; + + public FutureResponse(Node node, + RequestMatcher requestMatcher, + AbstractResponse responseBody, + boolean disconnected, + boolean isUnsupportedRequest) { + this.node = node; + this.requestMatcher = requestMatcher; this.responseBody = responseBody; this.disconnected = disconnected; - this.requestMatcher = requestMatcher; - this.node = node; + this.isUnsupportedRequest = isUnsupportedRequest; } } @@ -156,8 +163,15 @@ public 
class MockClient implements KafkaClient { AbstractRequest abstractRequest = request.requestBuilder().build(version); if (!futureResp.requestMatcher.matches(abstractRequest)) throw new IllegalStateException("Request matcher did not match next-in-line request " + abstractRequest); + + UnsupportedVersionException unsupportedVersionException = null; + if (futureResp.isUnsupportedRequest) + unsupportedVersionException = new UnsupportedVersionException("Api " + + request.apiKey() + " with version " + version); + ClientResponse resp = new ClientResponse(request.makeHeader(version), request.callback(), request.destination(), - request.createdTimeMs(), time.milliseconds(), futureResp.disconnected, null, futureResp.responseBody); + request.createdTimeMs(), time.milliseconds(), futureResp.disconnected, + unsupportedVersionException, futureResp.responseBody); responses.add(resp); iterator.remove(); return; @@ -241,7 +255,7 @@ public class MockClient implements KafkaClient { } public void prepareResponseFrom(AbstractResponse response, Node node) { - prepareResponseFrom(ALWAYS_TRUE, response, node, false); + prepareResponseFrom(ALWAYS_TRUE, response, node, false, false); } /** @@ -255,7 +269,7 @@ public class MockClient implements KafkaClient { } public void prepareResponseFrom(RequestMatcher matcher, AbstractResponse response, Node node) { - prepareResponseFrom(matcher, response, node, false); + prepareResponseFrom(matcher, response, node, false, false); } public void prepareResponse(AbstractResponse response, boolean disconnected) { @@ -263,22 +277,35 @@ public class MockClient implements KafkaClient { } public void prepareResponseFrom(AbstractResponse response, Node node, boolean disconnected) { - prepareResponseFrom(ALWAYS_TRUE, response, node, disconnected); + prepareResponseFrom(ALWAYS_TRUE, response, node, disconnected, false); } /** * Prepare a response for a request matching the provided matcher. 
If the matcher does not - * match, {@link KafkaClient#send(ClientRequest, long)} will throw IllegalStateException - * @param matcher The matcher to apply + * match, {@link KafkaClient#send(ClientRequest, long)} will throw IllegalStateException. + * @param matcher The request matcher to apply * @param response The response body * @param disconnected Whether the request was disconnected */ public void prepareResponse(RequestMatcher matcher, AbstractResponse response, boolean disconnected) { - prepareResponseFrom(matcher, response, null, disconnected); + prepareResponseFrom(matcher, response, null, disconnected, false); + } + + /** + * Raise an unsupported version error on the next request if it matches the given matcher. + * If the matcher does not match, {@link KafkaClient#send(ClientRequest, long)} will throw IllegalStateException. + * @param matcher The request matcher to apply + */ + public void prepareUnsupportedVersionResponse(RequestMatcher matcher) { + prepareResponseFrom(matcher, null, null, false, true); } - public void prepareResponseFrom(RequestMatcher matcher, AbstractResponse response, Node node, boolean disconnected) { - futureResponses.add(new FutureResponse(response, disconnected, matcher, node)); + private void prepareResponseFrom(RequestMatcher matcher, + AbstractResponse response, + Node node, + boolean disconnected, + boolean isUnsupportedVersion) { + futureResponses.add(new FutureResponse(node, matcher, response, disconnected, isUnsupportedVersion)); } public void waitForRequests(final int minRequests, long maxWaitMs) throws InterruptedException { http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java index 
d587de4..b6a09ae 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/SenderTest.java @@ -30,6 +30,8 @@ import org.apache.kafka.common.Node; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.ClusterAuthorizationException; import org.apache.kafka.common.errors.TimeoutException; +import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricConfig; @@ -72,10 +74,10 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -492,11 +494,77 @@ public class SenderTest { assertTrue(e.getCause() instanceof ClusterAuthorizationException); } - // cluster authorization is a fatal error for the producer + // cluster authorization errors are fatal, so we should continue seeing it on future sends + assertTrue(transactionManager.hasFatalError()); assertSendFailure(ClusterAuthorizationException.class); } @Test + public void testUnsupportedForMessageFormatInProduceRequest() throws Exception { + final long producerId = 343434L; + TransactionManager transactionManager = new TransactionManager(); + setupWithTransactionState(transactionManager); + + client.setNode(new Node(1, "localhost", 33343)); + prepareAndReceiveInitProducerId(producerId, Errors.NONE); + assertTrue(transactionManager.hasProducerId()); + + 
Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), + null, null, MAX_BLOCK_TIMEOUT).future; + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + return body instanceof ProduceRequest && ((ProduceRequest) body).isIdempotent(); + } + }, produceResponse(tp0, -1, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, 0)); + + sender.run(time.milliseconds()); + assertTrue(future.isDone()); + try { + future.get(); + fail("Future should have raised UnsupportedForMessageFormat"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof UnsupportedForMessageFormatException); + } + + // unsupported for message format is not a fatal error + assertFalse(transactionManager.hasError()); + } + + @Test + public void testUnsupportedVersionInProduceRequest() throws Exception { + final long producerId = 343434L; + TransactionManager transactionManager = new TransactionManager(); + setupWithTransactionState(transactionManager); + + client.setNode(new Node(1, "localhost", 33343)); + prepareAndReceiveInitProducerId(producerId, Errors.NONE); + assertTrue(transactionManager.hasProducerId()); + + Future<RecordMetadata> future = accumulator.append(tp0, time.milliseconds(), "key".getBytes(), "value".getBytes(), + null, null, MAX_BLOCK_TIMEOUT).future; + client.prepareUnsupportedVersionResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + return body instanceof ProduceRequest && ((ProduceRequest) body).isIdempotent(); + } + }); + + sender.run(time.milliseconds()); + assertTrue(future.isDone()); + try { + future.get(); + fail("Future should have raised UnsupportedVersionException"); + } catch (ExecutionException e) { + assertTrue(e.getCause() instanceof UnsupportedVersionException); + } + + // unsupported version errors are fatal, so we should continue seeing it on future sends + 
assertTrue(transactionManager.hasFatalError()); + assertSendFailure(UnsupportedVersionException.class); + } + + @Test public void testSequenceNumberIncrement() throws InterruptedException { final long producerId = 343434L; TransactionManager transactionManager = new TransactionManager(); http://git-wip-us.apache.org/repos/asf/kafka/blob/71417552/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java index 9bac895..4fbcd96 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/TransactionManagerTest.java @@ -31,6 +31,8 @@ import org.apache.kafka.common.errors.OutOfOrderSequenceException; import org.apache.kafka.common.errors.TimeoutException; import org.apache.kafka.common.errors.TopicAuthorizationException; import org.apache.kafka.common.errors.TransactionalIdAuthorizationException; +import org.apache.kafka.common.errors.UnsupportedForMessageFormatException; +import org.apache.kafka.common.errors.UnsupportedVersionException; import org.apache.kafka.common.internals.ClusterResourceListeners; import org.apache.kafka.common.metrics.MetricConfig; import org.apache.kafka.common.metrics.Metrics; @@ -639,6 +641,81 @@ public class TransactionManagerTest { } @Test + public void testUnsupportedFindCoordinator() { + transactionManager.initializeTransactions(); + client.prepareUnsupportedVersionResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + FindCoordinatorRequest findCoordinatorRequest = (FindCoordinatorRequest) body; + assertEquals(findCoordinatorRequest.coordinatorType(), 
CoordinatorType.TRANSACTION); + assertEquals(findCoordinatorRequest.coordinatorKey(), transactionalId); + return true; + } + }); + + sender.run(time.milliseconds()); // InitProducerRequest is queued + sender.run(time.milliseconds()); // FindCoordinator is queued after peeking InitProducerRequest + assertTrue(transactionManager.hasFatalError()); + assertTrue(transactionManager.lastError() instanceof UnsupportedVersionException); + } + + @Test + public void testUnsupportedInitTransactions() { + transactionManager.initializeTransactions(); + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.TRANSACTION, transactionalId); + sender.run(time.milliseconds()); // InitProducerRequest is queued + sender.run(time.milliseconds()); // FindCoordinator is queued after peeking InitProducerRequest + + assertFalse(transactionManager.hasError()); + assertNotNull(transactionManager.coordinator(CoordinatorType.TRANSACTION)); + + client.prepareUnsupportedVersionResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(AbstractRequest body) { + InitProducerIdRequest initProducerIdRequest = (InitProducerIdRequest) body; + assertEquals(initProducerIdRequest.transactionalId(), transactionalId); + assertEquals(initProducerIdRequest.transactionTimeoutMs(), transactionTimeoutMs); + return true; + } + }); + + sender.run(time.milliseconds()); // InitProducerRequest is dequeued + assertTrue(transactionManager.hasFatalError()); + assertTrue(transactionManager.lastError() instanceof UnsupportedVersionException); + } + + @Test + public void testUnsupportedForMessageFormatInTxnOffsetCommit() { + final String consumerGroupId = "consumer"; + final long pid = 13131L; + final short epoch = 1; + final TopicPartition tp = new TopicPartition("foo", 0); + + doInitTransactions(pid, epoch); + + transactionManager.beginTransaction(); + TransactionalRequestResult sendOffsetsResult = transactionManager.sendOffsetsToTransaction( + singletonMap(tp, new OffsetAndMetadata(39L)), 
consumerGroupId); + + prepareAddOffsetsToTxnResponse(Errors.NONE, consumerGroupId, pid, epoch); + sender.run(time.milliseconds()); // AddOffsetsToTxn Handled, TxnOffsetCommit Enqueued + sender.run(time.milliseconds()); // FindCoordinator Enqueued + + prepareFindCoordinatorResponse(Errors.NONE, false, CoordinatorType.GROUP, consumerGroupId); + sender.run(time.milliseconds()); // FindCoordinator Returned + + prepareTxnOffsetCommitResponse(consumerGroupId, pid, epoch, singletonMap(tp, Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT)); + sender.run(time.milliseconds()); // TxnOffsetCommit Handled + + assertTrue(transactionManager.hasError()); + assertTrue(transactionManager.lastError() instanceof UnsupportedForMessageFormatException); + assertTrue(sendOffsetsResult.isCompleted()); + assertFalse(sendOffsetsResult.isSuccessful()); + assertTrue(sendOffsetsResult.error() instanceof UnsupportedForMessageFormatException); + assertFatalError(UnsupportedForMessageFormatException.class); + } + + @Test public void testLookupCoordinatorOnDisconnectAfterSend() { // This is called from the initTransactions method in the producer as the first order of business. // It finds the coordinator and then gets a PID.