This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch 2.6
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.6 by this push:
new 2bc3532 KAFKA-10395: relax output topic check in TTD to work with
dynamic routing (#9174)
2bc3532 is described below
commit 2bc3532e8d57ff6833ef9e4d60fd2e7293342b27
Author: A. Sophie Blee-Goldman <[email protected]>
AuthorDate: Tue Aug 18 07:39:40 2020 -0700
KAFKA-10395: relax output topic check in TTD to work with dynamic routing
(#9174)
Reviewers: Boyang Chen <[email protected]>, Bruno Cadonna
<[email protected]>, John Roesler <[email protected]>
---
.../java/org/apache/kafka/streams/TopologyTestDriver.java | 8 ++++----
.../java/org/apache/kafka/streams/TestTopicsTest.java | 2 +-
.../org/apache/kafka/streams/TopologyTestDriverTest.java | 15 +++++++++++++++
3 files changed, 20 insertions(+), 5 deletions(-)
diff --git
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index 225fd74..dda11f0 100644
---
a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++
b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -803,10 +803,10 @@ public class TopologyTestDriver implements Closeable {
private Queue<ProducerRecord<byte[], byte[]>> getRecordsQueue(final String
topicName) {
final Queue<ProducerRecord<byte[], byte[]>> outputRecords =
outputRecordsByTopic.get(topicName);
- if (outputRecords == null) {
- if (!processorTopology.sinkTopics().contains(topicName)) {
- throw new IllegalArgumentException("Unknown topic: " +
topicName);
- }
+ if (outputRecords == null &&
!processorTopology.sinkTopics().contains(topicName)) {
+ log.warn("Unrecognized topic: {}, this can occur if dynamic
routing is used and no output has been "
+ + "sent to this topic yet. If not using a
TopicNameExtractor, check that the output topic "
+ + "is correct.", topicName);
}
return outputRecords;
}
diff --git
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
index 0e22ec9..3f1b8a4 100644
---
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
+++
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TestTopicsTest.java
@@ -334,7 +334,7 @@ public class TestTopicsTest {
public void testNonExistingOutputTopic() {
final TestOutputTopic<Long, String> outputTopic =
testDriver.createOutputTopic("no-exist", longSerde.deserializer(),
stringSerde.deserializer());
- assertThrows("Unknown topic", IllegalArgumentException.class,
outputTopic::readRecord);
+ assertThrows("Uninitialized topic", NoSuchElementException.class,
outputTopic::readRecord);
}
@Test
diff --git
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
index 9b7b554..b206ba1 100644
---
a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
+++
b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.streams;
+import java.util.NoSuchElementException;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.Header;
@@ -475,6 +476,20 @@ public class TopologyTestDriverTest {
}
@Test
+ public void
shouldThrowNoSuchElementExceptionForUnusedOutputTopicWithDynamicRouting() {
+ testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);
+ final TestOutputTopic<String, String> outputTopic = new
TestOutputTopic<>(
+ testDriver,
+ "unused-topic",
+ new StringDeserializer(),
+ new StringDeserializer()
+ );
+
+ assertTrue(outputTopic.isEmpty());
+ assertThrows(NoSuchElementException.class, outputTopic::readRecord);
+ }
+
+ @Test
public void shouldCaptureSinkTopicNamesIfWrittenInto() {
testDriver = new TopologyTestDriver(setupSourceSinkTopology(), config);