This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.6 by this push:
     new 2bc3532  KAFKA-10395: relax output topic check in TTD to work with dynamic routing (#9174)
2bc3532 is described below

commit 2bc3532e8d57ff6833ef9e4d60fd2e7293342b27
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Aug 18 07:39:40 2020 -0700

    KAFKA-10395: relax output topic check in TTD to work with dynamic routing (#9174)
    
    Reviewers: Boyang Chen <[email protected]>, Bruno Cadonna <[email protected]>, John Roesler <[email protected]>
---
 .../java/org/apache/kafka/streams/TopologyTestDriver.java |  8 ++++----
 .../java/org/apache/kafka/streams/TestTopicsTest.java     |  2 +-
 .../org/apache/kafka/streams/TopologyTestDriverTest.java  | 15 +++++++++++++++
 3 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 225fd74..dda11f0 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -803,10 +803,10 @@ public class TopologyTestDriver implements Closeable {
 
     private Queue<ProducerRecord<byte[], byte[]>> getRecordsQueue(final String 
topicName) {
         final Queue<ProducerRecord<byte[], byte[]>> outputRecords = 
outputRecordsByTopic.get(topicName);
-        if (outputRecords == null) {
-            if (!processorTopology.sinkTopics().contains(topicName)) {
-                throw new IllegalArgumentException("Unknown topic: " + 
topicName);
-            }
+        if (outputRecords == null && 
!processorTopology.sinkTopics().contains(topicName)) {
+            log.warn("Unrecognized topic: {}, this can occur if dynamic 
routing is used and no output has been "
+                         + "sent to this topic yet. If not using a 
TopicNameExtractor, check that the output topic "
+                         + "is correct.", topicName);
         }
         return outputRecords;
     }
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
index 0e22ec9..3f1b8a4 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
@@ -334,7 +334,7 @@ public class TestTopicsTest {
     public void testNonExistingOutputTopic() {
         final TestOutputTopic<Long, String> outputTopic =
             testDriver.createOutputTopic("no-exist", longSerde.deserializer(), 
stringSerde.deserializer());
-        assertThrows("Unknown topic", IllegalArgumentException.class, 
outputTopic::readRecord);
+        assertThrows("Uninitialized topic", NoSuchElementException.class, 
outputTopic::readRecord);
     }
 
     @Test
diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 9b7b554..b206ba1 100644
--- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.streams;
 
+import java.util.NoSuchElementException;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.header.Header;
@@ -475,6 +476,20 @@ public class TopologyTestDriverTest {
     }
 
     @Test
+    public void 
shouldThrowNoSuchElementExceptionForUnusedOutputTopicWithDynamicRouting() {
+        testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
+        final TestOutputTopic<String, String> outputTopic = new 
TestOutputTopic<>(
+            testDriver,
+            "unused-topic",
+            new StringDeserializer(),
+            new StringDeserializer()
+        );
+
+        assertTrue(outputTopic.isEmpty());
+        assertThrows(NoSuchElementException.class, outputTopic::readRecord);
+    }
+
+    @Test
     public void shouldCaptureSinkTopicNamesIfWrittenInto() {
         testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
 

Reply via email to