Repository: kafka Updated Branches: refs/heads/trunk fc93fb4b6 -> 5d798511b
http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java index a6d1179..0a08b7c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -68,20 +68,20 @@ import static org.junit.Assert.assertThat; public class StreamPartitionAssignorTest { - private TopicPartition t1p0 = new TopicPartition("topic1", 0); - private TopicPartition t1p1 = new TopicPartition("topic1", 1); - private TopicPartition t1p2 = new TopicPartition("topic1", 2); - private TopicPartition t2p0 = new TopicPartition("topic2", 0); - private TopicPartition t2p1 = new TopicPartition("topic2", 1); - private TopicPartition t2p2 = new TopicPartition("topic2", 2); - private TopicPartition t3p0 = new TopicPartition("topic3", 0); - private TopicPartition t3p1 = new TopicPartition("topic3", 1); - private TopicPartition t3p2 = new TopicPartition("topic3", 2); - private TopicPartition t3p3 = new TopicPartition("topic3", 3); - - private Set<String> allTopics = Utils.mkSet("topic1", "topic2"); - - private List<PartitionInfo> infos = Arrays.asList( + private final TopicPartition t1p0 = new TopicPartition("topic1", 0); + private final TopicPartition t1p1 = new TopicPartition("topic1", 1); + private final TopicPartition t1p2 = new TopicPartition("topic1", 2); + private final TopicPartition t2p0 = new TopicPartition("topic2", 0); + private final TopicPartition t2p1 = new TopicPartition("topic2", 1); + private final TopicPartition t2p2 = new TopicPartition("topic2", 2); + private final TopicPartition t3p0 = new TopicPartition("topic3", 0); + private final TopicPartition t3p1 = new TopicPartition("topic3", 1); + private final TopicPartition t3p2 = new TopicPartition("topic3", 2); + private final TopicPartition t3p3 = new TopicPartition("topic3", 3); + + private final Set<String> allTopics = Utils.mkSet("topic1", "topic2"); + + private final List<PartitionInfo> infos = Arrays.asList( new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), @@ -94,8 +94,11 @@ public class StreamPartitionAssignorTest { new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]) ); - private Cluster metadata = new Cluster("cluster", Collections.singletonList(Node.noNode()), infos, Collections.<String>emptySet(), - Collections.<String>emptySet()); + private final Cluster metadata = new Cluster( + "cluster", + Collections.singletonList(Node.noNode()), + infos, Collections.<String>emptySet(), + Collections.<String>emptySet()); private final TaskId task0 = new TaskId(0, 0); private final TaskId task1 = new TaskId(0, 1); @@ -104,7 +107,7 @@ public class StreamPartitionAssignorTest { private final String userEndPoint = "localhost:2171"; private final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); private final MockClientSupplier mockClientSupplier = new MockClientSupplier(); - private final TopologyBuilder builder = new TopologyBuilder(); + private final InternalTopologyBuilder builder = new InternalTopologyBuilder(); private final StreamsConfig config = new StreamsConfig(configProps()); private final StreamThread mockStreamThread = new StreamThread(builder, config, mockClientSupplier, "appID", @@ -133,8 +136,8 @@ public class StreamPartitionAssignorTest { @Test public void testSubscription() throws Exception { - builder.addSource("source1", "topic1"); - builder.addSource("source2", "topic2"); + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); final Set<TaskId> prevTasks = Utils.mkSet( @@ -145,8 +148,19 @@ public class StreamPartitionAssignorTest { String clientId = "client-id"; UUID processId = UUID.randomUUID(); - StreamThread thread = new StreamThread(builder, config, new MockClientSupplier(), "test", clientId, processId, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0) { + StreamThread thread = new StreamThread( + builder, + config, + new MockClientSupplier(), + "test", + clientId, + processId, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, + StreamsMetadataState.UNKNOWN_HOST), + 0) { + @Override public Set<TaskId> prevActiveTasks() { return prevTasks; @@ -173,8 +187,8 @@ public class StreamPartitionAssignorTest { @Test public void testAssignBasic() throws Exception { - builder.addSource("source1", "topic1"); - builder.addSource("source2", "topic2"); + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); List<String> topics = Utils.mkList("topic1", "topic2"); Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); @@ -191,8 +205,17 @@ public class StreamPartitionAssignorTest { String client1 = "client1"; - StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + StreamThread thread10 = new StreamThread( + builder, + config, + mockClientSupplier, + "test", + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); @@ -245,10 +268,10 @@ public class StreamPartitionAssignorTest { props.put(StreamsConfig.PARTITION_GROUPER_CLASS_CONFIG, SingleGroupPartitionGrouperStub.class); StreamsConfig config = new StreamsConfig(props); - builder.addSource("source1", "topic1"); + builder.addSource(null, "source1", null, null, null, "topic1"); builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor1"); - builder.addSource("source2", "topic2"); + builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor2", new MockProcessorSupplier(), "source2"); builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor2"); List<String> topics = Utils.mkList("topic1", "topic2"); @@ -257,7 +280,17 @@ public class StreamPartitionAssignorTest { UUID uuid1 = UUID.randomUUID(); String client1 = "client1"; - StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); + StreamThread thread10 = new StreamThread( + builder, + config, + mockClientSupplier, + "test", + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); @@ -281,8 +314,8 @@ public class StreamPartitionAssignorTest { @Test public void testAssignEmptyMetadata() throws Exception { - builder.addSource("source1", "topic1"); - builder.addSource("source2", "topic2"); + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); List<String> topics = Utils.mkList("topic1", "topic2"); Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); @@ -296,7 +329,17 @@ public class StreamPartitionAssignorTest { UUID uuid1 = UUID.randomUUID(); String client1 = "client1"; - StreamThread thread10 = new StreamThread(builder, config, new MockClientSupplier(), "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); + StreamThread thread10 = new StreamThread( + builder, + config, + new MockClientSupplier(), + "test", + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); @@ -338,9 +381,9 @@ public class StreamPartitionAssignorTest { @Test public void testAssignWithNewTasks() throws Exception { - builder.addSource("source1", "topic1"); - builder.addSource("source2", "topic2"); - builder.addSource("source3", "topic3"); + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addSource(null, "source2", null, null, null, "topic2"); + builder.addSource(null, "source3", null, null, null, "topic3"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3"); List<String> topics = Utils.mkList("topic1", "topic2", "topic3"); Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3); @@ -354,8 +397,17 @@ public class StreamPartitionAssignorTest { UUID uuid2 = UUID.randomUUID(); String client1 = "client1"; - StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + StreamThread thread10 = new StreamThread( + builder, + config, + mockClientSupplier, + "test", + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); @@ -397,8 +449,8 @@ public class StreamPartitionAssignorTest { public void testAssignWithStates() throws Exception { String applicationId = "test"; builder.setApplicationId(applicationId); - builder.addSource("source1", "topic1"); - builder.addSource("source2", "topic2"); + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1"); builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1"); @@ -422,8 +474,17 @@ public class StreamPartitionAssignorTest { String client1 = "client1"; - StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + StreamThread thread10 = new StreamThread( + builder, + config, + mockClientSupplier, + applicationId, + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); @@ -488,8 +549,8 @@ public class StreamPartitionAssignorTest { props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1"); StreamsConfig config = new StreamsConfig(props); - builder.addSource("source1", "topic1"); - builder.addSource("source2", "topic2"); + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); List<String> topics = Utils.mkList("topic1", "topic2"); Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); @@ -506,8 +567,17 @@ public class StreamPartitionAssignorTest { UUID uuid2 = UUID.randomUUID(); String client1 = "client1"; - StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + StreamThread thread10 = new StreamThread( + builder, + config, + mockClientSupplier, + "test", + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer)); @@ -559,16 +629,24 @@ public class StreamPartitionAssignorTest { public void testOnAssignment() throws Exception { TopicPartition t2p3 = new TopicPartition("topic2", 3); - TopologyBuilder builder = new TopologyBuilder(); - builder.addSource("source1", "topic1"); - builder.addSource("source2", "topic2"); + builder.addSource(null, "source1", null, null, null, "topic1"); + builder.addSource(null, "source2", null, null, null, "topic2"); builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); UUID uuid = UUID.randomUUID(); String client1 = "client1"; - StreamThread thread = new StreamThread(builder, config, mockClientSupplier, "test", client1, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + StreamThread thread = new StreamThread( + builder, + config, + mockClientSupplier, + "test", + client1, + uuid, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1)); @@ -593,10 +671,10 @@ public class StreamPartitionAssignorTest { String applicationId = "test"; builder.setApplicationId(applicationId); builder.addInternalTopic("topicX"); - builder.addSource("source1", "topic1"); + builder.addSource(null, "source1", null, null, null, "topic1"); builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); - builder.addSink("sink1", "topicX", "processor1"); - builder.addSource("source2", "topicX"); + builder.addSink("sink1", "topicX", null, null, null, "processor1"); + builder.addSource(null, "source2", null, null, null, "topicX"); builder.addProcessor("processor2", new MockProcessorSupplier(), "source2"); List<String> topics = Utils.mkList("topic1", "test-topicX"); Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); @@ -605,8 +683,17 @@ public class StreamPartitionAssignorTest { String client1 = "client1"; - StreamThread thread10 = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + StreamThread thread10 = new StreamThread( + builder, + config, + mockClientSupplier, + applicationId, + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); partitionAssignor.configure(config.getConsumerConfigs(thread10, applicationId, client1)); MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(thread10.config, mockClientSupplier.restoreConsumer); @@ -629,14 +716,14 @@ public class StreamPartitionAssignorTest { String applicationId = "test"; builder.setApplicationId(applicationId); builder.addInternalTopic("topicX"); - builder.addSource("source1", "topic1"); + builder.addSource(null, "source1", null, null, null, "topic1"); builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); - builder.addSink("sink1", "topicX", "processor1"); - builder.addSource("source2", "topicX"); + builder.addSink("sink1", "topicX", null, null, null, "processor1"); + builder.addSource(null, "source2", null, null, null, "topicX"); builder.addInternalTopic("topicZ"); builder.addProcessor("processor2", new MockProcessorSupplier(), "source2"); - builder.addSink("sink2", "topicZ", "processor2"); - builder.addSource("source3", "topicZ"); + builder.addSink("sink2", "topicZ", null, null, null, "processor2"); + builder.addSource(null, "source3", null, null, null, "topicZ"); List<String> topics = Utils.mkList("topic1", "test-topicX", "test-topicZ"); Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); @@ -669,9 +756,9 @@ public class StreamPartitionAssignorTest { final StreamsConfig config = new StreamsConfig(properties); final String applicationId = "application-id"; builder.setApplicationId(applicationId); - builder.addSource("source", "input"); + builder.addSource(null, "source", null, null, null, "input"); builder.addProcessor("processor", new MockProcessorSupplier(), "source"); - builder.addSink("sink", "output", "processor"); + builder.addSink("sink", "output", null, null, null, "processor"); final UUID uuid1 = UUID.randomUUID(); final String client1 = "client1"; @@ -693,17 +780,26 @@ public class StreamPartitionAssignorTest { final StreamsConfig config = new StreamsConfig(properties); final String applicationId = "application-id"; builder.setApplicationId(applicationId); - builder.addSource("source", "topic1"); + builder.addSource(null, "source", null, null, null, "topic1"); builder.addProcessor("processor", new MockProcessorSupplier(), "source"); - builder.addSink("sink", "output", "processor"); + builder.addSink("sink", "output", null, null, null, "processor"); final List<String> topics = Utils.mkList("topic1"); final UUID uuid1 = UUID.randomUUID(); final String client1 = "client1"; - final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + final StreamThread streamThread = new StreamThread( + builder, + config, + mockClientSupplier, + applicationId, + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1)); @@ -734,9 +830,17 @@ public class StreamPartitionAssignorTest { final String applicationId = "application-id"; builder.setApplicationId(applicationId); - final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, - new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + final StreamThread streamThread = new StreamThread( + builder, + config, + mockClientSupplier, + applicationId, + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config, mockClientSupplier.restoreConsumer)); @@ -760,9 +864,17 @@ public class StreamPartitionAssignorTest { builder.setApplicationId(applicationId); - final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client1, uuid1, - new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), - 0); + final StreamThread streamThread = new StreamThread( + builder, + config, + mockClientSupplier, + applicationId, + client1, + uuid1, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + 0); try { partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client1)); @@ -874,10 +986,22 @@ public class StreamPartitionAssignorTest { final UUID uuid = UUID.randomUUID(); final String client = "client1"; - final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); + final StreamThread streamThread = new StreamThread( + builder.internalTopologyBuilder, + config, + mockClientSupplier, + applicationId, + client, + uuid, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), + 0); partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client)); - final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager(streamThread.config, mockClientSupplier.restoreConsumer); + final MockInternalTopicManager mockInternalTopicManager = new MockInternalTopicManager( + streamThread.config, + mockClientSupplier.restoreConsumer); partitionAssignor.setInternalTopicManager(mockInternalTopicManager); final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); @@ -953,10 +1077,22 @@ public class StreamPartitionAssignorTest { final UUID uuid = UUID.randomUUID(); final String client = "client1"; - final StreamThread streamThread = new StreamThread(builder, config, mockClientSupplier, applicationId, client, uuid, new Metrics(), Time.SYSTEM, new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0); + final StreamThread streamThread = new StreamThread( + builder.internalTopologyBuilder, + config, + mockClientSupplier, + applicationId, + client, + uuid, + new Metrics(), + Time.SYSTEM, + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), + 0); partitionAssignor.configure(config.getConsumerConfigs(streamThread, applicationId, client)); - partitionAssignor.setInternalTopicManager(new MockInternalTopicManager(streamThread.config, mockClientSupplier.restoreConsumer)); + partitionAssignor.setInternalTopicManager(new MockInternalTopicManager( + streamThread.config, + mockClientSupplier.restoreConsumer)); final Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); final Set<TaskId> emptyTasks = Collections.emptySet(); @@ -1046,9 +1182,10 @@ public class StreamPartitionAssignorTest { } } - if (info.standbyTasks.size() > 0) + if (info.standbyTasks.size() > 0) { // check if standby partitions cover all topics assertEquals(expectedTopics, standbyTopics); + } return info; } http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index a7f1db1..9b9d6cd 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -36,7 +36,6 @@ import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.processor.TaskId; -import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; import org.apache.kafka.streams.state.HostInfo; import org.apache.kafka.streams.state.Stores; @@ -98,6 +97,7 @@ public class StreamThreadTest { @Before public void setUp() throws Exception { processId = UUID.randomUUID(); + builder.setApplicationId(applicationId); } private final TopicPartition t1p1 = new TopicPartition("topic1", 1); @@ -442,7 +442,7 @@ public class StreamThreadTest { public void testStateChangeStartClose() throws InterruptedException { final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -450,7 +450,7 @@ public class StreamThreadTest { processId, metrics, Time.SYSTEM, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0); final StateListenerStub stateListener = new StateListenerStub(); @@ -494,7 +494,7 @@ public class StreamThreadTest { //clientSupplier.consumer.assign(Arrays.asList(new TopicPartition(TOPIC, 0), new TopicPartition(TOPIC, 1))); final StreamThread thread1 = new StreamThread( - builder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -502,10 +502,10 @@ public class StreamThreadTest { processId, metrics, Time.SYSTEM, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0); final StreamThread thread2 = new StreamThread( - builder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -513,7 +513,7 @@ public class StreamThreadTest { processId, metrics, Time.SYSTEM, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0); final Map<TaskId, Set<TopicPartition>> task0 = Collections.singletonMap(new TaskId(0, 0), task0Assignment); @@ -617,7 +617,7 @@ public class StreamThreadTest { @Test public void testMetrics() { final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -625,7 +625,7 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0); final String defaultGroupName = "stream-metrics"; final String defaultPrefix = "thread." + thread.threadClientId(); @@ -678,7 +678,7 @@ public class StreamThreadTest { extraDir.mkdir(); builder.addSource("source1", "topic1"); final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -686,7 +686,7 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0) { @Override public void maybeClean(final long now) { @@ -815,7 +815,7 @@ public class StreamThreadTest { builder.addSource("source1", "topic1"); final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -823,7 +823,7 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0) { @Override @@ -907,7 +907,7 @@ public class StreamThreadTest { builder.addSource("source1", "someTopic"); final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -915,7 +915,7 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0); final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); @@ -943,7 +943,7 @@ public class StreamThreadTest { final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId); final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, new StreamsConfig(configProps(true)), clientSupplier, applicationId, @@ -951,7 +951,7 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0); final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); @@ -982,7 +982,7 @@ public class StreamThreadTest { final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId); final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, new StreamsConfig(configProps(true)), clientSupplier, applicationId, @@ -990,7 +990,7 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0); final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); @@ -1015,7 +1015,7 @@ public class StreamThreadTest { builder.addSource("source1", "someTopic"); final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -1023,7 +1023,7 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0); final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); @@ -1046,7 +1046,7 @@ public class StreamThreadTest { builder.addSource("name", "topic").addSink("out", "output"); final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -1054,7 +1054,7 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0); thread.setPartitionAssignor(new StreamPartitionAssignor() { @@ -1086,7 +1086,7 @@ public class StreamThreadTest { new StateDirectory(applicationId, config.getString(StreamsConfig.STATE_DIR_CONFIG), mockTime)); final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -1094,7 +1094,7 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0) { @Override @@ -1133,13 +1133,11 @@ public class StreamThreadTest { @Test public void shouldInitializeRestoreConsumerWithOffsetsFromStandbyTasks() { - final KStreamBuilder builder = new KStreamBuilder(); - builder.setApplicationId(applicationId); builder.stream("t1").groupByKey().count("count-one"); builder.stream("t2").groupByKey().count("count-two"); final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -1147,7 +1145,7 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0); final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer; @@ -1190,18 +1188,13 @@ public class StreamThreadTest { new TopicPartition("stream-thread-test-count-two-changelog", 0)))); } - - - @Test public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() throws Exception { - final KStreamBuilder builder = new KStreamBuilder(); - builder.setApplicationId(applicationId); builder.stream("t1").groupByKey().count("count-one"); builder.stream("t2").groupByKey().count("count-two"); final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -1209,7 +1202,7 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0); final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer; restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog", @@ -1268,14 +1261,12 @@ public class StreamThreadTest { @Test public void shouldCloseActiveTasksThatAreAssignedToThisStreamThreadButAssignmentHasChangedBeforeCreatingNewTasks() throws Exception { - final KStreamBuilder builder = new KStreamBuilder(); - builder.setApplicationId(applicationId); builder.stream(Pattern.compile("t.*")).to("out"); final Map<Collection<TopicPartition>, TestStreamTask> createdTasks = new HashMap<>(); final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -1283,7 +1274,7 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0) { @Override @@ -1355,7 +1346,7 @@ public class StreamThreadTest { final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId); final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, new StreamsConfig(configProps(true)), clientSupplier, applicationId, @@ -1363,7 +1354,7 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0); final MockConsumer consumer = clientSupplier.consumer; @@ -1451,7 +1442,7 @@ public class StreamThreadTest { final MockClientSupplier clientSupplier = new MockClientSupplier(applicationId); final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, new StreamsConfig(configProps(true)), clientSupplier, applicationId, @@ -1459,7 +1450,7 @@ public class StreamThreadTest { processId, new Metrics(), new MockTime(), - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0); final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); @@ -1484,8 +1475,6 @@ public class StreamThreadTest { @Test public void shouldNotViolateAtLeastOnceWhenAnExceptionOccursOnTaskCloseDuringShutdown() throws Exception { - final KStreamBuilder builder = new KStreamBuilder(); - builder.setApplicationId(applicationId); builder.stream("t1").groupByKey(); final TestStreamTask testStreamTask = new TestStreamTask( @@ -1507,7 +1496,7 @@ public class StreamThreadTest { }; final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -1515,7 +1504,7 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0) { @Override @@ -1561,7 +1550,7 @@ public class StreamThreadTest { }; final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -1569,7 +1558,7 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0) { @Override @@ -1606,8 +1595,6 @@ public class StreamThreadTest { @Test public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringTaskSuspension() throws Exception { - final KStreamBuilder builder = new KStreamBuilder(); - builder.setApplicationId(applicationId); builder.stream("t1").groupByKey(); final TestStreamTask testStreamTask = new TestStreamTask( @@ -1629,7 +1616,7 @@ public class StreamThreadTest { }; final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -1637,7 +1624,7 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0) { @Override @@ -1667,8 +1654,6 @@ public class StreamThreadTest { @Test public void shouldNotViolateAtLeastOnceWhenExceptionOccursDuringFlushStateWhileSuspendingState() throws Exception { - final KStreamBuilder builder = new KStreamBuilder(); - builder.setApplicationId(applicationId); builder.stream("t1").groupByKey(); final TestStreamTask testStreamTask = new TestStreamTask( @@ -1690,7 +1675,7 @@ public class StreamThreadTest { }; final StreamThread thread = new StreamThread( - builder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -1698,7 +1683,7 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0) { @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { @@ -1729,12 +1714,11 @@ public class StreamThreadTest { @Test @SuppressWarnings("unchecked") public void shouldAlwaysUpdateWithLatestTopicsFromStreamPartitionAssignor() throws Exception { - final TopologyBuilder topologyBuilder = new TopologyBuilder(); - topologyBuilder.addSource("source", Pattern.compile("t.*")); - topologyBuilder.addProcessor("processor", new MockProcessorSupplier(), "source"); + builder.addSource("source", Pattern.compile("t.*")); + builder.addProcessor("processor", new MockProcessorSupplier(), "source"); final StreamThread thread = new StreamThread( - topologyBuilder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -1742,7 +1726,7 @@ public class StreamThreadTest { processId, metrics, mockTime, - new StreamsMetadataState(topologyBuilder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0); final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); @@ -1756,11 +1740,11 @@ public class StreamThreadTest { final Field nodeToSourceTopicsField = - topologyBuilder.getClass().getDeclaredField("nodeToSourceTopics"); + builder.internalTopologyBuilder.getClass().getDeclaredField("nodeToSourceTopics"); nodeToSourceTopicsField.setAccessible(true); final Map<String, List<String>> nodeToSourceTopics = - (Map<String, List<String>>) nodeToSourceTopicsField.get(topologyBuilder); + (Map<String, List<String>>) nodeToSourceTopicsField.get(builder.internalTopologyBuilder); final List<TopicPartition> topicPartitions = new ArrayList<>(); final TopicPartition topicPartition1 = new TopicPartition("topic-1", 0); @@ -1856,8 +1840,6 @@ public class StreamThreadTest { } private StreamThread setupTest(final TaskId taskId) throws InterruptedException { - final TopologyBuilder builder = new TopologyBuilder(); - builder.setApplicationId(applicationId); builder.addSource("source", "topic"); final MockClientSupplier clientSupplier = new MockClientSupplier(); @@ -1883,9 +1865,18 @@ public class StreamThreadTest { } }; - final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, - clientId, processId, new Metrics(), new MockTime(), - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { + final StreamThread thread = new StreamThread( + builder.internalTopologyBuilder, + config, + clientSupplier, + applicationId, + clientId, + processId, + new Metrics(), + new MockTime(), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), + 0) { + @Override protected StreamTask createStreamTask(final TaskId id, final Collection<TopicPartition> partitions) { return testStreamTask; @@ -1968,8 +1959,6 @@ public class StreamThreadTest { final String storeName = "store"; final String changelogTopic = applicationId + "-" + storeName + "-changelog"; - final KStreamBuilder builder = new KStreamBuilder(); - builder.setApplicationId(applicationId); builder.stream("topic1").groupByKey().count(storeName); final MockClientSupplier clientSupplier = new MockClientSupplier(); @@ -1986,9 +1975,17 @@ public class StreamThreadTest { } }); - final StreamThread thread = new StreamThread(builder, config, clientSupplier, applicationId, - clientId, processId, new Metrics(), new MockTime(), - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), 0) { + final StreamThread thread = new StreamThread( + builder.internalTopologyBuilder, + config, + clientSupplier, + applicationId, + clientId, + processId, + new Metrics(), + new MockTime(), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), + 0) { @Override protected StandbyTask createStandbyTask(final TaskId id, final Collection<TopicPartition> partitions) { @@ -2060,7 +2057,7 @@ public class StreamThreadTest { private StreamThread getStreamThread() { return new StreamThread( - builder, + builder.internalTopologyBuilder, config, clientSupplier, applicationId, @@ -2068,7 +2065,7 @@ public class StreamThreadTest { processId, metrics, Time.SYSTEM, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0) { @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java index c8ab6f1..8ee1d6e 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsMetadataStateTest.java @@ -115,7 +115,7 @@ public class StreamsMetadataStateTest { new PartitionInfo("topic-four", 0, null, null, null)); cluster = new Cluster(null, Collections.<Node>emptyList(), partitionInfos, Collections.<String>emptySet(), Collections.<String>emptySet()); - discovery = new StreamsMetadataState(builder, hostOne); + discovery = new StreamsMetadataState(builder.internalTopologyBuilder, hostOne); discovery.onChange(hostToPartitions, cluster); partitioner = new StreamPartitioner<String, Object>() { @Override @@ -127,7 +127,7 @@ public class StreamsMetadataStateTest { @Test public void shouldNotThrowNPEWhenOnChangeNotCalled() throws Exception { - new StreamsMetadataState(builder, hostOne).getAllMetadataForStore("store"); + new StreamsMetadataState(builder.internalTopologyBuilder, hostOne).getAllMetadataForStore("store"); } @Test @@ -294,7 +294,7 @@ public class StreamsMetadataStateTest { @Test public void shouldGetAnyHostForGlobalStoreByKeyIfMyHostUnknown() throws Exception { - final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST); + final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST); streamsMetadataState.onChange(hostToPartitions, cluster); assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", Serdes.String().serializer())); } @@ -307,7 +307,7 @@ public class StreamsMetadataStateTest { @Test public void shouldGetAnyHostForGlobalStoreByKeyAndPartitionerIfMyHostUnknown() throws Exception { - final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST); + final StreamsMetadataState streamsMetadataState = new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST); streamsMetadataState.onChange(hostToPartitions, cluster); assertNotNull(streamsMetadataState.getMetadataWithKey(globalTable, "key", partitioner)); } http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java index bf55b47..d2f236f 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/StreamThreadStateStoreProviderTest.java @@ -113,7 +113,7 @@ public class StreamThreadStateStoreProviderTest { storesAvailable = true; provider = new StreamThreadStateStoreProvider( new StreamThread( - builder, + builder.internalTopologyBuilder, streamsConfig, clientSupplier, applicationId, @@ -121,7 +121,7 @@ public class StreamThreadStateStoreProviderTest { UUID.randomUUID(), new Metrics(), Time.SYSTEM, - new StreamsMetadataState(builder, StreamsMetadataState.UNKNOWN_HOST), + new StreamsMetadataState(builder.internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST), 0) { @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/5d798511/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index c59113e..d026c60 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -41,6 +41,7 @@ import org.apache.kafka.streams.processor.internals.GlobalProcessorContextImpl; import org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl; import org.apache.kafka.streams.processor.internals.GlobalStateUpdateTask; import org.apache.kafka.streams.processor.internals.InternalProcessorContext; +import org.apache.kafka.streams.processor.internals.InternalTopologyBuilder; import org.apache.kafka.streams.processor.internals.MockStreamsMetrics; import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; import org.apache.kafka.streams.processor.internals.ProcessorRecordContext; @@ -160,7 +161,7 @@ public class ProcessorTopologyTestDriver { * @param builder the topology builder that will be used to create the topology instance */ public ProcessorTopologyTestDriver(final StreamsConfig config, - final TopologyBuilder builder) { + final InternalTopologyBuilder builder) { topology = builder.setApplicationId(APPLICATION_ID).build(null); final ProcessorTopology globalTopology = builder.buildGlobalStateTopology(); @@ -351,7 +352,7 @@ public class ProcessorTopologyTestDriver { /** * Get the {@link StateStore} with the given name. The name should have been supplied via - * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder) this object's constructor}, and is + * {@link #ProcessorTopologyTestDriver(StreamsConfig, InternalTopologyBuilder) this object's constructor}, and is * presumed to be used by a Processor within the topology. * <p> * This is often useful in test cases to pre-populate the store before the test case instructs the topology to @@ -367,7 +368,7 @@ public class ProcessorTopologyTestDriver { /** * Get the {@link KeyValueStore} with the given name. The name should have been supplied via - * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder) this object's constructor}, and is + * {@link #ProcessorTopologyTestDriver(StreamsConfig, InternalTopologyBuilder) this object's constructor}, and is * presumed to be used by a Processor within the topology. * <p> * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
