[ https://issues.apache.org/jira/browse/KAFKA-7284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16581707#comment-16581707 ]
ASF GitHub Bot commented on KAFKA-7284:
---------------------------------------

guozhangwang closed pull request #5513: KAFKA-7284: streams should unwrap fenced exception
URL: https://github.com/apache/kafka/pull/5513

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is reproduced below for the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index d2a84c66abe..a72714bb3df 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
 import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
@@ -205,7 +206,7 @@ public void abortTransaction() throws ProducerFencedException {
         this.transactionInFlight = false;
     }
 
-    private void verifyProducerState() {
+    private synchronized void verifyProducerState() {
         if (this.closed) {
             throw new IllegalStateException("MockProducer is already closed.");
         }
@@ -243,7 +244,12 @@ private void verifyNoTransactionInFlight() {
      */
     @Override
     public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
-        verifyProducerState();
+        if (this.closed) {
+            throw new IllegalStateException("MockProducer is already closed.");
+        }
+        if (this.producerFenced) {
+            throw new KafkaException("MockProducer is fenced.", new ProducerFencedException("Fenced"));
+        }
         int partition = 0;
         if (!this.cluster.partitionsForTopic(record.topic()).isEmpty())
             partition = partition(record, this.cluster);
@@ -313,7 +319,7 @@ public boolean closed() {
         return this.closed;
     }
 
-    public void fenceProducer() {
+    public synchronized void fenceProducer() {
         verifyProducerState();
         verifyTransactionsInitialized();
         this.producerFenced = true;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 27fac280afc..ee4803f635b 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -267,18 +268,9 @@ public void shouldThrowOnSendIfProducerGotFenced() {
         try {
             producer.send(null);
             fail("Should have thrown as producer is fenced off");
-        } catch (ProducerFencedException e) { }
-    }
-
-    @Test
-    public void shouldThrowOnFlushIfProducerGotFenced() {
-        buildMockProducer(true);
-        producer.initTransactions();
-        producer.fenceProducer();
-        try {
-            producer.flush();
-            fail("Should have thrown as producer is fenced off");
-        } catch (ProducerFencedException e) { }
+        } catch (KafkaException e) {
+            assertTrue("The root cause of the exception should be ProducerFencedException", e.getCause() instanceof ProducerFencedException);
+        }
     }
 
     @Test
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index afdadf2dec3..44b2089f6ab 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -23,14 +23,14 @@
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.AuthenticationException;
+import org.apache.kafka.common.errors.AuthorizationException;
 import org.apache.kafka.common.errors.InvalidTopicException;
 import org.apache.kafka.common.errors.OffsetMetadataTooLarge;
 import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.RetriableException;
-import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.SecurityDisabledException;
+import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.errors.UnknownServerException;
 import org.apache.kafka.common.serialization.Serializer;
@@ -193,16 +193,26 @@ public void onCompletion(final RecordMetadata metadata,
                 "You can increase producer parameter `max.block.ms` to increase this timeout.", topic);
             throw new StreamsException(String.format("%sFailed to send record to topic %s due to timeout.", logPrefix, topic));
         } catch (final Exception uncaughtException) {
-            throw new StreamsException(
-                String.format(EXCEPTION_MESSAGE,
-                    logPrefix,
-                    "an error caught",
-                    key,
-                    value,
-                    timestamp,
-                    topic,
-                    uncaughtException.getMessage()),
-                uncaughtException);
+            if (uncaughtException instanceof KafkaException &&
+                uncaughtException.getCause() instanceof ProducerFencedException) {
+                final KafkaException kafkaException = (KafkaException) uncaughtException;
+                // producer.send() may throw a KafkaException which wraps a ProducerFencedException;
+                // in this case we should throw its wrapped inner cause so that it can be captured and re-wrapped as TaskMigratedException
+                throw (ProducerFencedException) kafkaException.getCause();
+            } else {
+                throw new StreamsException(
+                    String.format(
+                        EXCEPTION_MESSAGE,
+                        logPrefix,
+                        "an error caught",
+                        key,
+                        value,
+                        timestamp,
+                        topic,
+                        uncaughtException.getMessage()
+                    ),
+                    uncaughtException);
+            }
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 39fe7ab20c3..68c779b50db 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -26,6 +26,7 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.LogContext;
@@ -299,4 +300,27 @@ public void shouldThrowIfTopicIsUnknownWithContinueExceptionHandler() {
             new AlwaysContinueProductionExceptionHandler());
         collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
     }
+
+    @Test
+    public void shouldUnwrapAndThrowProducerFencedExceptionFromCallToSend() {
+        final MockProducer<byte[], byte[]> producer =
+            new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer);
+
+        final RecordCollector collector = new RecordCollectorImpl(
+            producer,
+            "test",
+            logContext,
+            new DefaultProductionExceptionHandler()
+        );
+
+        producer.initTransactions();
+        producer.fenceProducer();
+
+        try {
+            collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+            fail("expected a ProducerFencedException");
+        } catch (ProducerFencedException pfe) {
+            // expected
+        }
+    }
 }

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Producer getting fenced may cause Streams to shut down
> ------------------------------------------------------
>
>                 Key: KAFKA-7284
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7284
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Critical
>             Fix For: 2.0.1, 2.1.0
>
> As part of the investigation, I will determine what other versions are affected.
>
> In StreamTask, we catch a `ProducerFencedException` and throw a `TaskMigratedException`. However, in this case the `RecordCollectorImpl` throws a `StreamsException` caused by a `KafkaException`, which is in turn caused by a `ProducerFencedException`.
> In response to a `TaskMigratedException` we rebalance, but in response to a `StreamsException`, Streams shuts itself down.
> In other words, we intended to rebalance in response to a producer fence, but when the fence happens inside the record collector, we actually shut down.
> Coincidentally, Guozhang noticed and fixed this in a recent PR:
> https://github.com/apache/kafka/pull/5428/files#diff-4e5612eeba09dabf30d0b8430f269ff6
> The scope of this ticket is to extract that fix and its associated tests into a separate PR against trunk and 2.0, and to determine which other versions, if any, are affected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
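The whole fix reduces to a single unwrap-and-rethrow step in the record collector's catch block. Below is a minimal, self-contained sketch of that pattern outside of Streams: `KafkaException`, `ProducerFencedException`, and `StreamsException` are the real Kafka classes, while the `FenceUnwrapSketch` class and its `rethrowUnwrapped` helper are hypothetical, for illustration only.

    // Illustrative sketch only; not part of the merged PR.
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.ProducerFencedException;
    import org.apache.kafka.streams.errors.StreamsException;

    public class FenceUnwrapSketch {

        // Stand-in for the catch block in RecordCollectorImpl.send(): the producer may
        // throw a KafkaException that merely wraps a ProducerFencedException.
        static void rethrowUnwrapped(final Exception uncaughtException) {
            if (uncaughtException instanceof KafkaException
                    && uncaughtException.getCause() instanceof ProducerFencedException) {
                // Rethrow the inner cause so that callers which catch ProducerFencedException
                // (as StreamTask does) can convert it into a rebalance instead of a shutdown.
                throw (ProducerFencedException) uncaughtException.getCause();
            }
            // Anything else remains fatal for the Streams thread.
            throw new StreamsException("Failed to send record", uncaughtException);
        }

        public static void main(final String[] args) {
            final Exception wrapped =
                new KafkaException("Cannot perform send", new ProducerFencedException("fenced"));
            try {
                rethrowUnwrapped(wrapped);
            } catch (final ProducerFencedException expected) {
                // This is the path the fix restores: the fence is visible to the caller again.
                System.out.println("Unwrapped: " + expected.getMessage());
            }
        }
    }

Running the sketch prints "Unwrapped: fenced". Without the unwrap branch, the same wrapped exception would surface as a `StreamsException`, which is exactly the path that shut Streams down before this fix.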