[ https://issues.apache.org/jira/browse/KAFKA-6398?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320888#comment-16320888 ]
ASF GitHub Bot commented on KAFKA-6398: --------------------------------------- guozhangwang closed pull request #4384: KAFKA-6398: fix KTable.filter that does not include its parent's queryable storename URL: https://github.com/apache/kafka/pull/4384 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java index 8c79decbb6f..3bc6f4b3474 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java @@ -155,8 +155,10 @@ String internalStoreName() { builder.internalTopologyBuilder.addProcessor(name, processorSupplier, this.name); if (storeSupplier != null) { builder.internalTopologyBuilder.addStateStore(storeSupplier, name); + return new KTableImpl<>(builder, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, true); + } else { + return new KTableImpl<>(builder, name, processorSupplier, sourceNodes, this.queryableStoreName, false); } - return new KTableImpl<>(builder, name, processorSupplier, this.keySerde, this.valSerde, sourceNodes, internalStoreName, internalStoreName != null); } private KTable<K, V> doFilter(final Predicate<? super K, ? super V> predicate, diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 39ea44f1bfd..39010022a0b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -456,6 +456,7 @@ public final void addSource(final Topology.AutoOffsetReset offsetReset, } for (final String predecessor : predecessorNames) { + Objects.requireNonNull(predecessor, "predecessor name can't be null"); if (predecessor.equals(name)) { throw new TopologyException("Processor " + name + " cannot be a predecessor of itself."); } @@ -483,6 +484,7 @@ public final void addProcessor(final String name, } for (final String predecessor : predecessorNames) { + Objects.requireNonNull(predecessor, "predecessor name must not be null"); if (predecessor.equals(name)) { throw new TopologyException("Processor " + name + " cannot be a predecessor of itself."); } @@ -508,6 +510,7 @@ public final void addStateStore(final org.apache.kafka.streams.processor.StateSt if (processorNames != null) { for (final String processorName : processorNames) { + Objects.requireNonNull(processorName, "processor name must not be null"); connectProcessorAndStateStore(processorName, supplier.name()); } } @@ -524,6 +527,7 @@ public final void addStateStore(final StoreBuilder storeBuilder, if (processorNames != null) { for (final String processorName : processorNames) { + Objects.requireNonNull(processorName, "processor name must not be null"); connectProcessorAndStateStore(processorName, storeBuilder.name()); } } @@ -602,11 +606,12 @@ private void validateTopicNotAlreadyRegistered(final String topic) { public final void connectProcessorAndStateStores(final String processorName, final String... stateStoreNames) { Objects.requireNonNull(processorName, "processorName can't be null"); - Objects.requireNonNull(stateStoreNames, "stateStoreNames can't be null"); + Objects.requireNonNull(stateStoreNames, "state store list must not be null"); if (stateStoreNames.length == 0) { throw new TopologyException("Must provide at least one state store name."); } for (final String stateStoreName : stateStoreNames) { + Objects.requireNonNull(stateStoreName, "state store name must not be null"); connectProcessorAndStateStore(processorName, stateStoreName); } } @@ -627,6 +632,7 @@ public final void connectProcessors(final String... processorNames) { } for (final String processorName : processorNames) { + Objects.requireNonNull(processorName, "processor name can't be null"); if (!nodeFactories.containsKey(processorName)) { throw new TopologyException("Processor " + processorName + " is not added yet."); } diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java index cee01dc0662..13b5b4583f3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java @@ -22,12 +22,16 @@ import org.apache.kafka.streams.errors.TopologyException; import org.apache.kafka.streams.kstream.ForeachAction; import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; import org.apache.kafka.streams.kstream.Materialized; import org.apache.kafka.streams.kstream.internals.KStreamImpl; import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.KStreamTestDriver; +import org.apache.kafka.test.MockMapper; +import org.apache.kafka.test.MockPredicate; import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockValueJoiner; import org.apache.kafka.test.TestUtils; import org.junit.Rule; import org.junit.Test; @@ -59,6 +63,30 @@ public void testFrom() { builder.build().addSource(KStreamImpl.SOURCE_NAME + "0000000000", "topic-3"); } + @Test + public void shouldAllowJoinUnmaterializedFilteredKTable() { + final KTable<Bytes, String> filteredKTable = builder.<Bytes, String>table("table-topic").filter(MockPredicate.<Bytes, String>allGoodPredicate()); + builder.<Bytes, String>stream("stream-topic").join(filteredKTable, MockValueJoiner.TOSTRING_JOINER); + + driver.setUp(builder, TestUtils.tempDirectory()); + } + + @Test + public void shouldAllowJoinUnmaterializedMapValuedKTable() { + final KTable<Bytes, String> mappedKTable = builder.<Bytes, String>table("table-topic").mapValues(MockMapper.<String>noOpValueMapper()); + builder.<Bytes, String>stream("stream-topic").join(mappedKTable, MockValueJoiner.TOSTRING_JOINER); + + driver.setUp(builder, TestUtils.tempDirectory()); + } + + @Test + public void shouldAllowJoinMaterializedSourceKTable() { + final KTable<Bytes, String> table = builder.<Bytes, String>table("table-topic"); + builder.<Bytes, String>stream("stream-topic").join(table, MockValueJoiner.TOSTRING_JOINER); + + driver.setUp(builder, TestUtils.tempDirectory()); + } + @Test public void shouldProcessingFromSinkTopic() { final KStream<String, String> source = builder.stream("topic-source"); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java index 855bcea9b27..65a6de70157 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/InternalTopicIntegrationTest.java @@ -36,7 +36,7 @@ import org.apache.kafka.streams.kstream.ValueMapper; import org.apache.kafka.streams.processor.internals.ProcessorStateManager; import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.MockKeyValueMapper; +import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.BeforeClass; @@ -139,7 +139,7 @@ public void shouldCompactTopicsForStateChangelogs() throws Exception { public Iterable<String> apply(final String value) { return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); } - }).groupBy(MockKeyValueMapper.<String, String>SelectValueMapper()) + }).groupBy(MockMapper.<String, String>selectValueMapper()) .count("Counts").toStream(); wordCounts.to(stringSerde, longSerde, DEFAULT_OUTPUT_TOPIC); @@ -186,7 +186,7 @@ public void shouldUseCompactAndDeleteForWindowStoreChangelogs() throws Exception public Iterable<String> apply(String value) { return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); } - }).groupBy(MockKeyValueMapper.<String, String>SelectValueMapper()) + }).groupBy(MockMapper.<String, String>selectValueMapper()) .count(TimeWindows.of(1000).until(durationMs), "CountWindows").toStream(); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java index cb588490623..4c12bb93544 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationDedupIntegrationTest.java @@ -39,7 +39,7 @@ import org.apache.kafka.streams.kstream.TimeWindows; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.MockKeyValueMapper; +import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -99,7 +99,7 @@ public void before() throws InterruptedException { streamsConfiguration.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 10 * 1024 * 1024L); streamsConfiguration.put(IntegrationTestUtils.INTERNAL_LEAVE_GROUP_ON_CLOSE, true); - KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.<Integer, String>SelectValueMapper(); + KeyValueMapper<Integer, String, String> mapper = MockMapper.<Integer, String>selectValueMapper(); stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String())); groupedStream = stream .groupBy( diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java index 3500dd57db2..4527c19b471 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamAggregationIntegrationTest.java @@ -52,7 +52,7 @@ import org.apache.kafka.streams.state.ReadOnlySessionStore; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.MockKeyValueMapper; +import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.TestUtils; import org.junit.After; import org.junit.Before; @@ -116,7 +116,7 @@ public void before() throws InterruptedException { streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Integer().getClass()); - final KeyValueMapper<Integer, String, String> mapper = MockKeyValueMapper.SelectValueMapper(); + final KeyValueMapper<Integer, String, String> mapper = MockMapper.selectValueMapper(); stream = builder.stream(streamOneInput, Consumed.with(Serdes.Integer(), Serdes.String())); groupedStream = stream .groupBy( diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java index 5f6ff449b87..32546de0ef0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/KStreamRepartitionJoinTest.java @@ -39,7 +39,7 @@ import org.apache.kafka.streams.kstream.Predicate; import org.apache.kafka.streams.kstream.ValueJoiner; import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.MockKeyValueMapper; +import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.MockValueJoiner; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -104,7 +104,7 @@ public void before() throws InterruptedException { streamTwo = builder.stream(streamTwoInput, Consumed.with(Serdes.Integer(), Serdes.String())); streamFour = builder.stream(streamFourInput, Consumed.with(Serdes.Integer(), Serdes.String())); - keyMapper = MockKeyValueMapper.SelectValueKeyValueMapper(); + keyMapper = MockMapper.selectValueKeyValueMapper(); } @After @@ -157,7 +157,7 @@ private ExpectedOutputOnTopic mapStreamOneAndJoin() throws InterruptedException private ExpectedOutputOnTopic mapBothStreamsAndJoin() throws InterruptedException { final KStream<Integer, Integer> map1 = streamOne.map(keyMapper); - final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper()); + final KStream<Integer, String> map2 = streamTwo.map(MockMapper.<Integer, String>noOpKeyValueMapper()); doJoin(map1, map2, "map-both-streams-and-join-" + testNo); return new ExpectedOutputOnTopic(expectedStreamOneTwoJoin, "map-both-streams-and-join-" + testNo); @@ -183,7 +183,7 @@ private ExpectedOutputOnTopic mapMapJoin() throws InterruptedException { private ExpectedOutputOnTopic selectKeyAndJoin() throws Exception { final KStream<Integer, Integer> keySelected = - streamOne.selectKey(MockKeyValueMapper.<Long, Integer>SelectValueMapper()); + streamOne.selectKey(MockMapper.<Long, Integer>selectValueMapper()); final String outputTopic = "select-key-join-" + testNo; doJoin(keySelected, streamTwo, outputTopic); @@ -222,7 +222,7 @@ private ExpectedOutputOnTopic joinMappedRhsStream() throws InterruptedException private ExpectedOutputOnTopic mapBothStreamsAndLeftJoin() throws InterruptedException { final KStream<Integer, Integer> map1 = streamOne.map(keyMapper); - final KStream<Integer, String> map2 = streamTwo.map(MockKeyValueMapper.<Integer, String>NoOpKeyValueMapper()); + final KStream<Integer, String> map2 = streamTwo.map(MockMapper.<Integer, String>noOpKeyValueMapper()); final String outputTopic = "left-join-" + testNo; @@ -247,7 +247,7 @@ private ExpectedOutputOnTopic joinTwoMappedStreamsOneThatHasBeenPreviouslyJoined final KStream<Integer, Integer> map1 = streamOne.map(keyMapper); final KeyValueMapper<Integer, String, KeyValue<Integer, String>> - kvMapper = MockKeyValueMapper.NoOpKeyValueMapper(); + kvMapper = MockMapper.noOpKeyValueMapper(); final KStream<Integer, String> map2 = streamTwo.map(kvMapper); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java index 18ffb876f50..4b983cf2da0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/PurgeRepartitionTopicIntegrationTest.java @@ -33,7 +33,7 @@ import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.MockKeyValueMapper; +import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -163,7 +163,7 @@ public void setup() { StreamsBuilder builder = new StreamsBuilder(); builder.stream(INPUT_TOPIC) - .groupBy(MockKeyValueMapper.SelectKeyKeyValueMapper()) + .groupBy(MockMapper.selectKeyKeyValueMapper()) .count(); kafkaStreams = new KafkaStreams(builder.build(), new StreamsConfig(streamsConfiguration), time); diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java index 51c5ce237ec..fc2eaccebc9 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/QueryableStateIntegrationTest.java @@ -57,7 +57,7 @@ import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; import org.apache.kafka.test.IntegrationTest; -import org.apache.kafka.test.MockKeyValueMapper; +import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; import org.junit.After; @@ -218,7 +218,7 @@ private KafkaStreams createCountStream(final String inputTopic, return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")); } }) - .groupBy(MockKeyValueMapper.<String, String>SelectValueMapper()); + .groupBy(MockMapper.<String, String>selectValueMapper()); // Create a State Store for the all time word count groupedByWord diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java index 5ffedb872fd..f9949d3c5b8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/KStreamBuilderTest.java @@ -27,7 +27,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.SourceNode; import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockKeyValueMapper; +import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.MockValueJoiner; @@ -300,7 +300,7 @@ public void shouldMapStateStoresToCorrectSourceTopics() { final KTable<String, String> table = builder.table("table-topic", "table-store"); assertEquals(Collections.singletonList("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store")); - final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper()); + final KStream<String, String> mapped = playEvents.map(MockMapper.<String, String>selectValueKeyValueMapper()); mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count("count"); assertEquals(Collections.singletonList("table-topic"), builder.stateStoreNameToSourceTopics().get("table-store")); assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.stateStoreNameToSourceTopics().get("count")); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java index 05a02140c35..156acadbedd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/InternalStreamsBuilderTest.java @@ -31,7 +31,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockKeyValueMapper; +import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.MockValueJoiner; import org.junit.After; @@ -251,7 +251,7 @@ public void shouldMapStateStoresToCorrectSourceTopics() throws Exception { final KTable<String, String> table = builder.table("table-topic", consumed, materialized); assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store")); - final KStream<String, String> mapped = playEvents.map(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper()); + final KStream<String, String> mapped = playEvents.map(MockMapper.<String, String>selectValueKeyValueMapper()); mapped.leftJoin(table, MockValueJoiner.TOSTRING_JOINER).groupByKey().count("count"); assertEquals(Collections.singletonList("table-topic"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("table-store")); assertEquals(Collections.singletonList(APP_ID + "-KSTREAM-MAP-0000000003-repartition"), builder.internalTopologyBuilder.stateStoreNameToSourceTopics().get("count")); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java index 705cf62f9a6..3bbd7e2fa50 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KGroupedTableImplTest.java @@ -33,7 +33,7 @@ import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockInitializer; -import org.apache.kafka.test.MockKeyValueMapper; +import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.MockReducer; import org.apache.kafka.test.TestUtils; import org.junit.Before; @@ -62,7 +62,7 @@ @Before public void before() { groupedTable = builder.table("blah", Consumed.with(Serdes.String(), Serdes.String())) - .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper()); + .groupBy(MockMapper.<String, String>selectValueKeyValueMapper()); } @Test @@ -221,7 +221,7 @@ public void shouldReduceAndMaterializeResults() { @Test public void shouldCountAndMaterializeResults() { final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String())); - table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), + table.groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), Serialized.with(Serdes.String(), Serdes.String())) .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("count") @@ -238,7 +238,7 @@ public void shouldCountAndMaterializeResults() { @Test public void shouldAggregateAndMaterializeResults() { final KTable<String, String> table = builder.table(topic, Consumed.with(Serdes.String(), Serdes.String())); - table.groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), + table.groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), Serialized.with(Serdes.String(), Serdes.String())) .aggregate(MockInitializer.STRING_INIT, diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java index 0a0232c16e3..562711d2d08 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java @@ -40,7 +40,7 @@ import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.SourceNode; import org.apache.kafka.test.KStreamTestDriver; -import org.apache.kafka.test.MockKeyValueMapper; +import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockValueJoiner; import org.junit.Before; @@ -379,7 +379,7 @@ public void shouldNotAllowNullActionOnForEach() { @Test(expected = NullPointerException.class) public void shouldNotAllowNullTableOnJoinWithGlobalTable() { testStream.join((GlobalKTable) null, - MockKeyValueMapper.<String, String>SelectValueMapper(), + MockMapper.<String, String>selectValueMapper(), MockValueJoiner.TOSTRING_JOINER); } @@ -393,14 +393,14 @@ public void shouldNotAllowNullMapperOnJoinWithGlobalTable() { @Test(expected = NullPointerException.class) public void shouldNotAllowNullJoinerOnJoinWithGlobalTable() { testStream.join(builder.globalTable("global", stringConsumed), - MockKeyValueMapper.<String, String>SelectValueMapper(), + MockMapper.<String, String>selectValueMapper(), null); } @Test(expected = NullPointerException.class) public void shouldNotAllowNullTableOnJLeftJoinWithGlobalTable() { testStream.leftJoin((GlobalKTable) null, - MockKeyValueMapper.<String, String>SelectValueMapper(), + MockMapper.<String, String>selectValueMapper(), MockValueJoiner.TOSTRING_JOINER); } @@ -414,7 +414,7 @@ public void shouldNotAllowNullMapperOnLeftJoinWithGlobalTable() { @Test(expected = NullPointerException.class) public void shouldNotAllowNullJoinerOnLeftJoinWithGlobalTable() { testStream.leftJoin(builder.globalTable("global", stringConsumed), - MockKeyValueMapper.<String, String>SelectValueMapper(), + MockMapper.<String, String>selectValueMapper(), null); } diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java index 0ae95dd9c5a..df8d2923a04 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableAggregateTest.java @@ -35,7 +35,7 @@ import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockInitializer; -import org.apache.kafka.test.MockKeyValueMapper; +import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.TestUtils; import org.junit.Before; @@ -73,7 +73,7 @@ public void testAggBasic() { final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>(); KTable<String, String> table1 = builder.table(topic1, consumed); - KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(), + KTable<String, String> table2 = table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper(), stringSerialzied ).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, @@ -121,7 +121,7 @@ public void testAggCoalesced() { final MockProcessorSupplier<String, String> proc = new MockProcessorSupplier<>(); KTable<String, String> table1 = builder.table(topic1, consumed); - KTable<String, String> table2 = table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper(), + KTable<String, String> table2 = table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper(), stringSerialzied ).aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, @@ -235,7 +235,7 @@ public void testCount() { final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>(); builder.table(input, consumed) - .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied) + .groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), stringSerialzied) .count("count") .toStream() .process(proc); @@ -250,7 +250,7 @@ public void testCountWithInternalStore() { final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>(); builder.table(input, consumed) - .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied) + .groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), stringSerialzied) .count() .toStream() .process(proc); @@ -265,7 +265,7 @@ public void testCountCoalesced() { final MockProcessorSupplier<String, Long> proc = new MockProcessorSupplier<>(); builder.table(input, consumed) - .groupBy(MockKeyValueMapper.<String, String>SelectValueKeyValueMapper(), stringSerialzied) + .groupBy(MockMapper.<String, String>selectValueKeyValueMapper(), stringSerialzied) .count("count") .toStream() .process(proc); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java index 7986277a7f7..d70d8b7d59b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableFilterTest.java @@ -28,7 +28,7 @@ import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockReducer; -import org.apache.kafka.test.MockKeyValueMapper; +import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.TestUtils; import org.junit.Before; import org.junit.Rule; @@ -451,7 +451,7 @@ public void testSkipNullOnMaterialization() { public boolean test(String key, String value) { return value.equalsIgnoreCase("accept"); } - }).groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper()) + }).groupBy(MockMapper.<String, String>noOpKeyValueMapper()) .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result"); doTestSkipNullOnMaterialization(builder, table1, table2, topic1); @@ -473,7 +473,7 @@ public void testQueryableSkipNullOnMaterialization() { public boolean test(String key, String value) { return value.equalsIgnoreCase("accept"); } - }, "anyStoreNameFilter").groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper()) + }, "anyStoreNameFilter").groupBy(MockMapper.<String, String>noOpKeyValueMapper()) .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result"); doTestSkipNullOnMaterialization(builder, table1, table2, topic1); diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java index 9d918e27b7c..9539b45fe74 100644 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KTableImplTest.java @@ -35,7 +35,7 @@ import org.apache.kafka.test.KStreamTestDriver; import org.apache.kafka.test.MockAggregator; import org.apache.kafka.test.MockInitializer; -import org.apache.kafka.test.MockKeyValueMapper; +import org.apache.kafka.test.MockMapper; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockReducer; import org.apache.kafka.test.MockValueJoiner; @@ -341,11 +341,11 @@ public void testRepartition() throws NoSuchFieldException, IllegalAccessExceptio .withValueSerde(stringSerde) ); - table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper()) + table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper()) .aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, MockAggregator.TOSTRING_REMOVER, "mock-result1"); - table1.groupBy(MockKeyValueMapper.<String, String>NoOpKeyValueMapper()) + table1.groupBy(MockMapper.<String, String>noOpKeyValueMapper()) .reduce(MockReducer.STRING_ADDER, MockReducer.STRING_REMOVER, "mock-result2"); driver.setUp(builder, stateDir, stringSerde, stringSerde); diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java index e68f86bc15c..a39e545f513 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -448,6 +448,11 @@ public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores builder.connectProcessorAndStateStores(null, "store"); } + @Test(expected = NullPointerException.class) + public void shouldNotAllowNullStateStoreNameWhenConnectingProcessorAndStateStores() throws Exception { + builder.connectProcessorAndStateStores("processor", new String[]{null}); + } + @Test(expected = NullPointerException.class) public void shouldNotAddNullInternalTopic() throws Exception { builder.addInternalTopic(null); diff --git a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/MockMapper.java similarity index 76% rename from streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java rename to streams/src/test/java/org/apache/kafka/test/MockMapper.java index 2ad24d7592e..fec9522de82 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockKeyValueMapper.java +++ b/streams/src/test/java/org/apache/kafka/test/MockMapper.java @@ -18,10 +18,9 @@ import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.ValueMapper; -public class MockKeyValueMapper { - - +public class MockMapper { private static class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> { @Override @@ -51,20 +50,31 @@ public K apply(K key, V value) { } } - public static <K, V> KeyValueMapper<K, V, K> SelectKeyKeyValueMapper() { + private static class NoOpValueMapper<V> implements ValueMapper<V, V> { + @Override + public V apply(final V value) { + return value; + } + } + + public static <K, V> KeyValueMapper<K, V, K> selectKeyKeyValueMapper() { return new SelectKeyMapper<>(); } - public static <K, V> KeyValueMapper<K, V, KeyValue<K, V>> NoOpKeyValueMapper() { + public static <K, V> KeyValueMapper<K, V, KeyValue<K, V>> noOpKeyValueMapper() { return new NoOpKeyValueMapper<>(); } - public static <K, V> KeyValueMapper<K, V, KeyValue<V, V>> SelectValueKeyValueMapper() { + public static <K, V> KeyValueMapper<K, V, KeyValue<V, V>> selectValueKeyValueMapper() { return new SelectValueKeyValueMapper<>(); } - public static <K, V> KeyValueMapper<K, V, V> SelectValueMapper() { + public static <K, V> KeyValueMapper<K, V, V> selectValueMapper() { return new SelectValueMapper<>(); } + + public static <V> ValueMapper<V, V> noOpValueMapper() { + return new NoOpValueMapper<>(); + } } \ No newline at end of file diff --git a/streams/src/test/java/org/apache/kafka/test/MockPredicate.java b/streams/src/test/java/org/apache/kafka/test/MockPredicate.java new file mode 100644 index 00000000000..9d59bab3d9b --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/test/MockPredicate.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.test; + +import org.apache.kafka.streams.kstream.Predicate; + +public class MockPredicate { + + private static class AllGoodPredicate<K, V> implements Predicate<K, V> { + @Override + public boolean test(final K key, final V value) { + return true; + } + } + + public static <K, V> Predicate<K, V> allGoodPredicate() { + return new AllGoodPredicate<>(); + } +} ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Non-aggregation KTable generation operator does not construct value getter > correctly > ------------------------------------------------------------------------------------ > > Key: KAFKA-6398 > URL: https://issues.apache.org/jira/browse/KAFKA-6398 > Project: Kafka > Issue Type: Bug > Components: streams > Affects Versions: 0.11.0.1, 1.0.0 > Reporter: Matthias J. Sax > Assignee: Guozhang Wang > Priority: Critical > Labels: bug > > For any operator that generates a KTable, its {{valueGetterSupplier}} has > three code path: > 1. If the operator is a KTable source operator, using its materialized state > store for value getter (note that currently we always materialize on KTable > source). > 2. If the operator is an aggregation operator, then its generated KTable > should always be materialized so we just use its materialized state store. > 3. Otherwise, we treat the value getter in a per-operator basis. > For 3) above, what we SHOULD do is that, if the generated KTable is > materialized, the value getter would just rely on its materialized state > store to get the value; otherwise we just rely on the operator itself to > define which parent's value getter to inherit and what computational logic to > apply on-the-fly to get the value. For example, for {{KTable#filter()}} where > the {{Materialized}} is not specified, in {{KTableFilterValueGetter}} we just > get from parent's value getter and then apply the filter on the fly; and in > addition we should let the future operators to be able to access its parent's > materialized state store via {{connectProcessorAndStateStore}}. > However, current code does not do this correctly: it 1) does not check if the > result KTable is materialized or not, but always try to use its parent's > value getter, and 2) it does not try to connect its parent's materialized > store to the future operator. As a result, these operators such as > {{KTable#filter}}, {{KTable#mapValues}}, and {{KTable#join(KTable)}} would > result in TopologyException when building. The following is an example: > ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------ > Using a non-materialized KTable in a stream-table join fails: > {noformat} > final KTable filteredKTable = builder.table("table-topic").filter(...); > builder.stream("stream-topic").join(filteredKTable,...); > {noformat} > fails with > {noformat} > org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology > building: StateStore null is not added yet. > at > org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStore(TopologyBuilder.java:1021) > at > org.apache.kafka.streams.processor.TopologyBuilder.connectProcessorAndStateStores(TopologyBuilder.java:949) > at > org.apache.kafka.streams.kstream.internals.KStreamImpl.doStreamTableJoin(KStreamImpl.java:621) > at > org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:577) > at > org.apache.kafka.streams.kstream.internals.KStreamImpl.join(KStreamImpl.java:563) > {noformat} > Adding a store name is not sufficient as workaround but fails differently: > {noformat} > final KTable filteredKTable = builder.table("table-topic").filter(..., > "STORE-NAME"); > builder.stream("stream-topic").join(filteredKTable,...); > {noformat} > error: > {noformat} > org.apache.kafka.streams.errors.StreamsException: failed to initialize > processor KSTREAM-JOIN-0000000005 > at > org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:113) > at > org.apache.kafka.streams.processor.internals.StreamTask.initTopology(StreamTask.java:339) > at > org.apache.kafka.streams.processor.internals.StreamTask.initialize(StreamTask.java:153) > Caused by: org.apache.kafka.streams.errors.TopologyBuilderException: Invalid > topology building: Processor KSTREAM-JOIN-0000000005 has no access to > StateStore KTABLE-SOURCE-STATE-STORE-0000000000 > at > org.apache.kafka.streams.processor.internals.ProcessorContextImpl.getStateStore(ProcessorContextImpl.java:69) > at > org.apache.kafka.streams.kstream.internals.KTableSourceValueGetterSupplier$KTableSourceValueGetter.init(KTableSourceValueGetterSupplier.java:45) > at > org.apache.kafka.streams.kstream.internals.KTableFilter$KTableFilterValueGetter.init(KTableFilter.java:121) > at > org.apache.kafka.streams.kstream.internals.KStreamKTableJoinProcessor.init(KStreamKTableJoinProcessor.java:44) > at > org.apache.kafka.streams.processor.internals.ProcessorNode$2.run(ProcessorNode.java:53) > at > org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:201) > at > org.apache.kafka.streams.processor.internals.ProcessorNode.init(ProcessorNode.java:111) > {noformat} > One can workaround by piping the result through a topic: > {noformat} > final KTable filteredKTable = > builder.table("table-topic").filter(...).through("TOPIC");; > builder.stream("stream-topic").join(filteredKTable,...); > {noformat} > ------------------------------------------------------------------------------------------------------------ > Note that there is another minor orthogonal issue of {{KTable#filter}} itself > that it does not include its parent's queryable store name when itself is not > materialized (see {{KTable#mapValues}} for reference). -- This message was sent by Atlassian JIRA (v6.4.14#64029)