http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java index e526da4..467f8b8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/KafkaStreamsTest.java @@ -81,7 +81,7 @@ public class KafkaStreamsTest { final KStreamBuilder builder = new KStreamBuilder(); final KafkaStreams streams = new KafkaStreams(builder, props); - StateListenerStub stateListener = new StateListenerStub(); + final StateListenerStub stateListener = new StateListenerStub(); streams.setStateListener(stateListener); Assert.assertEquals(streams.state(), KafkaStreams.State.CREATED); Assert.assertEquals(stateListener.numChanges, 0); @@ -102,7 +102,7 @@ public class KafkaStreamsTest { final KStreamBuilder builder = new KStreamBuilder(); final KafkaStreams streams = new KafkaStreams(builder, props); - StateListenerStub stateListener = new StateListenerStub(); + final StateListenerStub stateListener = new StateListenerStub(); streams.setStateListener(stateListener); streams.close(); Assert.assertEquals(streams.state(), KafkaStreams.State.NOT_RUNNING); @@ -161,7 +161,7 @@ public class KafkaStreamsTest { final java.lang.reflect.Field globalThreadField = streams.getClass().getDeclaredField("globalStreamThread"); globalThreadField.setAccessible(true); - GlobalStreamThread globalStreamThread = (GlobalStreamThread) globalThreadField.get(streams); + final GlobalStreamThread globalStreamThread = (GlobalStreamThread) globalThreadField.get(streams); assertEquals(globalStreamThread, null); } @@ -269,8 +269,7 @@ public class KafkaStreamsTest { props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "illegalConfig"); final KStreamBuilder builder = new KStreamBuilder(); - final KafkaStreams streams = new KafkaStreams(builder, props); - + new KafkaStreams(builder, props); } @Test @@ -285,8 +284,7 @@ public class KafkaStreamsTest { props.setProperty(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, Sensor.RecordingLevel.DEBUG.toString()); final KStreamBuilder builder2 = new KStreamBuilder(); - final KafkaStreams streams2 = new KafkaStreams(builder2, props); - + new KafkaStreams(builder2, props); } @Test(expected = IllegalStateException.class) @@ -337,7 +335,7 @@ public class KafkaStreamsTest { while (keepRunning.get()) { Thread.sleep(10); } - } catch (InterruptedException e) { + } catch (final InterruptedException e) { // no-op } } @@ -415,29 +413,28 @@ public class KafkaStreamsTest { @Test public void testToString() { streams.start(); - String streamString = streams.toString(); + final String streamString = streams.toString(); streams.close(); - String appId = streamString.split("\\n")[1].split(":")[1].trim(); + final String appId = streamString.split("\\n")[1].split(":")[1].trim(); Assert.assertNotEquals("streamString should not be empty", "", streamString); Assert.assertNotNull("streamString should not be null", streamString); Assert.assertNotEquals("streamString contains non-empty appId", "", appId); Assert.assertNotNull("streamString contains non-null appId", appId); } - public static class StateListenerStub implements KafkaStreams.StateListener { - public int numChanges = 0; - public KafkaStreams.State oldState; - public KafkaStreams.State newState; + int numChanges = 0; + KafkaStreams.State oldState; + KafkaStreams.State newState; public Map<KafkaStreams.State, Long> mapStates = new HashMap<>(); @Override public void onChange(final KafkaStreams.State newState, final KafkaStreams.State oldState) { - long prevCount = this.mapStates.containsKey(newState) ? this.mapStates.get(newState) : 0; - this.numChanges++; + final long prevCount = mapStates.containsKey(newState) ? mapStates.get(newState) : 0; + numChanges++; this.oldState = oldState; this.newState = newState; - this.mapStates.put(newState, prevCount + 1); + mapStates.put(newState, prevCount + 1); } } }
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java index 0b5c5e9..ad70112 100644 --- a/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java @@ -36,12 +36,13 @@ import org.apache.kafka.streams.processor.ProcessorSupplier; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.StreamTask; import org.apache.kafka.streams.processor.internals.StreamThread; import org.apache.kafka.streams.processor.internals.StreamsMetadataState; +import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateStoreSupplier; -import org.apache.kafka.test.IntegrationTest; import org.apache.kafka.test.StreamsTestUtils; import org.apache.kafka.test.TestCondition; import org.apache.kafka.test.TestUtils; @@ -152,9 +153,15 @@ public class RegexSourceIntegrationTest { final StreamThread[] streamThreads = (StreamThread[]) streamThreadsField.get(streams); final StreamThread originalThread = streamThreads[0]; - final TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig, + final TestStreamThread testStreamThread = new TestStreamThread( + builder.internalTopologyBuilder, + streamsConfig, new DefaultKafkaClientSupplier(), - originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), Time.SYSTEM); + originalThread.applicationId, + originalThread.clientId, + originalThread.processId, + new Metrics(), + Time.SYSTEM); final TestCondition oneTopicAdded = new TestCondition() { @Override @@ -206,9 +213,15 @@ public class RegexSourceIntegrationTest { final StreamThread[] streamThreads = (StreamThread[]) streamThreadsField.get(streams); final StreamThread originalThread = streamThreads[0]; - final TestStreamThread testStreamThread = new TestStreamThread(builder, streamsConfig, + final TestStreamThread testStreamThread = new TestStreamThread( + builder.internalTopologyBuilder, + streamsConfig, new DefaultKafkaClientSupplier(), - originalThread.applicationId, originalThread.clientId, originalThread.processId, new Metrics(), Time.SYSTEM); + originalThread.applicationId, + originalThread.clientId, + originalThread.processId, + new Metrics(), + Time.SYSTEM); streamThreads[0] = testStreamThread; @@ -347,9 +360,15 @@ public class RegexSourceIntegrationTest { final StreamThread[] leaderStreamThreads = (StreamThread[]) leaderStreamThreadsField.get(partitionedStreamsLeader); final StreamThread originalLeaderThread = leaderStreamThreads[0]; - final TestStreamThread leaderTestStreamThread = new TestStreamThread(builderLeader, streamsConfig, - new DefaultKafkaClientSupplier(), - originalLeaderThread.applicationId, originalLeaderThread.clientId, originalLeaderThread.processId, new Metrics(), Time.SYSTEM); + final TestStreamThread leaderTestStreamThread = new TestStreamThread( + builderLeader.internalTopologyBuilder, + streamsConfig, + new DefaultKafkaClientSupplier(), + originalLeaderThread.applicationId, + originalLeaderThread.clientId, + originalLeaderThread.processId, + new Metrics(), + Time.SYSTEM); leaderStreamThreads[0] = leaderTestStreamThread; @@ -367,9 +386,15 @@ public class RegexSourceIntegrationTest { final StreamThread[] followerStreamThreads = (StreamThread[]) followerStreamThreadsField.get(partitionedStreamsFollower); final StreamThread originalFollowerThread = followerStreamThreads[0]; - final TestStreamThread followerTestStreamThread = new TestStreamThread(builderFollower, streamsConfig, - new DefaultKafkaClientSupplier(), - originalFollowerThread.applicationId, originalFollowerThread.clientId, originalFollowerThread.processId, new Metrics(), Time.SYSTEM); + final TestStreamThread followerTestStreamThread = new TestStreamThread( + builderFollower.internalTopologyBuilder, + streamsConfig, + new DefaultKafkaClientSupplier(), + originalFollowerThread.applicationId, + originalFollowerThread.clientId, + originalFollowerThread.processId, + new Metrics(), + Time.SYSTEM); followerStreamThreads[0] = followerTestStreamThread; @@ -438,7 +463,7 @@ public class RegexSourceIntegrationTest { private class TestStreamThread extends StreamThread { public volatile List<String> assignedTopicPartitions = new ArrayList<>(); - public TestStreamThread(final TopologyBuilder builder, final StreamsConfig config, final KafkaClientSupplier clientSupplier, final String applicationId, final String clientId, final UUID processId, final Metrics metrics, final Time time) { + public TestStreamThread(final InternalTopologyBuilder builder, final StreamsConfig config, final KafkaClientSupplier clientSupplier, final String applicationId, final String clientId, final UUID processId, final Metrics metrics, final Time time) { super(builder, config, clientSupplier, applicationId, clientId, processId, metrics, time, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); } http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java index bad193a..a7ddb7b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyBuilderTest.java @@ -50,12 +50,12 @@ import java.util.regex.Pattern; import static org.apache.kafka.common.utils.Utils.mkList; import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.hamcrest.core.IsInstanceOf.instanceOf; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.fail; import static org.junit.Assert.assertThat; -import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; public class TopologyBuilderTest { @@ -611,7 +611,7 @@ public class TopologyBuilderTest { @Test(expected = TopologyBuilderException.class) - public void shouldThroughOnUnassignedStateStoreAccess() { + public void shouldThroughOnUnassignedStateStoreAccess() throws Exception { final String sourceNodeName = "source"; final String goodNodeName = "goodGuy"; final String badNodeName = "badGuy"; @@ -631,7 +631,7 @@ public class TopologyBuilderTest { goodNodeName) .addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName); - final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder); + final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder.internalTopologyBuilder); driver.process("topic", null, null); } catch (final StreamsException e) { final Throwable cause = e.getCause(); http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java index 17c5640..a541eb3 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TopologyTest.java @@ -16,6 +16,8 @@ */ package org.apache.kafka.streams.processor; +import org.apache.kafka.streams.TopologyDescription; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateStoreSupplier; import org.junit.Test; @@ -29,11 +31,11 @@ import java.util.regex.Pattern; import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.MatcherAssert.assertThat; -// TODO (remove this comment) Test name ok, we just use TopologyBuilder for now in this test until Topology gets added +// TODO (remove this comment) Test name ok, we just use InternalTopologyBuilder for now in this test until Topology gets added public class TopologyTest { - // TODO change from TopologyBuilder to Topology - private final TopologyBuilder topology = new TopologyBuilder(); - private final TopologyDescription expectedDescription = new TopologyDescription(); + // TODO change from InternalTopologyBuilder to Topology + private final InternalTopologyBuilder topology = new InternalTopologyBuilder(); + private final InternalTopologyBuilder.TopologyDescription expectedDescription = new InternalTopologyBuilder.TopologyDescription(); @Test public void shouldDescribeEmptyTopology() { @@ -45,7 +47,7 @@ public class TopologyTest { final TopologyDescription.Source expectedSourceNode = addSource("source", "topic"); expectedDescription.addSubtopology( - new TopologyDescription.Subtopology(0, + new InternalTopologyBuilder.Subtopology(0, Collections.<TopologyDescription.Node>singleton(expectedSourceNode))); assertThat(topology.describe(), equalTo(expectedDescription)); @@ -56,7 +58,7 @@ public class TopologyTest { final TopologyDescription.Source expectedSourceNode = addSource("source", "topic1", "topic2", "topic3"); expectedDescription.addSubtopology( - new TopologyDescription.Subtopology(0, + new InternalTopologyBuilder.Subtopology(0, Collections.<TopologyDescription.Node>singleton(expectedSourceNode))); assertThat(topology.describe(), equalTo(expectedDescription)); @@ -67,7 +69,7 @@ public class TopologyTest { final TopologyDescription.Source expectedSourceNode = addSource("source", Pattern.compile("topic[0-9]")); expectedDescription.addSubtopology( - new TopologyDescription.Subtopology(0, + new InternalTopologyBuilder.Subtopology(0, Collections.<TopologyDescription.Node>singleton(expectedSourceNode))); assertThat(topology.describe(), equalTo(expectedDescription)); @@ -77,17 +79,17 @@ public class TopologyTest { public void multipleSourcesShouldHaveDistinctSubtopologies() { final TopologyDescription.Source expectedSourceNode1 = addSource("source1", "topic1"); expectedDescription.addSubtopology( - new TopologyDescription.Subtopology(0, + new InternalTopologyBuilder.Subtopology(0, Collections.<TopologyDescription.Node>singleton(expectedSourceNode1))); final TopologyDescription.Source expectedSourceNode2 = addSource("source2", "topic2"); expectedDescription.addSubtopology( - new TopologyDescription.Subtopology(1, + new InternalTopologyBuilder.Subtopology(1, Collections.<TopologyDescription.Node>singleton(expectedSourceNode2))); final TopologyDescription.Source expectedSourceNode3 = addSource("source3", "topic3"); expectedDescription.addSubtopology( - new TopologyDescription.Subtopology(2, + new InternalTopologyBuilder.Subtopology(2, Collections.<TopologyDescription.Node>singleton(expectedSourceNode3))); assertThat(topology.describe(), equalTo(expectedDescription)); @@ -101,7 +103,7 @@ public class TopologyTest { final Set<TopologyDescription.Node> allNodes = new HashSet<>(); allNodes.add(expectedSourceNode); allNodes.add(expectedProcessorNode); - expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes)); + expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); assertThat(topology.describe(), equalTo(expectedDescription)); } @@ -116,7 +118,7 @@ public class TopologyTest { final Set<TopologyDescription.Node> allNodes = new HashSet<>(); allNodes.add(expectedSourceNode); allNodes.add(expectedProcessorNode); - expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes)); + expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); assertThat(topology.describe(), equalTo(expectedDescription)); } @@ -132,7 +134,7 @@ public class TopologyTest { final Set<TopologyDescription.Node> allNodes = new HashSet<>(); allNodes.add(expectedSourceNode); allNodes.add(expectedProcessorNode); - expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes)); + expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); assertThat(topology.describe(), equalTo(expectedDescription)); } @@ -147,7 +149,7 @@ public class TopologyTest { allNodes.add(expectedSourceNode); allNodes.add(expectedProcessorNode1); allNodes.add(expectedProcessorNode2); - expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes)); + expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); assertThat(topology.describe(), equalTo(expectedDescription)); } @@ -162,7 +164,7 @@ public class TopologyTest { allNodes.add(expectedSourceNode1); allNodes.add(expectedSourceNode2); allNodes.add(expectedProcessorNode); - expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes)); + expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); assertThat(topology.describe(), equalTo(expectedDescription)); } @@ -181,17 +183,17 @@ public class TopologyTest { final Set<TopologyDescription.Node> allNodes1 = new HashSet<>(); allNodes1.add(expectedSourceNode1); allNodes1.add(expectedProcessorNode1); - expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes1)); + expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes1)); final Set<TopologyDescription.Node> allNodes2 = new HashSet<>(); allNodes2.add(expectedSourceNode2); allNodes2.add(expectedProcessorNode2); - expectedDescription.addSubtopology(new TopologyDescription.Subtopology(1, allNodes2)); + expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, allNodes2)); final Set<TopologyDescription.Node> allNodes3 = new HashSet<>(); allNodes3.add(expectedSourceNode3); allNodes3.add(expectedProcessorNode3); - expectedDescription.addSubtopology(new TopologyDescription.Subtopology(2, allNodes3)); + expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, allNodes3)); assertThat(topology.describe(), equalTo(expectedDescription)); } @@ -210,17 +212,17 @@ public class TopologyTest { final Set<TopologyDescription.Node> allNodes1 = new HashSet<>(); allNodes1.add(expectedSourceNode1); allNodes1.add(expectedSinkNode1); - expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes1)); + expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes1)); final Set<TopologyDescription.Node> allNodes2 = new HashSet<>(); allNodes2.add(expectedSourceNode2); allNodes2.add(expectedSinkNode2); - expectedDescription.addSubtopology(new TopologyDescription.Subtopology(1, allNodes2)); + expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(1, allNodes2)); final Set<TopologyDescription.Node> allNodes3 = new HashSet<>(); allNodes3.add(expectedSourceNode3); allNodes3.add(expectedSinkNode3); - expectedDescription.addSubtopology(new TopologyDescription.Subtopology(2, allNodes3)); + expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(2, allNodes3)); assertThat(topology.describe(), equalTo(expectedDescription)); } @@ -251,7 +253,7 @@ public class TopologyTest { allNodes.add(expectedSourceNode3); allNodes.add(expectedProcessorNode3); allNodes.add(expectedSinkNode); - expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes)); + expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); assertThat(topology.describe(), equalTo(expectedDescription)); } @@ -281,7 +283,7 @@ public class TopologyTest { allNodes.add(expectedProcessorNode2); allNodes.add(expectedSourceNode3); allNodes.add(expectedProcessorNode3); - expectedDescription.addSubtopology(new TopologyDescription.Subtopology(0, allNodes)); + expectedDescription.addSubtopology(new InternalTopologyBuilder.Subtopology(0, allNodes)); assertThat(topology.describe(), equalTo(expectedDescription)); } @@ -301,41 +303,41 @@ public class TopologyTest { private TopologyDescription.Source addSource(final String sourceName, final String... sourceTopic) { - topology.addSource(sourceName, sourceTopic); + topology.addSource(null, sourceName, null, null, null, sourceTopic); String allSourceTopics = sourceTopic[0]; for (int i = 1; i < sourceTopic.length; ++i) { allSourceTopics += ", " + sourceTopic[i]; } - return new TopologyDescription.Source(sourceName, allSourceTopics); + return new InternalTopologyBuilder.Source(sourceName, allSourceTopics); } private TopologyDescription.Source addSource(final String sourceName, final Pattern sourcePattern) { - topology.addSource(sourceName, sourcePattern); - return new TopologyDescription.Source(sourceName, sourcePattern.toString()); + topology.addSource(null, sourceName, null, null, null, sourcePattern); + return new InternalTopologyBuilder.Source(sourceName, sourcePattern.toString()); } private TopologyDescription.Processor addProcessor(final String processorName, - final TopologyDescription.AbstractNode... parents) { + final TopologyDescription.Node... parents) { return addProcessorWithNewStore(processorName, new String[0], parents); } private TopologyDescription.Processor addProcessorWithNewStore(final String processorName, final String[] storeNames, - final TopologyDescription.AbstractNode... parents) { + final TopologyDescription.Node... parents) { return addProcessorWithStore(processorName, storeNames, true, parents); } private TopologyDescription.Processor addProcessorWithExistingStore(final String processorName, final String[] storeNames, - final TopologyDescription.AbstractNode... parents) { + final TopologyDescription.Node... parents) { return addProcessorWithStore(processorName, storeNames, false, parents); } private TopologyDescription.Processor addProcessorWithStore(final String processorName, final String[] storeNames, final boolean newStores, - final TopologyDescription.AbstractNode... parents) { + final TopologyDescription.Node... parents) { final String[] parentNames = new String[parents.length]; for (int i = 0; i < parents.length; ++i) { parentNames[i] = parents[i].name(); @@ -350,11 +352,11 @@ public class TopologyTest { topology.connectProcessorAndStateStores(processorName, storeNames); } final TopologyDescription.Processor expectedProcessorNode - = new TopologyDescription.Processor(processorName, new HashSet<>(Arrays.asList(storeNames))); + = new InternalTopologyBuilder.Processor(processorName, new HashSet<>(Arrays.asList(storeNames))); - for (final TopologyDescription.AbstractNode parent : parents) { - parent.addSuccessor(expectedProcessorNode); - expectedProcessorNode.addPredecessor(parent); + for (final TopologyDescription.Node parent : parents) { + ((InternalTopologyBuilder.AbstractNode) parent).addSuccessor(expectedProcessorNode); + ((InternalTopologyBuilder.AbstractNode) expectedProcessorNode).addPredecessor(parent); } return expectedProcessorNode; @@ -362,19 +364,19 @@ public class TopologyTest { private TopologyDescription.Sink addSink(final String sinkName, final String sinkTopic, - final TopologyDescription.AbstractNode... parents) { + final TopologyDescription.Node... parents) { final String[] parentNames = new String[parents.length]; for (int i = 0; i < parents.length; ++i) { parentNames[i] = parents[i].name(); } - topology.addSink(sinkName, sinkTopic, parentNames); + topology.addSink(sinkName, sinkTopic, null, null, null, parentNames); final TopologyDescription.Sink expectedSinkNode - = new TopologyDescription.Sink(sinkName, sinkTopic); + = new InternalTopologyBuilder.Sink(sinkName, sinkTopic); - for (final TopologyDescription.AbstractNode parent : parents) { - parent.addSuccessor(expectedSinkNode); - expectedSinkNode.addPredecessor(parent); + for (final TopologyDescription.Node parent : parents) { + ((InternalTopologyBuilder.AbstractNode) parent).addSuccessor(expectedSinkNode); + ((InternalTopologyBuilder.AbstractNode) expectedSinkNode).addPredecessor(parent); } return expectedSinkNode; @@ -389,11 +391,12 @@ public class TopologyTest { sourceName, null, null, + null, globalTopicName, processorName, new MockProcessorSupplier()); - final TopologyDescription.GlobalStore expectedGlobalStore = new TopologyDescription.GlobalStore( + final TopologyDescription.GlobalStore expectedGlobalStore = new InternalTopologyBuilder.GlobalStore( sourceName, processorName, globalStoreName, http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java new file mode 100644 index 0000000..b98b756 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilderTest.java @@ -0,0 +1,709 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.common.serialization.Serdes; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.errors.TopologyBuilderException; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.StateStoreSupplier; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.SubscriptionUpdates; +import org.apache.kafka.streams.state.Stores; +import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; +import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockStateStoreSupplier; +import org.apache.kafka.test.MockTimestampExtractor; +import org.apache.kafka.test.ProcessorTopologyTestDriver; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.regex.Pattern; + +import static org.apache.kafka.common.utils.Utils.mkList; +import static org.apache.kafka.common.utils.Utils.mkSet; +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public class InternalTopologyBuilderTest { + + private final InternalTopologyBuilder builder = new InternalTopologyBuilder(); + private final Serde<String> stringSerde = Serdes.String(); + + @Test + public void shouldAddSourceWithOffsetReset() { + final String earliestTopic = "earliestTopic"; + final String latestTopic = "latestTopic"; + + builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, null, null, earliestTopic); + builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", null, null, null, latestTopic); + + assertTrue(builder.earliestResetTopicsPattern().matcher(earliestTopic).matches()); + assertTrue(builder.latestResetTopicsPattern().matcher(latestTopic).matches()); + } + + @Test + public void shouldAddSourcePatternWithOffsetReset() { + final String earliestTopicPattern = "earliest.*Topic"; + final String latestTopicPattern = "latest.*Topic"; + + builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, null, null, Pattern.compile(earliestTopicPattern)); + builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source2", null, null, null, Pattern.compile(latestTopicPattern)); + + assertTrue(builder.earliestResetTopicsPattern().matcher("earliestTestTopic").matches()); + assertTrue(builder.latestResetTopicsPattern().matcher("latestTestTopic").matches()); + } + + @Test + public void shouldAddSourceWithoutOffsetReset() { + final Pattern expectedPattern = Pattern.compile("test-topic"); + + builder.addSource(null, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "test-topic"); + + assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern()); + assertEquals(builder.earliestResetTopicsPattern().pattern(), ""); + assertEquals(builder.latestResetTopicsPattern().pattern(), ""); + } + + @Test + public void shouldAddPatternSourceWithoutOffsetReset() { + final Pattern expectedPattern = Pattern.compile("test-.*"); + + builder.addSource(null, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), Pattern.compile("test-.*")); + + assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern()); + assertEquals(builder.earliestResetTopicsPattern().pattern(), ""); + assertEquals(builder.latestResetTopicsPattern().pattern(), ""); + } + + @Test(expected = TopologyBuilderException.class) + public void shouldNotAllowOffsetResetSourceWithoutTopics() { + builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer()); + } + + @Test + public void shouldNotAllowOffsetResetSourceWithDuplicateSourceName() { + builder.addSource(TopologyBuilder.AutoOffsetReset.EARLIEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-1"); + try { + builder.addSource(TopologyBuilder.AutoOffsetReset.LATEST, "source", null, stringSerde.deserializer(), stringSerde.deserializer(), "topic-2"); + fail("Should throw TopologyBuilderException for duplicate source name"); + } catch (final TopologyBuilderException expected) { /* ok */ } + } + + @Test + public void testAddSourceWithSameName() { + builder.addSource(null, "source", null, null, null, "topic-1"); + try { + builder.addSource(null, "source", null, null, null, "topic-2"); + fail("Should throw TopologyBuilderException with source name conflict"); + } catch (final TopologyBuilderException expected) { /* ok */ } + } + + @Test + public void testAddSourceWithSameTopic() { + builder.addSource(null, "source", null, null, null, "topic-1"); + try { + builder.addSource(null, "source-2", null, null, null, "topic-1"); + fail("Should throw TopologyBuilderException with topic conflict"); + } catch (final TopologyBuilderException expected) { /* ok */ } + } + + @Test + public void testAddProcessorWithSameName() { + builder.addSource(null, "source", null, null, null, "topic-1"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source"); + try { + builder.addProcessor("processor", new MockProcessorSupplier(), "source"); + fail("Should throw TopologyBuilderException with processor name conflict"); + } catch (final TopologyBuilderException expected) { /* ok */ } + } + + @Test(expected = TopologyBuilderException.class) + public void testAddProcessorWithWrongParent() { + builder.addProcessor("processor", new MockProcessorSupplier(), "source"); + } + + @Test(expected = TopologyBuilderException.class) + public void testAddProcessorWithSelfParent() { + builder.addProcessor("processor", new MockProcessorSupplier(), "processor"); + } + + @Test + public void testAddSinkWithSameName() { + builder.addSource(null, "source", null, null, null, "topic-1"); + builder.addSink("sink", "topic-2", null, null, null, "source"); + try { + builder.addSink("sink", "topic-3", null, null, null, "source"); + fail("Should throw TopologyBuilderException with sink name conflict"); + } catch (final TopologyBuilderException expected) { /* ok */ } + } + + @Test(expected = TopologyBuilderException.class) + public void testAddSinkWithWrongParent() { + builder.addSink("sink", "topic-2", null, null, null, "source"); + } + + @Test(expected = TopologyBuilderException.class) + public void testAddSinkWithSelfParent() { + builder.addSink("sink", "topic-2", null, null, null, "sink"); + } + + @Test + public void testAddSinkConnectedWithParent() { + builder.addSource(null, "source", null, null, null, "source-topic"); + builder.addSink("sink", "dest-topic", null, null, null, "source"); + + final Map<Integer, Set<String>> nodeGroups = builder.nodeGroups(); + final Set<String> nodeGroup = nodeGroups.get(0); + + assertTrue(nodeGroup.contains("sink")); + assertTrue(nodeGroup.contains("source")); + } + + @Test + public void testAddSinkConnectedWithMultipleParent() { + builder.addSource(null, "source", null, null, null, "source-topic"); + builder.addSource(null, "sourceII", null, null, null, "source-topicII"); + builder.addSink("sink", "dest-topic", null, null, null, "source", "sourceII"); + + final Map<Integer, Set<String>> nodeGroups = builder.nodeGroups(); + final Set<String> nodeGroup = nodeGroups.get(0); + + assertTrue(nodeGroup.contains("sink")); + assertTrue(nodeGroup.contains("source")); + assertTrue(nodeGroup.contains("sourceII")); + } + + @Test + public void testSourceTopics() { + builder.setApplicationId("X"); + builder.addSource(null, "source-1", null, null, null, "topic-1"); + builder.addSource(null, "source-2", null, null, null, "topic-2"); + builder.addSource(null, "source-3", null, null, null, "topic-3"); + builder.addInternalTopic("topic-3"); + + final Pattern expectedPattern = Pattern.compile("X-topic-3|topic-1|topic-2"); + + assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern()); + } + + @Test + public void testPatternSourceTopic() { + final Pattern expectedPattern = Pattern.compile("topic-\\d"); + builder.addSource(null, "source-1", null, null, null, expectedPattern); + assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern()); + } + + @Test + public void testAddMoreThanOnePatternSourceNode() { + final Pattern expectedPattern = Pattern.compile("topics[A-Z]|.*-\\d"); + builder.addSource(null, "source-1", null, null, null, Pattern.compile("topics[A-Z]")); + builder.addSource(null, "source-2", null, null, null, Pattern.compile(".*-\\d")); + assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern()); + } + + @Test + public void testSubscribeTopicNameAndPattern() { + final Pattern expectedPattern = Pattern.compile("topic-bar|topic-foo|.*-\\d"); + builder.addSource(null, "source-1", null, null, null, "topic-foo", "topic-bar"); + builder.addSource(null, "source-2", null, null, null, Pattern.compile(".*-\\d")); + assertEquals(expectedPattern.pattern(), builder.sourceTopicPattern().pattern()); + } + + @Test + public void testPatternMatchesAlreadyProvidedTopicSource() { + builder.addSource(null, "source-1", null, null, null, "foo"); + try { + builder.addSource(null, "source-2", null, null, null, Pattern.compile("f.*")); + fail("Should throw TopologyBuilderException with topic name/pattern conflict"); + } catch (final TopologyBuilderException expected) { /* ok */ } + } + + @Test + public void testNamedTopicMatchesAlreadyProvidedPattern() { + builder.addSource(null, "source-1", null, null, null, Pattern.compile("f.*")); + try { + builder.addSource(null, "source-2", null, null, null, "foo"); + fail("Should throw TopologyBuilderException with topic name/pattern conflict"); + } catch (final TopologyBuilderException expected) { /* ok */ } + } + + @Test(expected = TopologyBuilderException.class) + public void testAddStateStoreWithNonExistingProcessor() { + builder.addStateStore(new MockStateStoreSupplier("store", false), "no-such-processsor"); + } + + @Test + public void testAddStateStoreWithSource() { + builder.addSource(null, "source-1", null, null, null, "topic-1"); + try { + builder.addStateStore(new MockStateStoreSupplier("store", false), "source-1"); + fail("Should throw TopologyBuilderException with store cannot be added to source"); + } catch (final TopologyBuilderException expected) { /* ok */ } + } + + @Test + public void testAddStateStoreWithSink() { + builder.addSink("sink-1", "topic-1", null, null, null); + try { + builder.addStateStore(new MockStateStoreSupplier("store", false), "sink-1"); + fail("Should throw TopologyBuilderException with store cannot be added to sink"); + } catch (final TopologyBuilderException expected) { /* ok */ } + } + + @Test + public void testAddStateStoreWithDuplicates() { + builder.addStateStore(new MockStateStoreSupplier("store", false)); + try { + builder.addStateStore(new MockStateStoreSupplier("store", false)); + fail("Should throw TopologyBuilderException with store name conflict"); + } catch (final TopologyBuilderException expected) { /* ok */ } + } + + @Test + public void testAddStateStore() { + final StateStoreSupplier supplier = new MockStateStoreSupplier("store-1", false); + builder.addStateStore(supplier); + builder.setApplicationId("X"); + builder.addSource(null, "source-1", null, null, null, "topic-1"); + builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1"); + + assertEquals(0, builder.build(null).stateStores().size()); + + builder.connectProcessorAndStateStores("processor-1", "store-1"); + + final List<StateStore> suppliers = builder.build(null).stateStores(); + assertEquals(1, suppliers.size()); + assertEquals(supplier.name(), suppliers.get(0).name()); + } + + @Test + public void testTopicGroups() { + builder.setApplicationId("X"); + builder.addInternalTopic("topic-1x"); + builder.addSource(null, "source-1", null, null, null, "topic-1", "topic-1x"); + builder.addSource(null, "source-2", null, null, null, "topic-2"); + builder.addSource(null, "source-3", null, null, null, "topic-3"); + builder.addSource(null, "source-4", null, null, null, "topic-4"); + builder.addSource(null, "source-5", null, null, null, "topic-5"); + + builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1"); + + builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1"); + builder.copartitionSources(mkList("source-1", "source-2")); + + builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4"); + + final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups(); + + final Map<Integer, TopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>(); + expectedTopicGroups.put(0, new TopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-1", "X-topic-1x", "topic-2"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap())); + expectedTopicGroups.put(1, new TopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap())); + expectedTopicGroups.put(2, new TopologyBuilder.TopicsInfo(Collections.<String>emptySet(), mkSet("topic-5"), Collections.<String, InternalTopicConfig>emptyMap(), Collections.<String, InternalTopicConfig>emptyMap())); + + assertEquals(3, topicGroups.size()); + assertEquals(expectedTopicGroups, topicGroups); + + final Collection<Set<String>> copartitionGroups = builder.copartitionGroups(); + + assertEquals(mkSet(mkSet("topic-1", "X-topic-1x", "topic-2")), new HashSet<>(copartitionGroups)); + } + + @Test + public void testTopicGroupsByStateStore() { + builder.setApplicationId("X"); + builder.addSource(null, "source-1", null, null, null, "topic-1", "topic-1x"); + builder.addSource(null, "source-2", null, null, null, "topic-2"); + builder.addSource(null, "source-3", null, null, null, "topic-3"); + builder.addSource(null, "source-4", null, null, null, "topic-4"); + builder.addSource(null, "source-5", null, null, null, "topic-5"); + + builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1"); + builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2"); + builder.addStateStore(new MockStateStoreSupplier("store-1", false), "processor-1", "processor-2"); + + builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3"); + builder.addProcessor("processor-4", new MockProcessorSupplier(), "source-4"); + builder.addStateStore(new MockStateStoreSupplier("store-2", false), "processor-3", "processor-4"); + + builder.addProcessor("processor-5", new MockProcessorSupplier(), "source-5"); + final StateStoreSupplier supplier = new MockStateStoreSupplier("store-3", false); + builder.addStateStore(supplier); + builder.connectProcessorAndStateStores("processor-5", "store-3"); + + final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups(); + + final Map<Integer, TopologyBuilder.TopicsInfo> expectedTopicGroups = new HashMap<>(); + final String store1 = ProcessorStateManager.storeChangelogTopic("X", "store-1"); + final String store2 = ProcessorStateManager.storeChangelogTopic("X", "store-2"); + final String store3 = ProcessorStateManager.storeChangelogTopic("X", "store-3"); + expectedTopicGroups.put(0, new TopologyBuilder.TopicsInfo( + Collections.<String>emptySet(), mkSet("topic-1", "topic-1x", "topic-2"), + Collections.<String, InternalTopicConfig>emptyMap(), + Collections.singletonMap( + store1, + new InternalTopicConfig( + store1, + Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), + Collections.<String, String>emptyMap())))); + expectedTopicGroups.put(1, new TopologyBuilder.TopicsInfo( + Collections.<String>emptySet(), mkSet("topic-3", "topic-4"), + Collections.<String, InternalTopicConfig>emptyMap(), + Collections.singletonMap( + store2, + new InternalTopicConfig( + store2, + Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), + Collections.<String, String>emptyMap())))); + expectedTopicGroups.put(2, new TopologyBuilder.TopicsInfo( + Collections.<String>emptySet(), mkSet("topic-5"), + Collections.<String, InternalTopicConfig>emptyMap(), + Collections.singletonMap(store3, + new InternalTopicConfig( + store3, + Collections.singleton(InternalTopicConfig.CleanupPolicy.compact), + Collections.<String, String>emptyMap())))); + + assertEquals(3, topicGroups.size()); + assertEquals(expectedTopicGroups, topicGroups); + } + + @Test + public void testBuild() { + builder.addSource(null, "source-1", null, null, null, "topic-1", "topic-1x"); + builder.addSource(null, "source-2", null, null, null, "topic-2"); + builder.addSource(null, "source-3", null, null, null, "topic-3"); + builder.addSource(null, "source-4", null, null, null, "topic-4"); + builder.addSource(null, "source-5", null, null, null, "topic-5"); + + builder.addProcessor("processor-1", new MockProcessorSupplier(), "source-1"); + builder.addProcessor("processor-2", new MockProcessorSupplier(), "source-2", "processor-1"); + builder.addProcessor("processor-3", new MockProcessorSupplier(), "source-3", "source-4"); + + builder.setApplicationId("X"); + final ProcessorTopology topology0 = builder.build(0); + final ProcessorTopology topology1 = builder.build(1); + final ProcessorTopology topology2 = builder.build(2); + + assertEquals(mkSet("source-1", "source-2", "processor-1", "processor-2"), nodeNames(topology0.processors())); + assertEquals(mkSet("source-3", "source-4", "processor-3"), nodeNames(topology1.processors())); + assertEquals(mkSet("source-5"), nodeNames(topology2.processors())); + } + + @Test(expected = NullPointerException.class) + public void shouldNotAllowNullNameWhenAddingSink() throws Exception { + builder.addSink(null, "topic", null, null, null); + } + + @Test(expected = NullPointerException.class) + public void shouldNotAllowNullTopicWhenAddingSink() throws Exception { + builder.addSink("name", null, null, null, null); + } + + @Test(expected = NullPointerException.class) + public void shouldNotAllowNullNameWhenAddingProcessor() throws Exception { + builder.addProcessor(null, new ProcessorSupplier() { + @Override + public Processor get() { + return null; + } + }); + } + + @Test(expected = NullPointerException.class) + public void shouldNotAllowNullProcessorSupplier() throws Exception { + builder.addProcessor("name", null); + } + + @Test(expected = NullPointerException.class) + public void shouldNotAllowNullNameWhenAddingSource() throws Exception { + builder.addSource(null, null, null, null, null, Pattern.compile(".*")); + } + + @Test(expected = NullPointerException.class) + public void shouldNotAllowNullProcessorNameWhenConnectingProcessorAndStateStores() throws Exception { + builder.connectProcessorAndStateStores(null, "store"); + } + + @Test(expected = NullPointerException.class) + public void shouldNotAddNullInternalTopic() throws Exception { + builder.addInternalTopic(null); + } + + @Test(expected = NullPointerException.class) + public void shouldNotSetApplicationIdToNull() throws Exception { + builder.setApplicationId(null); + } + + @Test(expected = NullPointerException.class) + public void shouldNotAddNullStateStoreSupplier() throws Exception { + builder.addStateStore(null); + } + + private Set<String> nodeNames(final Collection<ProcessorNode> nodes) { + final Set<String> nodeNames = new HashSet<>(); + for (final ProcessorNode node : nodes) { + nodeNames.add(node.name()); + } + return nodeNames; + } + + @Test + public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsInternal() throws Exception { + builder.addSource(null, "source", null, null, null, "topic"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source"); + builder.addStateStore(new MockStateStoreSupplier("store", false), "processor"); + final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics(); + assertEquals(1, stateStoreNameToSourceTopic.size()); + assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store")); + } + + @Test + public void shouldAssociateStateStoreNameWhenStateStoreSupplierIsExternal() throws Exception { + builder.addSource(null, "source", null, null, null, "topic"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source"); + builder.addStateStore(new MockStateStoreSupplier("store", false), "processor"); + final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics(); + assertEquals(1, stateStoreNameToSourceTopic.size()); + assertEquals(Collections.singletonList("topic"), stateStoreNameToSourceTopic.get("store")); + } + + @Test + public void shouldCorrectlyMapStateStoreToInternalTopics() throws Exception { + builder.setApplicationId("appId"); + builder.addInternalTopic("internal-topic"); + builder.addSource(null, "source", null, null, null, "internal-topic"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source"); + builder.addStateStore(new MockStateStoreSupplier("store", false), "processor"); + final Map<String, List<String>> stateStoreNameToSourceTopic = builder.stateStoreNameToSourceTopics(); + assertEquals(1, stateStoreNameToSourceTopic.size()); + assertEquals(Collections.singletonList("appId-internal-topic"), stateStoreNameToSourceTopic.get("store")); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAddInternalTopicConfigWithCompactAndDeleteSetForWindowStores() throws Exception { + builder.setApplicationId("appId"); + builder.addSource(null, "source", null, null, null, "topic"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source"); + builder.addStateStore(new RocksDBWindowStoreSupplier("store", 30000, 3, false, null, null, 10000, true, Collections.<String, String>emptyMap(), false), "processor"); + final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups(); + final TopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); + final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-store-changelog"); + final Properties properties = topicConfig.toProperties(0); + final List<String> policies = Arrays.asList(properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP).split(",")); + assertEquals("appId-store-changelog", topicConfig.name()); + assertTrue(policies.contains("compact")); + assertTrue(policies.contains("delete")); + assertEquals(2, policies.size()); + assertEquals("30000", properties.getProperty(InternalTopicManager.RETENTION_MS)); + assertEquals(2, properties.size()); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAddInternalTopicConfigWithCompactForNonWindowStores() throws Exception { + builder.setApplicationId("appId"); + builder.addSource(null, "source", null, null, null, "topic"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source"); + builder.addStateStore(new MockStateStoreSupplier("name", true), "processor"); + final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups(); + final TopologyBuilder.TopicsInfo topicsInfo = topicGroups.values().iterator().next(); + final InternalTopicConfig topicConfig = topicsInfo.stateChangelogTopics.get("appId-name-changelog"); + final Properties properties = topicConfig.toProperties(0); + assertEquals("appId-name-changelog", topicConfig.name()); + assertEquals("compact", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP)); + assertEquals(1, properties.size()); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAddInternalTopicConfigWithCleanupPolicyDeleteForInternalTopics() throws Exception { + builder.setApplicationId("appId"); + builder.addInternalTopic("foo"); + builder.addSource(null, "source", null, null, null, "foo"); + final TopologyBuilder.TopicsInfo topicsInfo = builder.topicGroups().values().iterator().next(); + final InternalTopicConfig topicConfig = topicsInfo.repartitionSourceTopics.get("appId-foo"); + final Properties properties = topicConfig.toProperties(0); + assertEquals("appId-foo", topicConfig.name()); + assertEquals("delete", properties.getProperty(InternalTopicManager.CLEANUP_POLICY_PROP)); + assertEquals(1, properties.size()); + } + + @Test(expected = TopologyBuilderException.class) + public void shouldThroughOnUnassignedStateStoreAccess() { + final String sourceNodeName = "source"; + final String goodNodeName = "goodGuy"; + final String badNodeName = "badGuy"; + + final Properties config = new Properties(); + config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "host:1"); + config.put(StreamsConfig.APPLICATION_ID_CONFIG, "appId"); + final StreamsConfig streamsConfig = new StreamsConfig(config); + + try { + builder.addSource(null, sourceNodeName, null, null, null, "topic"); + builder.addProcessor(goodNodeName, new LocalMockProcessorSupplier(), sourceNodeName); + builder.addStateStore( + Stores.create(LocalMockProcessorSupplier.STORE_NAME).withStringKeys().withStringValues().inMemory().build(), + goodNodeName); + builder.addProcessor(badNodeName, new LocalMockProcessorSupplier(), sourceNodeName); + + final ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(streamsConfig, builder); + driver.process("topic", null, null); + } catch (final StreamsException e) { + final Throwable cause = e.getCause(); + if (cause != null + && cause instanceof TopologyBuilderException + && cause.getMessage().equals("Invalid topology building: Processor " + badNodeName + " has no access to StateStore " + LocalMockProcessorSupplier.STORE_NAME)) { + throw (TopologyBuilderException) cause; + } else { + throw new RuntimeException("Did expect different exception. Did catch:", e); + } + } + } + + private static class LocalMockProcessorSupplier implements ProcessorSupplier { + final static String STORE_NAME = "store"; + + @Override + public Processor get() { + return new Processor() { + @Override + public void init(final ProcessorContext context) { + context.getStateStore(STORE_NAME); + } + + @Override + public void process(final Object key, final Object value) { } + + @Override + public void punctuate(final long timestamp) { } + + @Override + public void close() { + } + }; + } + } + + @SuppressWarnings("unchecked") + @Test + public void shouldSetCorrectSourceNodesWithRegexUpdatedTopics() throws Exception { + builder.addSource(null, "source-1", null, null, null, "topic-foo"); + builder.addSource(null, "source-2", null, null, null, Pattern.compile("topic-[A-C]")); + builder.addSource(null, "source-3", null, null, null, Pattern.compile("topic-\\d")); + + final SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); + final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions"); + updatedTopicsField.setAccessible(true); + + final Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates); + + updatedTopics.add("topic-B"); + updatedTopics.add("topic-3"); + updatedTopics.add("topic-A"); + + builder.updateSubscriptions(subscriptionUpdates, null); + builder.setApplicationId("test-id"); + + final Map<Integer, TopologyBuilder.TopicsInfo> topicGroups = builder.topicGroups(); + assertTrue(topicGroups.get(0).sourceTopics.contains("topic-foo")); + assertTrue(topicGroups.get(1).sourceTopics.contains("topic-A")); + assertTrue(topicGroups.get(1).sourceTopics.contains("topic-B")); + assertTrue(topicGroups.get(2).sourceTopics.contains("topic-3")); + + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAddTimestampExtractorPerSource() throws Exception { + builder.addSource(null, "source", new MockTimestampExtractor(), null, null, "topic"); + final ProcessorTopology processorTopology = builder.build(null); + assertThat(processorTopology.source("topic").getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); + } + + @SuppressWarnings("unchecked") + @Test + public void shouldAddTimestampExtractorWithPatternPerSource() throws Exception { + final Pattern pattern = Pattern.compile("t.*"); + builder.addSource(null, "source", new MockTimestampExtractor(), null, null, pattern); + final ProcessorTopology processorTopology = builder.build(null); + assertThat(processorTopology.source(pattern.pattern()).getTimestampExtractor(), instanceOf(MockTimestampExtractor.class)); + } + + @Test + public void shouldConnectRegexMatchedTopicsToStateStore() throws Exception { + builder.addSource(null, "ingest", null, null, null, Pattern.compile("topic-\\d+")); + builder.addProcessor("my-processor", new MockProcessorSupplier(), "ingest"); + builder.addStateStore(new MockStateStoreSupplier("testStateStore", false), "my-processor"); + + final SubscriptionUpdates subscriptionUpdates = new SubscriptionUpdates(); + final Field updatedTopicsField = subscriptionUpdates.getClass().getDeclaredField("updatedTopicSubscriptions"); + updatedTopicsField.setAccessible(true); + + final Set<String> updatedTopics = (Set<String>) updatedTopicsField.get(subscriptionUpdates); + + updatedTopics.add("topic-2"); + updatedTopics.add("topic-3"); + updatedTopics.add("topic-A"); + + builder.updateSubscriptions(subscriptionUpdates, "test-thread"); + builder.setApplicationId("test-app"); + + final Map<String, List<String>> stateStoreAndTopics = builder.stateStoreNameToSourceTopics(); + final List<String> topics = stateStoreAndTopics.get("testStateStore"); + + assertTrue("Expected to contain two topics", topics.size() == 2); + + assertTrue(topics.contains("topic-2")); + assertTrue(topics.contains("topic-3")); + assertFalse(topics.contains("topic-A")); + } + + @Test(expected = TopologyBuilderException.class) + public void shouldNotAllowToAddGlobalStoreWithSourceNameEqualsProcessorName() { + final String sameNameForSourceAndProcessor = "sameName"; + builder.addGlobalStore( + new MockStateStoreSupplier("anyName", false, false), + sameNameForSourceAndProcessor, + null, + null, + null, + "anyTopicName", + sameNameForSourceAndProcessor, + new MockProcessorSupplier()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 369c47f..fd3afa8 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -121,9 +121,9 @@ public class ProcessorTopologyTest { } @Test - public void testDrivingSimpleTopology() { + public void testDrivingSimpleTopology() throws Exception { int partition = 10; - driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition)); + driver = new ProcessorTopologyTestDriver(config, createSimpleTopology(partition).internalTopologyBuilder); driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition); assertNoOutputRecord(OUTPUT_TOPIC_2); @@ -143,8 +143,8 @@ public class ProcessorTopologyTest { @Test - public void testDrivingMultiplexingTopology() { - driver = new ProcessorTopologyTestDriver(config, createMultiplexingTopology()); + public void testDrivingMultiplexingTopology() throws Exception { + driver = new ProcessorTopologyTestDriver(config, createMultiplexingTopology().internalTopologyBuilder); driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)"); assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)"); @@ -165,8 +165,8 @@ public class ProcessorTopologyTest { } @Test - public void testDrivingMultiplexByNameTopology() { - driver = new ProcessorTopologyTestDriver(config, createMultiplexByNameTopology()); + public void testDrivingMultiplexByNameTopology() throws Exception { + driver = new ProcessorTopologyTestDriver(config, createMultiplexByNameTopology().internalTopologyBuilder); driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1(1)"); assertNextOutputRecord(OUTPUT_TOPIC_2, "key1", "value1(2)"); @@ -187,9 +187,9 @@ public class ProcessorTopologyTest { } @Test - public void testDrivingStatefulTopology() { + public void testDrivingStatefulTopology() throws Exception { String storeName = "entries"; - driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName)); + driver = new ProcessorTopologyTestDriver(config, createStatefulTopology(storeName).internalTopologyBuilder); driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER); @@ -213,7 +213,7 @@ public class ProcessorTopologyTest { final TopologyBuilder topologyBuilder = this.builder .addGlobalStore(storeSupplier, global, STRING_DESERIALIZER, STRING_DESERIALIZER, topic, "processor", define(new StatefulProcessor("my-store"))); - driver = new ProcessorTopologyTestDriver(config, topologyBuilder); + driver = new ProcessorTopologyTestDriver(config, topologyBuilder.internalTopologyBuilder); final KeyValueStore<String, String> globalStore = (KeyValueStore<String, String>) topologyBuilder.globalStateStores().get("my-store"); driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); @@ -222,9 +222,9 @@ public class ProcessorTopologyTest { } @Test - public void testDrivingSimpleMultiSourceTopology() { + public void testDrivingSimpleMultiSourceTopology() throws Exception { int partition = 10; - driver = new ProcessorTopologyTestDriver(config, createSimpleMultiSourceTopology(partition)); + driver = new ProcessorTopologyTestDriver(config, createSimpleMultiSourceTopology(partition).internalTopologyBuilder); driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); assertNextOutputRecord(OUTPUT_TOPIC_1, "key1", "value1", partition); @@ -236,8 +236,8 @@ public class ProcessorTopologyTest { } @Test - public void testDrivingForwardToSourceTopology() { - driver = new ProcessorTopologyTestDriver(config, createForwardToSourceTopology()); + public void testDrivingForwardToSourceTopology() throws Exception { + driver = new ProcessorTopologyTestDriver(config, createForwardToSourceTopology().internalTopologyBuilder); driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER); @@ -247,8 +247,8 @@ public class ProcessorTopologyTest { } @Test - public void testDrivingInternalRepartitioningTopology() { - driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningTopology()); + public void testDrivingInternalRepartitioningTopology() throws Exception { + driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningTopology().internalTopologyBuilder); driver.process(INPUT_TOPIC_1, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER); driver.process(INPUT_TOPIC_1, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER); driver.process(INPUT_TOPIC_1, "key3", "value3", STRING_SERIALIZER, STRING_SERIALIZER); @@ -258,8 +258,8 @@ public class ProcessorTopologyTest { } @Test - public void testDrivingInternalRepartitioningForwardingTimestampTopology() { - driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningWithValueTimestampTopology()); + public void testDrivingInternalRepartitioningForwardingTimestampTopology() throws Exception { + driver = new ProcessorTopologyTestDriver(config, createInternalRepartitioningWithValueTimestampTopology().internalTopologyBuilder); driver.process(INPUT_TOPIC_1, "key1", "value1@1000", STRING_SERIALIZER, STRING_SERIALIZER); driver.process(INPUT_TOPIC_1, "key2", "value2@2000", STRING_SERIALIZER, STRING_SERIALIZER); driver.process(INPUT_TOPIC_1, "key3", "value3@3000", STRING_SERIALIZER, STRING_SERIALIZER);
