[ 
https://issues.apache.org/jira/browse/KAFKA-6749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511683#comment-16511683
 ] 

ASF GitHub Bot commented on KAFKA-6749:
---------------------------------------

mjsax closed pull request #4912: KAFKA-6749: Fixed TopologyTestDriver to 
process stream processing guarantee as exactly once
URL: https://github.com/apache/kafka/pull/4912
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 7f752652da4..723780110ff 100644
--- 
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ 
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -195,6 +195,7 @@
     private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = 
new HashMap<>();
 
     private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> 
outputRecordsByTopic = new HashMap<>();
+    private final boolean eosEnabled;
 
     /**
      * Create a new test diver instance.
@@ -345,6 +346,7 @@ public void onRestoreEnd(final TopicPartition 
topicPartition, final String store
             task = null;
             context = null;
         }
+        eosEnabled = 
streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE);
     }
 
     /**
@@ -439,6 +441,10 @@ private void captureOutputRecords() {
         // Capture all the records sent to the producer ...
         final List<ProducerRecord<byte[], byte[]>> output = producer.history();
         producer.clear();
+        if (eosEnabled && !producer.closed()) {
+            producer.initTransactions();
+            producer.beginTransaction();
+        }
         for (final ProducerRecord<byte[], byte[]> record : output) {
             outputRecordsByTopic.computeIfAbsent(record.topic(), k -> new 
LinkedList<>()).add(record);
 
@@ -666,6 +672,9 @@ public void close() {
             }
         }
         captureOutputRecords();
+        if (!eosEnabled) {
+            producer.close();
+        }
         stateDirectory.clean();
     }
 
diff --git 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 7552637dc26..135fb3ffd8a 100644
--- 
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ 
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -50,6 +50,8 @@
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
 import java.util.Collection;
@@ -61,6 +63,7 @@
 import java.util.Objects;
 import java.util.Properties;
 import java.util.Set;
+import java.util.Arrays;
 import java.util.regex.Pattern;
 
 import static org.hamcrest.CoreMatchers.equalTo;
@@ -70,6 +73,7 @@
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+@RunWith(value = Parameterized.class)
 public class TopologyTestDriverTest {
     private final static String SOURCE_TOPIC_1 = "source-topic-1";
     private final static String SOURCE_TOPIC_2 = "source-topic-2";
@@ -108,6 +112,23 @@
         new StringSerializer(),
         new LongSerializer());
 
+    private final boolean eosEnabled;
+
+    @Parameterized.Parameters(name = "Eos enabled = {0}")
+    public static Collection<Object[]> data() {
+        final List<Object[]> values = new ArrayList<>();
+        for (final boolean eosEnabled : Arrays.asList(true, false)) {
+            values.add(new Object[] {eosEnabled});
+        }
+        return values;
+    }
+
+    public TopologyTestDriverTest(final boolean eosEnabled) {
+        this.eosEnabled = eosEnabled;
+        if (eosEnabled) {
+            config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
StreamsConfig.EXACTLY_ONCE);
+        }
+    }
 
     private final static class Record {
         private final Object key;
@@ -353,6 +374,8 @@ public void shouldCloseProcessor() {
 
         testDriver.close();
         assertTrue(mockProcessors.get(0).closed);
+        // As testDriver is already closed, bypassing @After tearDown 
testDriver.close().
+        testDriver = null;
     }
 
     @Test


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> TopologyTestDriver fails when topology under test uses EXACTLY_ONCE
> -------------------------------------------------------------------
>
>                 Key: KAFKA-6749
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6749
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Frederic Arno
>            Assignee: Jagadesh Adireddi
>            Priority: Minor
>              Labels: newbie
>
> Stream processing topologies that are configured to use the {{EXACTLY_ONCE}} 
> processing guarantee cannot be tested with the {{TopologyTestDriver}}. Tests 
> usually crash with {{java.lang.IllegalStateException: MockProducer hasn't 
> been initialized for transactions}} during the second call to 
> {{TopologyTestDriver.pipeInput()}}; the first call works fine.
> Changing the processing guarantee to {{AT_LEAST_ONCE}} makes tests pass.
> This is a problem because it is expected that proper processor topologies can 
> be successfully tested using {{TopologyTestDriver}}, however 
> {{TopologyTestDriver}} can't handle {{EXACTLY_ONCE}} and crashes during 
> tests. To a developer, this usually means that there is something wrong with 
> their processor topologies.
> Kafka developers can reproduce this by adding:
> {code:java}
> put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, 
> StreamsConfig.EXACTLY_ONCE);{code}
> to line 88 of TopologyTestDriverTest: 
> streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
> Originally [reported on the 
> ML|http://mail-archives.apache.org/mod_mbox/kafka-users/201804.mbox/%3C54ab29ad-44e1-35bd-9c16-c1d8d68a88db%40gmail.com%3E].



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to