[ https://issues.apache.org/jira/browse/KAFKA-5253?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16445704#comment-16445704 ]
ASF GitHub Bot commented on KAFKA-5253: --------------------------------------- mjsax closed pull request #4793: KAFKA-5253: Fixed KStreamTestDriver to handle streams created with patterns URL: https://github.com/apache/kafka/pull/4793 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 535f03524c0..b1d60a9ad88 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -1880,4 +1880,8 @@ public void updateSubscribedTopics(final Set<String> topics, final String logPre subscriptionUpdates.updateTopics(topics); updateSubscriptions(subscriptionUpdates, logPrefix); } + + public synchronized Set<String> getSourceTopicNames() { + return sourceTopicNames; + } } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 2009806fd18..ef65bb32b36 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -53,6 +53,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.core.IsInstanceOf.instanceOf; @@ -542,4 +543,49 @@ public void shouldMergeMultipleStreams() { 
assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee", "F:ff", "G:gg", "H:hh"), processorSupplier.processed); } + + @Test + public void shouldProcessFromSourceThatMatchPattern() { + final KStream<String, String> pattern2Source = builder.stream(Pattern.compile("topic-\\d")); + + final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>(); + pattern2Source.process(processorSupplier); + + driver.setUp(builder); + driver.setTime(0L); + + driver.process("topic-3", "A", "aa"); + driver.process("topic-4", "B", "bb"); + driver.process("topic-5", "C", "cc"); + driver.process("topic-6", "D", "dd"); + driver.process("topic-7", "E", "ee"); + + assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"), + processorSupplier.processed); + } + + @Test + public void shouldProcessFromSourcesThatMatchMultiplePattern() { + final String topic3 = "topic-without-pattern"; + + final KStream<String, String> pattern2Source1 = builder.stream(Pattern.compile("topic-\\d")); + final KStream<String, String> pattern2Source2 = builder.stream(Pattern.compile("topic-[A-Z]")); + final KStream<String, String> source3 = builder.stream(topic3); + final KStream<String, String> merged = pattern2Source1.merge(pattern2Source2).merge(source3); + + final MockProcessorSupplier<String, String> processorSupplier = new MockProcessorSupplier<>(); + merged.process(processorSupplier); + + driver.setUp(builder); + driver.setTime(0L); + + driver.process("topic-3", "A", "aa"); + driver.process("topic-4", "B", "bb"); + driver.process("topic-A", "C", "cc"); + driver.process("topic-Z", "D", "dd"); + driver.process(topic3, "E", "ee"); + + assertEquals(Utils.mkList("A:aa", "B:bb", "C:cc", "D:dd", "E:ee"), + processorSupplier.processed); + } } diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index aebb84975e2..eb137dbabeb 100644 --- 
a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -42,6 +42,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.regex.Pattern; public class KStreamTestDriver extends ExternalResource { @@ -184,9 +185,15 @@ public void process(final String topicName, final Object key, final Object value private ProcessorNode sourceNodeByTopicName(final String topicName) { ProcessorNode topicNode = topology.source(topicName); - - if (topicNode == null && globalTopology != null) { - topicNode = globalTopology.source(topicName); + if (topicNode == null) { + for (final String sourceTopic : topology.sourceTopics()) { + if (Pattern.compile(sourceTopic).matcher(topicName).matches()) { + return topology.source(sourceTopic); + } + } + if (globalTopology != null) { + topicNode = globalTopology.source(topicName); + } } return topicNode; diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java index 7730beddd0e..c03bf1a2e56 100644 --- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java +++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java @@ -34,6 +34,7 @@ import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.Time; import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler; +import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.PunctuationType; import org.apache.kafka.streams.processor.Punctuator; @@ -75,6 +76,7 @@ import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import java.util.regex.Pattern; /** * This class makes it easier to write tests to verify the behavior of 
topologies created with {@link Topology} or @@ -328,7 +330,10 @@ public void onRestoreEnd(final TopicPartition topicPartition, final String store public void pipeInput(final ConsumerRecord<byte[], byte[]> consumerRecord) { final String topicName = consumerRecord.topic(); - final TopicPartition topicPartition = partitionsByTopic.get(topicName); + if (!internalTopologyBuilder.getSourceTopicNames().isEmpty()) { + validateSourceTopicNameRegexPattern(consumerRecord.topic()); + } + final TopicPartition topicPartition = getTopicPartition(topicName); if (topicPartition != null) { final long offset = offsetsByTopicPartition.get(topicPartition).incrementAndGet() - 1; task.addRecords(topicPartition, Collections.singleton(new ConsumerRecord<>( @@ -371,6 +376,28 @@ public void pipeInput(final ConsumerRecord<byte[], byte[]> consumerRecord) { } } + private void validateSourceTopicNameRegexPattern(final String inputRecordTopic) { + for (final String sourceTopicName : internalTopologyBuilder.getSourceTopicNames()) { + if (!sourceTopicName.equals(inputRecordTopic) && Pattern.compile(sourceTopicName).matcher(inputRecordTopic).matches()) { + throw new TopologyException("Topology add source of type String for topic: " + sourceTopicName + + " cannot contain regex pattern for input record topic: " + inputRecordTopic + + " and hence cannot process the message."); + } + } + } + + private TopicPartition getTopicPartition(final String topicName) { + final TopicPartition topicPartition = partitionsByTopic.get(topicName); + if (topicPartition == null) { + for (final Map.Entry<String, TopicPartition> entry : partitionsByTopic.entrySet()) { + if (Pattern.compile(entry.getKey()).matcher(topicName).matches()) { + return entry.getValue(); + } + } + } + return topicPartition; + } + private void captureOutputRecords() { // Capture all the records sent to the producer ... 
final List<ProducerRecord<byte[], byte[]>> output = producer.history(); diff --git a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java index d757f3384b9..077b8ca3992 100644 --- a/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java +++ b/streams/test-utils/src/test/java/org/apache/kafka/streams/TopologyTestDriverTest.java @@ -28,6 +28,7 @@ import org.apache.kafka.common.utils.Bytes; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.streams.kstream.Materialized; +import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; @@ -54,6 +55,7 @@ import java.util.Objects; import java.util.Properties; import java.util.Set; +import java.util.regex.Pattern; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertEquals; @@ -902,11 +904,11 @@ public void close() {} ); } } - + @Test public void shouldFeedStoreFromGlobalKTable() { final StreamsBuilder builder = new StreamsBuilder(); - builder.globalTable("topic", + builder.globalTable("topic", Consumed.with(Serdes.String(), Serdes.String()), Materialized.<String, String, KeyValueStore<Bytes, byte[]>>as("globalStore")); try (final TopologyTestDriver testDriver = new TopologyTestDriver(builder.build(), config)) { @@ -919,4 +921,98 @@ public void shouldFeedStoreFromGlobalKTable() { Assert.assertEquals("value1", globalStore.get("k1")); } } + + private Topology setupMultipleSourcesPatternTopology(final Pattern... 
sourceTopicPatternNames) { + final Topology topology = new Topology(); + + final String[] processorNames = new String[sourceTopicPatternNames.length]; + int i = 0; + for (final Pattern sourceTopicPatternName : sourceTopicPatternNames) { + final String sourceName = sourceTopicPatternName + "-source"; + final String processorName = sourceTopicPatternName + "-processor"; + topology.addSource(sourceName, sourceTopicPatternName); + processorNames[i++] = processorName; + topology.addProcessor(processorName, new MockProcessorSupplier(), sourceName); + } + topology.addSink("sink-topic", SINK_TOPIC_1, processorNames); + return topology; + } + + @Test + public void shouldProcessFromSourcesThatMatchMultiplePattern() { + + final Pattern pattern2Source1 = Pattern.compile("source-topic-\\d"); + final Pattern pattern2Source2 = Pattern.compile("source-topic-[A-Z]"); + final String consumerTopic2 = "source-topic-Z"; + + final ConsumerRecord<byte[], byte[]> consumerRecord2 = consumerRecordFactory.create(consumerTopic2, key2, value2, timestamp2); + + testDriver = new TopologyTestDriver(setupMultipleSourcesPatternTopology(pattern2Source1, pattern2Source2), config); + + final List<Record> processedRecords1 = mockProcessors.get(0).processedRecords; + final List<Record> processedRecords2 = mockProcessors.get(1).processedRecords; + + testDriver.pipeInput(consumerRecord1); + + assertEquals(1, processedRecords1.size()); + assertEquals(0, processedRecords2.size()); + + final Record record1 = processedRecords1.get(0); + final Record expectedResult1 = new Record(consumerRecord1); + expectedResult1.offset = 0L; + assertThat(record1, equalTo(expectedResult1)); + + testDriver.pipeInput(consumerRecord2); + + assertEquals(1, processedRecords1.size()); + assertEquals(1, processedRecords2.size()); + + final Record record2 = processedRecords2.get(0); + final Record expectedResult2 = new Record(consumerRecord2); + expectedResult2.offset = 0L; + assertThat(record2, equalTo(expectedResult2)); + } + + 
@Test + public void shouldProcessFromSourceThatMatchPattern() { + final String sourceName = "source"; + final Pattern pattern2Source1 = Pattern.compile("source-topic-\\d"); + + final Topology topology = new Topology(); + + topology.addSource(sourceName, pattern2Source1); + topology.addSink("sink", SINK_TOPIC_1, sourceName); + + testDriver = new TopologyTestDriver(topology, config); + testDriver.pipeInput(consumerRecord1); + + final ProducerRecord outputRecord = testDriver.readOutput(SINK_TOPIC_1); + assertEquals(key1, outputRecord.key()); + assertEquals(value1, outputRecord.value()); + assertEquals(SINK_TOPIC_1, outputRecord.topic()); + } + + @Test + public void shouldThrowPatternNotValidForTopicNameException() { + final String sourceName = "source"; + final String pattern2Source1 = "source-topic-\\d"; + + final Topology topology = new Topology(); + + topology.addSource(sourceName, pattern2Source1); + topology.addSink("sink", SINK_TOPIC_1, sourceName); + + testDriver = new TopologyTestDriver(topology, config); + try { + testDriver.pipeInput(consumerRecord1); + } catch (final TopologyException exception) { + String str = + String.format( + "Invalid topology: Topology add source of type String for topic: %s cannot contain regex pattern for " + + "input record topic: %s and hence cannot process the message.", + pattern2Source1, + SOURCE_TOPIC_1); + assertEquals(str, exception.getMessage()); + } + } } ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > TopologyTestDriver must handle streams created with patterns > ------------------------------------------------------------ > > Key: KAFKA-5253 > URL: https://issues.apache.org/jira/browse/KAFKA-5253 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests > Affects Versions: 1.1.0 > Reporter: Wim Van Leuven > Assignee: Jagadesh Adireddi > Priority: Major > Labels: beginner, needs-kip, newbie > > *Context* > -KStreamTestDriver- TopologyTestDriver (added via KIP-247) is being used to > unit test topologies while developing KStreams apps. > One such topology uses a Pattern to consume from multiple topics at once. > *Problem* > The unit test of the topology fails because -KStreamTestDriver- > TopologyTestDriver does not handle Patterns properly. > *Example* > Below is a unit test showing what I understand should happen; it currently > fails. > **Note: the example below is outdated as it uses the old KStreamTestDriver. > The overall test layout can be adapted for TopologyTestDriver, though, so > we leave it in the description.** > Explicitly adding a source topic that matches the topic pattern generates an > exception, because the topology builder explicitly checks for overlapping topic names > and patterns, regardless of the order in which the pattern and topic are added. This is intended > behaviour. 
> {code:java} > @Test > public void shouldProcessFromSourcesThatDoMatchThePattern() { > // -- setup stream pattern > final KStream<String, String> source = > builder.stream(Pattern.compile("topic-source-\\d")); > source.to("topic-sink"); > // -- setup processor to capture results > final MockProcessorSupplier<String, String> processorSupplier = new > MockProcessorSupplier<>(); > source.process(processorSupplier); > // -- add source to stream data from > //builder.addSource(builder.newName(KStreamImpl.SOURCE_NAME), > "topic-source-3"); > // -- build test driver > driver = new KStreamTestDriver(builder); // this should be > TopologyTestDriver > driver.setTime(0L); > // -- test > driver.process("topic-source-3", "A", "aa"); > // -- validate > // no exception was thrown > assertEquals(Utils.mkList("A:aa"), processorSupplier.processed); > } > {code} > *Solution* > If anyone can help define the solution, I can create a pull request > for this change. -- This message was sent by Atlassian JIRA (v7.6.3#76005)