[ https://issues.apache.org/jira/browse/KAFKA-6749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16511683#comment-16511683 ]
ASF GitHub Bot commented on KAFKA-6749: --------------------------------------- mjsax closed pull request #4912: KAFKA-6749: Fixed TopologyTestDriver to process stream processing guarantee as exactly once URL: https://github.com/apache/kafka/pull/4912 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 7f752652da4..723780110ff 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -195,6 +195,7 @@ private final Map<TopicPartition, AtomicLong> offsetsByTopicPartition = new HashMap<>(); private final Map<String, Queue<ProducerRecord<byte[], byte[]>>> outputRecordsByTopic = new HashMap<>(); + private final boolean eosEnabled; /** * Create a new test diver instance. @@ -345,6 +346,7 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store task = null; context = null; } + eosEnabled = streamsConfig.getString(StreamsConfig.PROCESSING_GUARANTEE_CONFIG).equals(StreamsConfig.EXACTLY_ONCE); } /** @@ -439,6 +441,10 @@ private void captureOutputRecords() { // Capture all the records sent to the producer ... 
final List<ProducerRecord<byte[], byte[]>> output = producer.history(); producer.clear(); + if (eosEnabled && !producer.closed()) { + producer.initTransactions(); + producer.beginTransaction(); + } for (final ProducerRecord<byte[], byte[]> record : output) { outputRecordsByTopic.computeIfAbsent(record.topic(), k -> new LinkedList<>()).add(record); @@ -666,6 +672,9 @@ public void close() { } } captureOutputRecords(); + if (!eosEnabled) { + producer.close(); + } stateDirectory.clean(); } diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index 7552637dc26..135fb3ffd8a 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -50,6 +50,8 @@ import org.junit.After; import org.junit.Assert; import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; import java.util.ArrayList; import java.util.Collection; @@ -61,6 +63,7 @@ import java.util.Objects; import java.util.Properties; import java.util.Set; +import java.util.Arrays; import java.util.regex.Pattern; import static org.hamcrest.CoreMatchers.equalTo; @@ -70,6 +73,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +@RunWith(value = Parameterized.class) public class TopologyTestDriverTest { private final static String SOURCE_TOPIC_1 = "source-topic-1"; private final static String SOURCE_TOPIC_2 = "source-topic-2"; @@ -108,6 +112,23 @@ new StringSerializer(), new LongSerializer()); + private final boolean eosEnabled; + + @Parameterized.Parameters(name = "Eos enabled = {0}") + public static Collection<Object[]> data() { + final List<Object[]> values = new ArrayList<>(); + for (final boolean eosEnabled : Arrays.asList(true, false)) { + values.add(new Object[] {eosEnabled}); + } + 
return values; + } + + public TopologyTestDriverTest(final boolean eosEnabled) { + this.eosEnabled = eosEnabled; + if (eosEnabled) { + config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE); + } + } private final static class Record { private final Object key; @@ -353,6 +374,8 @@ public void shouldCloseProcessor() { testDriver.close(); assertTrue(mockProcessors.get(0).closed); + // As testDriver is already closed, bypassing @After tearDown testDriver.close(). + testDriver = null; } @Test ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > TopologyTestDriver fails when topology under test uses EXACTLY_ONCE > ------------------------------------------------------------------ > > Key: KAFKA-6749 > URL: https://issues.apache.org/jira/browse/KAFKA-6749 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 1.1.0 > Reporter: Frederic Arno > Assignee: Jagadesh Adireddi > Priority: Minor > Labels: newbie > > Stream processing topologies which are configured to use {{EXACTLY_ONCE}} > processing guarantee cannot be tested with the {{TopologyTestDriver}}. Tests > usually crash with {{java.lang.IllegalStateException: MockProducer hasn't > been initialized for transactions}} within the second call to > {{TopologyTestDriver.pipeInput()}}, the first call works fine. > Changing the processing guarantee to {{AT_LEAST_ONCE}} makes tests pass. > This is a problem because it is expected that proper processor topologies can > be successfully tested using {{TopologyTestDriver}}, however > {{TopologyTestDriver}} can't handle {{EXACTLY_ONCE}} and crashes during > tests. To a developer, this usually means that there is something wrong with > their processor topologies. 
> Kafka developers can reproduce this by adding: > {code:java} > put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, > StreamsConfig.EXACTLY_ONCE);{code} > to line 88 of TopologyTestDriverTest: > streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java > Originally [reported on the > ML|http://mail-archives.apache.org/mod_mbox/kafka-users/201804.mbox/%3C54ab29ad-44e1-35bd-9c16-c1d8d68a88db%40gmail.com%3E]. -- This message was sent by Atlassian JIRA (v7.6.3#76005)