[ https://issues.apache.org/jira/browse/KAFKA-7284?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16587752#comment-16587752 ]

ASF GitHub Bot commented on KAFKA-7284:
---------------------------------------

mjsax closed pull request #5520: KAFKA-7284: streams should unwrap fenced 
exception
URL: https://github.com/apache/kafka/pull/5520
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index d2a84c66abe..a72714bb3df 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -21,6 +21,7 @@
 import org.apache.kafka.clients.producer.internals.FutureRecordMetadata;
 import org.apache.kafka.clients.producer.internals.ProduceRequestResult;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Metric;
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.PartitionInfo;
@@ -205,7 +206,7 @@ public void abortTransaction() throws ProducerFencedException {
         this.transactionInFlight = false;
     }
 
-    private void verifyProducerState() {
+    private synchronized void verifyProducerState() {
         if (this.closed) {
             throw new IllegalStateException("MockProducer is already closed.");
         }
@@ -243,7 +244,12 @@ private void verifyNoTransactionInFlight() {
      */
     @Override
     public synchronized Future<RecordMetadata> send(ProducerRecord<K, V> record, Callback callback) {
-        verifyProducerState();
+        if (this.closed) {
+            throw new IllegalStateException("MockProducer is already closed.");
+        }
+        if (this.producerFenced) {
+            throw new KafkaException("MockProducer is fenced.", new ProducerFencedException("Fenced"));
+        }
         int partition = 0;
         if (!this.cluster.partitionsForTopic(record.topic()).isEmpty())
             partition = partition(record, this.cluster);
@@ -313,7 +319,7 @@ public boolean closed() {
         return this.closed;
     }
 
-    public void fenceProducer() {
+    public synchronized void fenceProducer() {
         verifyProducerState();
         verifyTransactionsInitialized();
         this.producerFenced = true;
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
index 27fac280afc..9acde112a5f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/MockProducerTest.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.clients.producer.internals.DefaultPartitioner;
 import org.apache.kafka.common.Cluster;
+import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -267,7 +268,9 @@ public void shouldThrowOnSendIfProducerGotFenced() {
         try {
             producer.send(null);
             fail("Should have thrown as producer is fenced off");
-        } catch (ProducerFencedException e) { }
+        } catch (KafkaException e) {
+            assertTrue("The root cause of the exception should be ProducerFenced", e.getCause() instanceof ProducerFencedException);
+        }
     }
 
     @Test
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
index a3aea1c9ef0..a775044ac0a 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java
@@ -129,6 +129,16 @@ public void onCompletion(final RecordMetadata metadata, final Exception exceptio
                 }
                 log.warn("Timeout exception caught when sending record to topic {}; retrying with {} attempt", topic, attempt);
                 Utils.sleep(SEND_RETRY_BACKOFF);
+            } catch (final Exception uncaughtException) {
+                if (uncaughtException instanceof KafkaException &&
+                    uncaughtException.getCause() instanceof ProducerFencedException) {
+                    final KafkaException kafkaException = (KafkaException) uncaughtException;
+                    // producer.send() call may throw a KafkaException which wraps a FencedException,
+                    // in this case we should throw its wrapped inner cause so that it can be captured and re-wrapped as TaskMigrationException
+                    throw (ProducerFencedException) kafkaException.getCause();
+                } else {
+                    throw uncaughtException;
+                }
             }
         }
     }
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
index 7b2a41e323e..2ec0db14035 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordCollectorTest.java
@@ -25,6 +25,7 @@
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.ProducerFencedException;
 import org.apache.kafka.common.errors.TimeoutException;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -41,6 +42,7 @@
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 public class RecordCollectorTest {
 
@@ -146,6 +148,23 @@ public void shouldRetryWhenTimeoutExceptionOccursOnSend() {
         assertEquals(Long.valueOf(0L), offset);
     }
 
+    @Test
+    public void shouldUnwrapAndThrowProducerFencedExceptionFromCallToSend() {
+        final MockProducer<byte[], byte[]> producer =
+            new MockProducer<>(cluster, true, new DefaultPartitioner(), byteArraySerializer, byteArraySerializer);
+        final RecordCollector collector = new RecordCollectorImpl(producer, "test", logContext);
+
+        producer.initTransactions();
+        producer.fenceProducer();
+
+        try {
+            collector.send("topic1", "3", "0", null, stringSerializer, stringSerializer, streamPartitioner);
+            fail("expected a ProducerFencedException");
+        } catch (ProducerFencedException pfe) {
+            // expected
+        }
+    }
+
     @SuppressWarnings("unchecked")
     @Test(expected = StreamsException.class)
     public void shouldThrowStreamsExceptionAfterMaxAttempts() {
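
A note on the MockProducer change shown above: a fenced send no longer throws a bare
ProducerFencedException; callers now see a KafkaException whose cause is the
ProducerFencedException. Below is a minimal, self-contained sketch of what calling code
observes (the class name FencedSendExample and the topic name are illustrative, not taken
from the PR):

    import org.apache.kafka.clients.producer.MockProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.common.KafkaException;
    import org.apache.kafka.common.errors.ProducerFencedException;
    import org.apache.kafka.common.serialization.ByteArraySerializer;

    public class FencedSendExample {
        public static void main(final String[] args) {
            // MockProducer with auto-completed sends and byte[] serializers
            final MockProducer<byte[], byte[]> producer =
                new MockProducer<>(true, new ByteArraySerializer(), new ByteArraySerializer());

            producer.initTransactions();
            producer.fenceProducer();

            try {
                producer.send(new ProducerRecord<>("topic1", "value".getBytes()));
            } catch (final KafkaException e) {
                // With the patch, the fence is reported as a wrapped cause rather
                // than being thrown directly by send().
                System.out.println(e.getCause() instanceof ProducerFencedException); // prints true
            }
        }
    }

Rethrowing the bare cause in RecordCollectorImpl (as the diff does) is what lets the
Streams-level handler translate the fence into a task migration rather than a fatal error.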


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Producer getting fenced may cause Streams to shut down
> ------------------------------------------------------
>
>                 Key: KAFKA-7284
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7284
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 0.11.0.3, 1.0.2, 1.1.1, 2.0.0
>            Reporter: John Roesler
>            Assignee: John Roesler
>            Priority: Critical
>             Fix For: 1.1.2, 2.0.1, 2.1.0
>
>
> As part of the investigation, I will determine what other versions are 
> affected.
>  
> In StreamTask, we catch a `ProducerFencedException` and throw a 
> `TaskMigratedException`. However, in this case, the `RecordCollectorImpl` is 
> throwing a `StreamsException` whose cause is a `KafkaException`, which in turn 
> is caused by a `ProducerFencedException`.
> In response to a `TaskMigratedException`, we would rebalance, but when we get 
> a `StreamsException`, Streams shuts itself down.
> In other words, we intended to rebalance in response to a producer fence, but 
> when the fence happens inside the record collector, we actually shut down.
> Coincidentally, Guozhang noticed and fixed this in a recent PR: 
> [https://github.com/apache/kafka/pull/5428/files#diff-4e5612eeba09dabf30d0b8430f269ff6]
>  
> The scope of this ticket is to extract that fix and associated tests, and 
> send a separate PR to trunk and 2.0, and also to determine what other 
> versions, if any, are affected.
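
To make the handling described in the ticket concrete: with the record collector
rethrowing the unwrapped ProducerFencedException (per the diff above), the task-level code
can translate it into a task-migrated signal so the thread rebalances instead of shutting
down. A rough sketch of that conversion follows; this is a fragment only, and the
TaskMigratedException constructor and variable names here are illustrative, not the exact
StreamTask internals:

    try {
        // send through the patched RecordCollectorImpl; a fenced producer now
        // surfaces as a bare ProducerFencedException
        collector.send("topic1", key, value, null, keySerializer, valueSerializer, partitioner);
    } catch (final ProducerFencedException fenced) {
        // another instance with the same application.id has taken over this task's
        // transactional producer, so signal a migration (triggering a rebalance)
        // rather than a fatal error
        throw new TaskMigratedException("Producer got fenced while sending; task migrated", fenced);
    }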



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
