Repository: kafka Updated Branches: refs/heads/trunk 8f6a372ee -> 5df1eee7d
http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index a3d7523..c3a372c 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -20,13 +20,13 @@ import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.MockConsumer; -import org.apache.kafka.clients.consumer.internals.PartitionAssignor; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.utils.LogContext; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.StreamsConfig; @@ -37,11 +37,7 @@ import org.apache.kafka.streams.kstream.internals.InternalStreamsBuilderTest; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TaskMetadata; import org.apache.kafka.streams.processor.ThreadMetadata; -import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; -import org.apache.kafka.streams.state.HostInfo; -import org.apache.kafka.streams.state.Stores; import org.apache.kafka.test.MockClientSupplier; -import org.apache.kafka.test.MockProcessorSupplier; import org.apache.kafka.test.MockStateRestoreListener; import org.apache.kafka.test.MockTimestampExtractor; import org.apache.kafka.test.TestCondition; @@ -51,21 +47,16 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import java.lang.reflect.Field; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.Set; import java.util.UUID; -import java.util.regex.Pattern; -import static java.util.Collections.EMPTY_SET; import static org.hamcrest.CoreMatchers.equalTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -100,18 +91,14 @@ public class StreamThreadTest { streamsMetadataState = new StreamsMetadataState(internalTopologyBuilder, StreamsMetadataState.UNKNOWN_HOST); } - private final TopicPartition t1p1 = new TopicPartition("topic1", 1); - private final TopicPartition t1p2 = new TopicPartition("topic1", 2); - private final TopicPartition t2p1 = new TopicPartition("topic2", 1); - private final TopicPartition t2p2 = new TopicPartition("topic2", 2); - private final TopicPartition t3p1 = new TopicPartition("topic3", 1); - private final TopicPartition t3p2 = new TopicPartition("topic3", 2); + private final String topic1 = "topic1"; + + private final TopicPartition t1p1 = new TopicPartition(topic1, 1); + private final TopicPartition t1p2 = new TopicPartition(topic1, 2); // task0 is unused private final TaskId task1 = new TaskId(0, 1); private final TaskId task2 = new TaskId(0, 2); - private final TaskId task3 = new TaskId(1, 1); - private final TaskId task4 = new TaskId(1, 2); private Properties configProps(final boolean enableEos) { return new Properties() { @@ -132,30 +119,19 @@ public class StreamThreadTest { @SuppressWarnings("unchecked") @Test public void testPartitionAssignmentChangeForSingleGroup() { - internalTopologyBuilder.addSource(null, "source1", null, null, null, "topic1"); + internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); final StreamThread thread = getStreamThread(); - final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); - thread.setThreadMetadataProvider(new StreamPartitionAssignor() { - @Override - public Map<TaskId, Set<TopicPartition>> activeTasks() { - return activeTasks; - } - }); - final StateListenerStub stateListener = new StateListenerStub(); thread.setStateListener(stateListener); assertEquals(thread.state(), StreamThread.State.CREATED); final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; thread.setState(StreamThread.State.RUNNING); - assertTrue(thread.tasks().isEmpty()); List<TopicPartition> revokedPartitions; List<TopicPartition> assignedPartitions; - Set<TopicPartition> expectedGroup1; - Set<TopicPartition> expectedGroup2; // revoke nothing revokedPartitions = Collections.emptyList(); @@ -165,59 +141,12 @@ public class StreamThreadTest { // assign single partition assignedPartitions = Collections.singletonList(t1p1); - expectedGroup1 = new HashSet<>(Collections.singleton(t1p1)); - activeTasks.put(new TaskId(0, 1), expectedGroup1); + thread.taskManager().setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), Collections.<TaskId, Set<TopicPartition>>emptyMap()); rebalanceListener.onPartitionsAssigned(assignedPartitions); thread.runOnce(-1); assertEquals(thread.state(), StreamThread.State.RUNNING); Assert.assertEquals(4, stateListener.numChanges); Assert.assertEquals(StreamThread.State.PARTITIONS_ASSIGNED, stateListener.oldState); - assertTrue(thread.tasks().containsKey(task1)); - assertEquals(expectedGroup1, thread.tasks().get(task1).partitions()); - assertEquals(1, thread.tasks().size()); - - // revoke single partition - revokedPartitions = assignedPartitions; - activeTasks.clear(); - rebalanceListener.onPartitionsRevoked(revokedPartitions); - - assertFalse(thread.tasks().containsKey(task1)); - assertEquals(0, thread.tasks().size()); - - // assign different single partition - assignedPartitions = Collections.singletonList(t1p2); - expectedGroup2 = new HashSet<>(Collections.singleton(t1p2)); - activeTasks.put(new TaskId(0, 2), expectedGroup2); - rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); - assertTrue(thread.tasks().containsKey(task2)); - assertEquals(expectedGroup2, thread.tasks().get(task2).partitions()); - assertEquals(1, thread.tasks().size()); - - // revoke different single partition and assign both partitions - revokedPartitions = assignedPartitions; - activeTasks.clear(); - rebalanceListener.onPartitionsRevoked(revokedPartitions); - assignedPartitions = Arrays.asList(t1p1, t1p2); - expectedGroup1 = new HashSet<>(Collections.singleton(t1p1)); - expectedGroup2 = new HashSet<>(Collections.singleton(t1p2)); - activeTasks.put(new TaskId(0, 1), expectedGroup1); - activeTasks.put(new TaskId(0, 2), expectedGroup2); - rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); - assertTrue(thread.tasks().containsKey(task1)); - assertTrue(thread.tasks().containsKey(task2)); - assertEquals(expectedGroup1, thread.tasks().get(task1).partitions()); - assertEquals(expectedGroup2, thread.tasks().get(task2).partitions()); - assertEquals(2, thread.tasks().size()); - - // revoke all partitions and assign nothing - revokedPartitions = assignedPartitions; - rebalanceListener.onPartitionsRevoked(revokedPartitions); - assignedPartitions = Collections.emptyList(); - rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); - assertTrue(thread.tasks().isEmpty()); thread.shutdown(); assertTrue(thread.state() == StreamThread.State.PENDING_SHUTDOWN); @@ -225,104 +154,6 @@ public class StreamThreadTest { @SuppressWarnings("unchecked") @Test - public void testPartitionAssignmentChangeForMultipleGroups() { - internalTopologyBuilder.addSource(null, "source1", null, null, null, "topic1"); - internalTopologyBuilder.addSource(null, "source2", null, null, null, "topic2"); - internalTopologyBuilder.addSource(null, "source3", null, null, null, "topic3"); - internalTopologyBuilder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3"); - - final StreamThread thread = getStreamThread(); - - final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); - thread.setThreadMetadataProvider(new StreamPartitionAssignor() { - @Override - public Map<TaskId, Set<TopicPartition>> activeTasks() { - return activeTasks; - } - }); - - final StateListenerStub stateListener = new StateListenerStub(); - thread.setStateListener(stateListener); - assertEquals(thread.state(), StreamThread.State.CREATED); - - final ConsumerRebalanceListener rebalanceListener = thread.rebalanceListener; - thread.setState(StreamThread.State.RUNNING); - assertTrue(thread.tasks().isEmpty()); - - List<TopicPartition> revokedPartitions; - List<TopicPartition> assignedPartitions; - Set<TopicPartition> expectedGroup1; - Set<TopicPartition> expectedGroup2; - - // revoke nothing - revokedPartitions = Collections.emptyList(); - rebalanceListener.onPartitionsRevoked(revokedPartitions); - - assertEquals(thread.state(), StreamThread.State.PARTITIONS_REVOKED); - - // assign four new partitions of second subtopology - assignedPartitions = Arrays.asList(t2p1, t2p2, t3p1, t3p2); - expectedGroup1 = new HashSet<>(Arrays.asList(t2p1, t3p1)); - expectedGroup2 = new HashSet<>(Arrays.asList(t2p2, t3p2)); - activeTasks.put(new TaskId(1, 1), expectedGroup1); - activeTasks.put(new TaskId(1, 2), expectedGroup2); - rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); - - assertTrue(thread.tasks().containsKey(task3)); - assertTrue(thread.tasks().containsKey(task4)); - assertEquals(expectedGroup1, thread.tasks().get(task3).partitions()); - assertEquals(expectedGroup2, thread.tasks().get(task4).partitions()); - assertEquals(2, thread.tasks().size()); - - // revoke four partitions and assign three partitions of both subtopologies - revokedPartitions = assignedPartitions; - rebalanceListener.onPartitionsRevoked(revokedPartitions); - - assignedPartitions = Arrays.asList(t1p1, t2p1, t3p1); - expectedGroup1 = new HashSet<>(Collections.singleton(t1p1)); - expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1)); - activeTasks.put(new TaskId(0, 1), expectedGroup1); - activeTasks.put(new TaskId(1, 1), expectedGroup2); - rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); - - assertTrue(thread.tasks().containsKey(task1)); - assertTrue(thread.tasks().containsKey(task3)); - assertEquals(expectedGroup1, thread.tasks().get(task1).partitions()); - assertEquals(expectedGroup2, thread.tasks().get(task3).partitions()); - assertEquals(2, thread.tasks().size()); - - // revoke all three partitons and reassign the same three partitions (from different subtopologies) - revokedPartitions = assignedPartitions; - rebalanceListener.onPartitionsRevoked(revokedPartitions); - assignedPartitions = Arrays.asList(t1p1, t2p1, t3p1); - expectedGroup1 = new HashSet<>(Collections.singleton(t1p1)); - expectedGroup2 = new HashSet<>(Arrays.asList(t2p1, t3p1)); - rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); - - assertTrue(thread.tasks().containsKey(task1)); - assertTrue(thread.tasks().containsKey(task3)); - assertEquals(expectedGroup1, thread.tasks().get(task1).partitions()); - assertEquals(expectedGroup2, thread.tasks().get(task3).partitions()); - assertEquals(2, thread.tasks().size()); - - // revoke all partitions and assign nothing - revokedPartitions = assignedPartitions; - rebalanceListener.onPartitionsRevoked(revokedPartitions); - assignedPartitions = Collections.emptyList(); - rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.runOnce(-1); - - assertTrue(thread.tasks().isEmpty()); - - thread.shutdown(); - assertEquals(thread.state(), StreamThread.State.PENDING_SHUTDOWN); - } - - @SuppressWarnings("unchecked") - @Test public void testStateChangeStartClose() throws InterruptedException { final StreamThread thread = createStreamThread(clientId, config, false); @@ -367,128 +198,12 @@ public class StreamThreadTest { new MockStateRestoreListener()); } - private final static String TOPIC = "topic"; - private final Set<TopicPartition> task0Assignment = Collections.singleton(new TopicPartition(TOPIC, 0)); - private final Set<TopicPartition> task1Assignment = Collections.singleton(new TopicPartition(TOPIC, 1)); - - @SuppressWarnings("unchecked") - @Test - public void testHandingOverTaskFromOneToAnotherThread() throws InterruptedException { - internalTopologyBuilder.addStateStore( - Stores - .create("store") - .withByteArrayKeys() - .withByteArrayValues() - .persistent() - .build() - ); - internalTopologyBuilder.addSource(null, "source", null, null, null, TOPIC); - - TopicPartition tp0 = new TopicPartition(TOPIC, 0); - TopicPartition tp1 = new TopicPartition(TOPIC, 1); - clientSupplier.consumer.assign(Arrays.asList(tp0, tp1)); - final Map<TopicPartition, Long> offsets = new HashMap<>(); - offsets.put(tp0, 0L); - offsets.put(tp1, 0L); - clientSupplier.consumer.updateBeginningOffsets(offsets); - - final StreamThread thread1 = createStreamThread(clientId + 1, config, false); - final StreamThread thread2 = createStreamThread(clientId + 2, config, false); - - - final Map<TaskId, Set<TopicPartition>> task0 = Collections.singletonMap(new TaskId(0, 0), task0Assignment); - final Map<TaskId, Set<TopicPartition>> task1 = Collections.singletonMap(new TaskId(0, 1), task1Assignment); - - final Map<TaskId, Set<TopicPartition>> thread1Assignment = new HashMap<>(task0); - final Map<TaskId, Set<TopicPartition>> thread2Assignment = new HashMap<>(task1); - - thread1.setThreadMetadataProvider(new MockStreamsPartitionAssignor(thread1Assignment)); - thread2.setThreadMetadataProvider(new MockStreamsPartitionAssignor(thread2Assignment)); - - // revoke (to get threads in correct state) - thread1.setState(StreamThread.State.RUNNING); - thread2.setState(StreamThread.State.RUNNING); - thread1.rebalanceListener.onPartitionsRevoked(EMPTY_SET); - thread2.rebalanceListener.onPartitionsRevoked(EMPTY_SET); - - // assign - thread1.rebalanceListener.onPartitionsAssigned(task0Assignment); - thread1.runOnce(-1); - thread2.rebalanceListener.onPartitionsAssigned(task1Assignment); - thread2.runOnce(-1); - - final Set<TaskId> originalTaskAssignmentThread1 = new HashSet<>(); - originalTaskAssignmentThread1.addAll(thread1.tasks().keySet()); - final Set<TaskId> originalTaskAssignmentThread2 = new HashSet<>(); - originalTaskAssignmentThread2.addAll(thread2.tasks().keySet()); - - // revoke (task will be suspended) - thread1.rebalanceListener.onPartitionsRevoked(task0Assignment); - thread2.rebalanceListener.onPartitionsRevoked(task1Assignment); - - assertThat(thread1.prevActiveTasks(), equalTo(originalTaskAssignmentThread1)); - assertThat(thread2.prevActiveTasks(), equalTo(originalTaskAssignmentThread2)); - - // assign reverted - thread1Assignment.clear(); - thread1Assignment.putAll(task1); - - thread2Assignment.clear(); - thread2Assignment.putAll(task0); - - final Thread runIt = new Thread(new Runnable() { - @Override - public void run() { - thread1.rebalanceListener.onPartitionsAssigned(task1Assignment); - thread1.runOnce(-1); - } - }); - runIt.start(); - - thread2.rebalanceListener.onPartitionsAssigned(task0Assignment); - thread2.runOnce(-1); - - runIt.join(); - - assertThat(thread1.tasks().keySet(), equalTo(originalTaskAssignmentThread2)); - assertThat(thread2.tasks().keySet(), equalTo(originalTaskAssignmentThread1)); - } - - private class MockStreamsPartitionAssignor extends StreamPartitionAssignor { - - private final Map<TaskId, Set<TopicPartition>> activeTaskAssignment; - private final Map<TaskId, Set<TopicPartition>> standbyTaskAssignment; - - MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>> activeTaskAssignment) { - this(activeTaskAssignment, Collections.<TaskId, Set<TopicPartition>>emptyMap()); - } - - MockStreamsPartitionAssignor(final Map<TaskId, Set<TopicPartition>> activeTaskAssignment, - final Map<TaskId, Set<TopicPartition>> standbyTaskAssignment) { - this.activeTaskAssignment = activeTaskAssignment; - this.standbyTaskAssignment = standbyTaskAssignment; - } - - @Override - public Map<TaskId, Set<TopicPartition>> activeTasks() { - return activeTaskAssignment; - } - - @Override - public Map<TaskId, Set<TopicPartition>> standbyTasks() { - return standbyTaskAssignment; - } - - @Override - public void close() {} - } - @Test public void testMetrics() { final StreamThread thread = createStreamThread(clientId, config, false); final String defaultGroupName = "stream-metrics"; - final String defaultPrefix = "thread." + thread.threadClientId(); - final Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.threadClientId()); + final String defaultPrefix = "thread." + thread.getName(); + final Map<String, String> defaultTags = Collections.singletonMap("client-id", thread.getName()); assertNotNull(metrics.getSensor(defaultPrefix + ".commit-latency")); assertNotNull(metrics.getSensor(defaultPrefix + ".poll-latency")); @@ -529,19 +244,18 @@ public class StreamThreadTest { final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 1); StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap()); - final StreamThread thread = new StreamThread(internalTopologyBuilder, - clientId, - "", - config, - processId, - mockTime, - streamsMetadataState, - taskManager, - streamsMetrics, - clientSupplier, - consumer, - clientSupplier.getAdminClient(config.getAdminConfigs(clientId)), - stateDirectory); + final StreamThread thread = new StreamThread(mockTime, + config, + consumer, + consumer, + null, + clientSupplier.getAdminClient(config.getAdminConfigs(clientId)), + taskManager, + streamsMetrics, + internalTopologyBuilder, + clientId, + new LogContext("") + ); thread.maybeCommit(mockTime.milliseconds()); mockTime.sleep(commitInterval - 10L); thread.maybeCommit(mockTime.milliseconds()); @@ -562,19 +276,17 @@ public class StreamThreadTest { final TaskManager taskManager = mockTaskManagerCommit(consumer, 1, 0); StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap()); - final StreamThread thread = new StreamThread(internalTopologyBuilder, - clientId, - "", - config, - processId, - mockTime, - streamsMetadataState, - taskManager, - streamsMetrics, - clientSupplier, - consumer, - clientSupplier.getAdminClient(config.getAdminConfigs(clientId)), - stateDirectory); + final StreamThread thread = new StreamThread(mockTime, + config, + consumer, + consumer, + null, + clientSupplier.getAdminClient(config.getAdminConfigs(clientId)), + taskManager, + streamsMetrics, + internalTopologyBuilder, + clientId, + new LogContext("")); thread.maybeCommit(mockTime.milliseconds()); mockTime.sleep(commitInterval - 10L); thread.maybeCommit(mockTime.milliseconds()); @@ -596,19 +308,17 @@ public class StreamThreadTest { final TaskManager taskManager = mockTaskManagerCommit(consumer, 2, 1); StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap()); - final StreamThread thread = new StreamThread(internalTopologyBuilder, - clientId, - "", - config, - processId, - mockTime, - streamsMetadataState, - taskManager, - streamsMetrics, - clientSupplier, - consumer, - clientSupplier.getAdminClient(config.getAdminConfigs(clientId)), - stateDirectory); + final StreamThread thread = new StreamThread(mockTime, + config, + consumer, + consumer, + null, + clientSupplier.getAdminClient(config.getAdminConfigs(clientId)), + taskManager, + streamsMetrics, + internalTopologyBuilder, + clientId, + new LogContext("")); thread.maybeCommit(mockTime.milliseconds()); mockTime.sleep(commitInterval + 1); thread.maybeCommit(mockTime.milliseconds()); @@ -619,8 +329,6 @@ public class StreamThreadTest { @SuppressWarnings({"ThrowableNotThrown", "unchecked"}) private TaskManager mockTaskManagerCommit(final Consumer<byte[], byte[]> consumer, final int numberOfCommits, final int commits) { final TaskManager taskManager = EasyMock.createMock(TaskManager.class); - taskManager.setConsumer(EasyMock.anyObject(Consumer.class)); - EasyMock.expectLastCall(); EasyMock.expect(taskManager.commitAll()).andReturn(commits).times(numberOfCommits); EasyMock.replay(taskManager, consumer); return taskManager; @@ -628,18 +336,26 @@ public class StreamThreadTest { @Test public void shouldInjectSharedProducerForAllTasksUsingClientSupplierOnCreateIfEosDisabled() throws InterruptedException { - internalTopologyBuilder.addSource(null, "source1", null, null, null, "someTopic"); + internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); final StreamThread thread = createStreamThread(clientId, config, false); - final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); - assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0))); - assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1))); - thread.setThreadMetadataProvider(new MockStreamsPartitionAssignor(assignment)); - thread.setState(StreamThread.State.RUNNING); thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); - thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0))); + + final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); + final List<TopicPartition> assignedPartitions = new ArrayList<>(); + + // assign single partition + assignedPartitions.add(t1p1); + assignedPartitions.add(t1p2); + activeTasks.put(task1, Collections.singleton(t1p1)); + activeTasks.put(task2, Collections.singleton(t1p2)); + + thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap()); + thread.taskManager().createTasks(assignedPartitions); + + thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions)); assertEquals(1, clientSupplier.producers.size()); final Producer globalProducer = clientSupplier.producers.get(0); @@ -652,46 +368,55 @@ public class StreamThreadTest { @Test public void shouldInjectProducerPerTaskUsingClientSupplierOnCreateIfEosEnable() throws InterruptedException { - internalTopologyBuilder.addSource(null, "source1", null, null, null, "someTopic"); + internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true); - final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); - assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0))); - assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1))); - assignment.put(new TaskId(0, 2), Collections.singleton(new TopicPartition("someTopic", 2))); - thread.setThreadMetadataProvider(new MockStreamsPartitionAssignor(assignment)); - - final Set<TopicPartition> assignedPartitions = new HashSet<>(); - Collections.addAll(assignedPartitions, new TopicPartition("someTopic", 0), new TopicPartition("someTopic", 2)); thread.setState(StreamThread.State.RUNNING); thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); - thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + + final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); + final List<TopicPartition> assignedPartitions = new ArrayList<>(); + + // assign single partition + assignedPartitions.add(t1p1); + assignedPartitions.add(t1p2); + activeTasks.put(task1, Collections.singleton(t1p1)); + activeTasks.put(task2, Collections.singleton(t1p2)); + + thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap()); + + thread.rebalanceListener.onPartitionsAssigned(new HashSet<>(assignedPartitions)); + thread.runOnce(-1); assertEquals(thread.tasks().size(), clientSupplier.producers.size()); - final Iterator it = clientSupplier.producers.iterator(); - for (final Task task : thread.tasks().values()) { - assertSame(it.next(), ((RecordCollectorImpl) ((StreamTask) task).recordCollector()).producer()); - } assertSame(clientSupplier.consumer, thread.consumer); assertSame(clientSupplier.restoreConsumer, thread.restoreConsumer); } @Test public void shouldCloseAllTaskProducersOnCloseIfEosEnabled() throws InterruptedException { - internalTopologyBuilder.addSource(null, "source1", null, null, null, "someTopic"); + internalTopologyBuilder.addSource(null, "source1", null, null, null, topic1); final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true); - final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); - assignment.put(new TaskId(0, 0), Collections.singleton(new TopicPartition("someTopic", 0))); - assignment.put(new TaskId(0, 1), Collections.singleton(new TopicPartition("someTopic", 1))); - thread.setThreadMetadataProvider(new MockStreamsPartitionAssignor(assignment)); - thread.setState(StreamThread.State.RUNNING); thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); - thread.rebalanceListener.onPartitionsAssigned(Collections.singleton(new TopicPartition("someTopic", 0))); + + final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); + final List<TopicPartition> assignedPartitions = new ArrayList<>(); + + // assign single partition + assignedPartitions.add(t1p1); + assignedPartitions.add(t1p2); + activeTasks.put(task1, Collections.singleton(t1p1)); + activeTasks.put(task2, Collections.singleton(t1p2)); + + thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap()); + thread.taskManager().createTasks(assignedPartitions); + + thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); thread.shutdown(); thread.run(); @@ -706,26 +431,24 @@ public class StreamThreadTest { public void shouldShutdownTaskManagerOnClose() throws InterruptedException { final Consumer<byte[], byte[]> consumer = EasyMock.createNiceMock(Consumer.class); final TaskManager taskManager = EasyMock.createNiceMock(TaskManager.class); - taskManager.setConsumer(EasyMock.anyObject(Consumer.class)); - EasyMock.expectLastCall(); + EasyMock.expect(taskManager.activeTasks()).andReturn(Collections.<TaskId, StreamTask>emptyMap()); + EasyMock.expect(taskManager.standbyTasks()).andReturn(Collections.<TaskId, StandbyTask>emptyMap()); taskManager.shutdown(true); EasyMock.expectLastCall(); EasyMock.replay(taskManager, consumer); StreamThread.StreamsMetricsThreadImpl streamsMetrics = new StreamThread.StreamsMetricsThreadImpl(metrics, "", "", Collections.<String, String>emptyMap()); - final StreamThread thread = new StreamThread(internalTopologyBuilder, - clientId, - "", - config, - processId, - mockTime, - streamsMetadataState, - taskManager, - streamsMetrics, - clientSupplier, - consumer, - clientSupplier.getAdminClient(config.getAdminConfigs(clientId)), - stateDirectory); + final StreamThread thread = new StreamThread(mockTime, + config, + consumer, + consumer, + null, + clientSupplier.getAdminClient(config.getAdminConfigs(clientId)), + taskManager, + streamsMetrics, + internalTopologyBuilder, + clientId, + new LogContext("")); thread.setState(StreamThread.State.RUNNING); thread.shutdown(); thread.run(); @@ -739,112 +462,55 @@ public class StreamThreadTest { final StreamThread thread = createStreamThread(clientId, config, false); - thread.setThreadMetadataProvider(new StreamPartitionAssignor() { - @Override - public Map<TaskId, Set<TopicPartition>> standbyTasks() { - return Collections.singletonMap(new TaskId(0, 0), Utils.mkSet(new TopicPartition("topic", 0))); - } - }); - thread.setState(StreamThread.State.RUNNING); thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); - thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList()); - } - - @Test - public void shouldCloseSuspendedTasksThatAreNoLongerAssignedToThisStreamThreadBeforeCreatingNewTasks() { - internalStreamsBuilder.stream(Collections.singleton("t1"), consumed).groupByKey().count("count-one"); - internalStreamsBuilder.stream(Collections.singleton("t2"), consumed).groupByKey().count("count-two"); - - final StreamThread thread = createStreamThread(clientId, config, false); - final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer; - restoreConsumer.updatePartitions("stream-thread-test-count-one-changelog", - Collections.singletonList(new PartitionInfo("stream-thread-test-count-one-changelog", - 0, - null, - new Node[0], - new Node[0]))); - restoreConsumer.updatePartitions("stream-thread-test-count-two-changelog", - Collections.singletonList(new PartitionInfo("stream-thread-test-count-two-changelog", - 0, - null, - new Node[0], - new Node[0]))); - - - final HashMap<TopicPartition, Long> offsets = new HashMap<>(); - offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 0), 0L); - offsets.put(new TopicPartition("stream-thread-test-count-two-changelog", 0), 0L); - restoreConsumer.updateEndOffsets(offsets); - restoreConsumer.updateBeginningOffsets(offsets); final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); - final TopicPartition t1 = new TopicPartition("t1", 0); - Set<TopicPartition> partitionsT1 = Utils.mkSet(t1); - standbyTasks.put(new TaskId(0, 0), partitionsT1); - - final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); - final TopicPartition t2 = new TopicPartition("t2", 0); - Set<TopicPartition> partitionsT2 = Utils.mkSet(t2); - activeTasks.put(new TaskId(1, 0), partitionsT2); - clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t2, 0L)); - thread.setThreadMetadataProvider(new StreamPartitionAssignor() { - @Override - public Map<TaskId, Set<TopicPartition>> standbyTasks() { - return standbyTasks; - } - - @Override - public Map<TaskId, Set<TopicPartition>> activeTasks() { - return activeTasks; - } - }); + // assign single partition + standbyTasks.put(task1, Collections.singleton(t1p1)); - thread.setState(StreamThread.State.RUNNING); - thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); - clientSupplier.consumer.assign(partitionsT2); - thread.rebalanceListener.onPartitionsAssigned(Utils.mkSet(t2)); - thread.runOnce(-1); - // swap the assignment around and make sure we don't get any exceptions - standbyTasks.clear(); - activeTasks.clear(); - standbyTasks.put(new TaskId(1, 0), Utils.mkSet(t2)); - activeTasks.put(new TaskId(0, 0), Utils.mkSet(t1)); + thread.taskManager().setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks); + thread.taskManager().createTasks(Collections.<TopicPartition>emptyList()); - thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); - clientSupplier.consumer.assign(partitionsT1); - thread.rebalanceListener.onPartitionsAssigned(Utils.mkSet(t1)); + thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList()); } @Test public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerWasFencedWhileProcessing() throws InterruptedException { - internalTopologyBuilder.addSource(null, "source", null, null, null, TOPIC); + internalTopologyBuilder.addSource(null, "source", null, null, null, topic1); internalTopologyBuilder.addSink("sink", "dummyTopic", null, null, null, "source"); final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true); final MockConsumer<byte[], byte[]> consumer = clientSupplier.consumer; - consumer.updatePartitions(TOPIC, Collections.singletonList(new PartitionInfo(TOPIC, 0, null, null, null))); - - final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); - activeTasks.put(task1, task0Assignment); - thread.setThreadMetadataProvider(new MockStreamsPartitionAssignor(activeTasks)); + consumer.updatePartitions(topic1, Collections.singletonList(new PartitionInfo(topic1, 1, null, null, null))); thread.setState(StreamThread.State.RUNNING); thread.rebalanceListener.onPartitionsRevoked(null); - thread.rebalanceListener.onPartitionsAssigned(task0Assignment); + + final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); + final List<TopicPartition> assignedPartitions = new ArrayList<>(); + + // assign single partition + assignedPartitions.add(t1p1); + activeTasks.put(task1, Collections.singleton(t1p1)); + + thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap()); + + thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.runOnce(-1); assertThat(thread.tasks().size(), equalTo(1)); final MockProducer producer = clientSupplier.producers.get(0); // change consumer subscription from "pattern" to "manual" to be able to call .addRecords() - consumer.updateBeginningOffsets(Collections.singletonMap(task0Assignment.iterator().next(), 0L)); + consumer.updateBeginningOffsets(Collections.singletonMap(assignedPartitions.iterator().next(), 0L)); consumer.unsubscribe(); - consumer.assign(task0Assignment); + consumer.assign(new HashSet<>(assignedPartitions)); - consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, new byte[0], new byte[0])); + consumer.addRecord(new ConsumerRecord<>(topic1, 1, 0, new byte[0], new byte[0])); mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1); thread.runOnce(-1); assertThat(producer.history().size(), equalTo(1)); @@ -862,7 +528,7 @@ public class StreamThreadTest { producer.fenceProducer(); mockTime.sleep(config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG) + 1L); - consumer.addRecord(new ConsumerRecord<>(TOPIC, 0, 0, new byte[0], new byte[0])); + consumer.addRecord(new ConsumerRecord<>(topic1, 1, 0, new byte[0], new byte[0])); try { thread.runOnce(-1); fail("Should have thrown TaskMigratedException"); @@ -881,26 +547,31 @@ public class StreamThreadTest { @Test public void shouldCloseTaskAsZombieAndRemoveFromActiveTasksIfProducerGotFencedAtBeginTransactionWhenTaskIsResumed() { - internalTopologyBuilder.addSource(null, "name", null, null, null, "topic"); + internalTopologyBuilder.addSource(null, "name", null, null, null, topic1); internalTopologyBuilder.addSink("out", "output", null, null, null); final StreamThread thread = createStreamThread(clientId, new StreamsConfig(configProps(true)), true); + thread.setState(StreamThread.State.RUNNING); + thread.rebalanceListener.onPartitionsRevoked(null); + final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); - activeTasks.put(task1, task0Assignment); + final List<TopicPartition> assignedPartitions = new ArrayList<>(); - thread.setThreadMetadataProvider(new MockStreamsPartitionAssignor(activeTasks)); + // assign single partition + assignedPartitions.add(t1p1); + activeTasks.put(task1, Collections.singleton(t1p1)); + + thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap()); + thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); - thread.setState(StreamThread.State.RUNNING); - thread.rebalanceListener.onPartitionsRevoked(null); - thread.rebalanceListener.onPartitionsAssigned(task0Assignment); thread.runOnce(-1); assertThat(thread.tasks().size(), equalTo(1)); thread.rebalanceListener.onPartitionsRevoked(null); clientSupplier.producers.get(0).fenceProducer(); - thread.rebalanceListener.onPartitionsAssigned(task0Assignment); + thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); try { thread.runOnce(-1); fail("Should have thrown TaskMigratedException"); @@ -909,80 +580,6 @@ public class StreamThreadTest { assertTrue(thread.tasks().isEmpty()); } - @Test - @SuppressWarnings("unchecked") - public void shouldAlwaysUpdateWithLatestTopicsFromStreamPartitionAssignor() throws Exception { - internalTopologyBuilder.addSource(null, "source", null, null, null, Pattern.compile("t.*")); - internalTopologyBuilder.addProcessor("processor", new MockProcessorSupplier(), "source"); - - final StreamThread thread = createStreamThread(clientId, config, false); - - final StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); - final Map<String, Object> configurationMap = new HashMap<>(); - - configurationMap.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, thread); - configurationMap.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 0); - partitionAssignor.configure(configurationMap); - - thread.setThreadMetadataProvider(partitionAssignor); - - final Field nodeToSourceTopicsField = - internalTopologyBuilder.getClass().getDeclaredField("nodeToSourceTopics"); - nodeToSourceTopicsField.setAccessible(true); - final Map<String, List<String>> - nodeToSourceTopics = - (Map<String, List<String>>) nodeToSourceTopicsField.get(internalTopologyBuilder); - final List<TopicPartition> topicPartitions = new ArrayList<>(); - - final TopicPartition topicPartition1 = new TopicPartition("topic-1", 0); - final TopicPartition topicPartition2 = new TopicPartition("topic-2", 0); - final TopicPartition topicPartition3 = new TopicPartition("topic-3", 0); - - final TaskId taskId1 = new TaskId(0, 0); - final TaskId taskId2 = new TaskId(0, 0); - final TaskId taskId3 = new TaskId(0, 0); - - List<TaskId> activeTasks = Utils.mkList(taskId1); - - final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); - - AssignmentInfo info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>()); - - topicPartitions.addAll(Utils.mkList(topicPartition1)); - PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode()); - partitionAssignor.onAssignment(assignment); - - assertTrue(nodeToSourceTopics.get("source").size() == 1); - assertTrue(nodeToSourceTopics.get("source").contains("topic-1")); - - topicPartitions.clear(); - - activeTasks = Arrays.asList(taskId1, taskId2); - info = new AssignmentInfo(activeTasks, standbyTasks, new HashMap<HostInfo, Set<TopicPartition>>()); - topicPartitions.addAll(Arrays.asList(topicPartition1, topicPartition2)); - assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode()); - partitionAssignor.onAssignment(assignment); - - assertTrue(nodeToSourceTopics.get("source").size() == 2); - assertTrue(nodeToSourceTopics.get("source").contains("topic-1")); - assertTrue(nodeToSourceTopics.get("source").contains("topic-2")); - - topicPartitions.clear(); - - activeTasks = Arrays.asList(taskId1, taskId2, taskId3); - info = new AssignmentInfo(activeTasks, standbyTasks, - new HashMap<HostInfo, Set<TopicPartition>>()); - topicPartitions.addAll(Arrays.asList(topicPartition1, topicPartition2, topicPartition3)); - assignment = new PartitionAssignor.Assignment(topicPartitions, info.encode()); - partitionAssignor.onAssignment(assignment); - - assertTrue(nodeToSourceTopics.get("source").size() == 3); - assertTrue(nodeToSourceTopics.get("source").contains("topic-1")); - assertTrue(nodeToSourceTopics.get("source").contains("topic-2")); - assertTrue(nodeToSourceTopics.get("source").contains("topic-3")); - - } - private static class StateListenerStub implements StreamThread.StateListener { int numChanges = 0; ThreadStateTransitionValidator oldState = null; @@ -1006,33 +603,39 @@ public class StreamThreadTest { return createStreamThread(clientId, config, false); } - @Test public void shouldReturnActiveTaskMetadataWhileRunningState() throws InterruptedException { - internalTopologyBuilder.addSource(null, "source", null, null, null, TOPIC); + internalTopologyBuilder.addSource(null, "source", null, null, null, topic1); - final TaskId taskId = new TaskId(0, 0); final StreamThread thread = createStreamThread(clientId, config, false); - final Map<TaskId, Set<TopicPartition>> assignment = new HashMap<>(); - assignment.put(taskId, task0Assignment); - thread.setThreadMetadataProvider(new MockStreamsPartitionAssignor(assignment)); - thread.setState(StreamThread.State.RUNNING); thread.rebalanceListener.onPartitionsRevoked(null); - thread.rebalanceListener.onPartitionsAssigned(task0Assignment); + + final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); + final List<TopicPartition> assignedPartitions = new ArrayList<>(); + + // assign single partition + assignedPartitions.add(t1p1); + activeTasks.put(task1, Collections.singleton(t1p1)); + + thread.taskManager().setAssignmentMetadata(activeTasks, Collections.<TaskId, Set<TopicPartition>>emptyMap()); + thread.taskManager().createTasks(assignedPartitions); + + thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + thread.runOnce(-1); ThreadMetadata threadMetadata = thread.threadMetadata(); assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState()); - assertTrue(threadMetadata.activeTasks().contains(new TaskMetadata(taskId.toString(), task0Assignment))); + assertTrue(threadMetadata.activeTasks().contains(new TaskMetadata(task1.toString(), Utils.mkSet(t1p1)))); assertTrue(threadMetadata.standbyTasks().isEmpty()); } @Test public void shouldReturnStandbyTaskMetadataWhileRunningState() throws InterruptedException { - internalStreamsBuilder.stream(Collections.singleton("t1"), consumed).groupByKey().count("count-one"); + internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).groupByKey().count("count-one"); final StreamThread thread = createStreamThread(clientId, config, false); final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer; @@ -1044,30 +647,28 @@ public class StreamThreadTest { new Node[0]))); final HashMap<TopicPartition, Long> offsets = new HashMap<>(); - offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 0), 0L); + offsets.put(new TopicPartition("stream-thread-test-count-one-changelog", 1), 0L); restoreConsumer.updateEndOffsets(offsets); restoreConsumer.updateBeginningOffsets(offsets); - final TaskId taskId = new TaskId(0, 0); + thread.setState(StreamThread.State.RUNNING); + + thread.rebalanceListener.onPartitionsRevoked(null); final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); - final TopicPartition t1 = new TopicPartition("t1", 0); - Set<TopicPartition> partitionsT1 = Utils.mkSet(t1); - standbyTasks.put(taskId, partitionsT1); - final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); + // assign single partition + standbyTasks.put(task1, Collections.singleton(t1p1)); - thread.setThreadMetadataProvider(new MockStreamsPartitionAssignor(activeTasks, standbyTasks)); + thread.taskManager().setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), standbyTasks); - thread.setState(StreamThread.State.RUNNING); + thread.rebalanceListener.onPartitionsAssigned(Collections.<TopicPartition>emptyList()); - thread.rebalanceListener.onPartitionsRevoked(task0Assignment); - thread.rebalanceListener.onPartitionsAssigned(null); thread.runOnce(-1); ThreadMetadata threadMetadata = thread.threadMetadata(); assertEquals(StreamThread.State.RUNNING.name(), threadMetadata.threadState()); - assertTrue(threadMetadata.standbyTasks().contains(new TaskMetadata(taskId.toString(), partitionsT1))); + assertTrue(threadMetadata.standbyTasks().contains(new TaskMetadata(task1.toString(), Utils.mkSet(t1p1)))); assertTrue(threadMetadata.activeTasks().isEmpty()); } @@ -1084,7 +685,7 @@ public class StreamThreadTest { @Test public void shouldAlwaysReturnEmptyTasksMetadataWhileRebalancingStateAndTasksNotRunning() throws InterruptedException { - internalStreamsBuilder.stream(Collections.singleton("t1"), consumed).groupByKey().count("count-one"); + internalStreamsBuilder.stream(Collections.singleton(topic1), consumed).groupByKey().count("count-one"); final StreamThread thread = createStreamThread(clientId, config, false); final MockConsumer<byte[], byte[]> restoreConsumer = clientSupplier.restoreConsumer; @@ -1107,48 +708,28 @@ public class StreamThreadTest { restoreConsumer.updateEndOffsets(offsets); restoreConsumer.updateBeginningOffsets(offsets); - final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); - final TopicPartition t1p0 = new TopicPartition("t1", 0); - Set<TopicPartition> partitionsT1P0 = Utils.mkSet(t1p0); - standbyTasks.put(new TaskId(0, 0), partitionsT1P0); - - final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); - final TopicPartition t1p1 = new TopicPartition("t1", 1); - Set<TopicPartition> partitionsT1P1 = Utils.mkSet(t1p1); - activeTasks.put(new TaskId(0, 1), partitionsT1P1); clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 0L)); - thread.setThreadMetadataProvider(new StreamPartitionAssignor() { - @Override - public Map<TaskId, Set<TopicPartition>> standbyTasks() { - return standbyTasks; - } - - @Override - public Map<TaskId, Set<TopicPartition>> activeTasks() { - return activeTasks; - } - }); thread.setState(StreamThread.State.RUNNING); - thread.rebalanceListener.onPartitionsRevoked(partitionsT1P0); + final List<TopicPartition> assignedPartitions = new ArrayList<>(); + + thread.rebalanceListener.onPartitionsRevoked(assignedPartitions); assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_REVOKED); - clientSupplier.consumer.assign(partitionsT1P1); - thread.rebalanceListener.onPartitionsAssigned(partitionsT1P1); - assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_ASSIGNED); - thread.runOnce(-1); + final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); + final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); - standbyTasks.clear(); - activeTasks.clear(); - standbyTasks.put(new TaskId(0, 1), Utils.mkSet(t1p1)); - activeTasks.put(new TaskId(0, 0), Utils.mkSet(t1p0)); + // assign single partition + assignedPartitions.add(t1p1); + activeTasks.put(task1, Collections.singleton(t1p1)); + standbyTasks.put(task2, Collections.singleton(t1p2)); - assertFalse(thread.threadMetadata().activeTasks().isEmpty()); - assertFalse(thread.threadMetadata().standbyTasks().isEmpty()); + thread.taskManager().setAssignmentMetadata(activeTasks, standbyTasks); - thread.rebalanceListener.onPartitionsRevoked(Collections.<TopicPartition>emptyList()); - assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_REVOKED); + thread.rebalanceListener.onPartitionsAssigned(assignedPartitions); + + assertThreadMetadataHasEmptyTasksWithState(thread.threadMetadata(), StreamThread.State.PARTITIONS_ASSIGNED); } private void assertThreadMetadataHasEmptyTasksWithState(ThreadMetadata metadata, StreamThread.State state) { http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java index a399dd4..660a622 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamsKafkaClientTest.java @@ -25,11 +25,8 @@ import org.apache.kafka.common.config.SaslConfigs; import org.apache.kafka.common.config.TopicConfig; import org.apache.kafka.common.metrics.KafkaMetric; import org.apache.kafka.common.metrics.MetricsReporter; -import org.apache.kafka.common.protocol.ApiKeys; -import org.apache.kafka.common.protocol.Errors; import org.apache.kafka.common.requests.AbstractRequest; import org.apache.kafka.common.requests.ApiError; -import org.apache.kafka.common.requests.ApiVersionsResponse; import org.apache.kafka.common.requests.CreateTopicsRequest; import org.apache.kafka.common.requests.CreateTopicsResponse; import org.apache.kafka.common.requests.MetadataResponse; @@ -79,10 +76,9 @@ public class StreamsKafkaClientTest { public void testConfigFromStreamsConfig() { for (final String expectedMechanism : asList("PLAIN", "SCRAM-SHA-512")) { config.put(SaslConfigs.SASL_MECHANISM, expectedMechanism); - final StreamsConfig streamsConfig = new StreamsConfig(config); - final AbstractConfig config = StreamsKafkaClient.Config.fromStreamsConfig(streamsConfig); - assertEquals(expectedMechanism, config.values().get(SaslConfigs.SASL_MECHANISM)); - assertEquals(expectedMechanism, config.getString(SaslConfigs.SASL_MECHANISM)); + final AbstractConfig abstractConfig = StreamsKafkaClient.Config.fromStreamsConfig(config); + assertEquals(expectedMechanism, abstractConfig.values().get(SaslConfigs.SASL_MECHANISM)); + assertEquals(expectedMechanism, abstractConfig.getString(SaslConfigs.SASL_MECHANISM)); } } @@ -138,7 +134,7 @@ public class StreamsKafkaClientTest { public void metricsShouldBeTaggedWithClientId() { config.put(StreamsConfig.CLIENT_ID_CONFIG, "some_client_id"); config.put(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, TestMetricsReporter.class.getName()); - StreamsKafkaClient.create(new StreamsConfig(config)); + StreamsKafkaClient.create(config); assertFalse(TestMetricsReporter.METRICS.isEmpty()); for (KafkaMetric kafkaMetric : TestMetricsReporter.METRICS.values()) { assertEquals("some_client_id", kafkaMetric.metricName().tags().get("client-id")); @@ -146,34 +142,6 @@ public class StreamsKafkaClientTest { } @Test(expected = StreamsException.class) - public void shouldThrowStreamsExceptionOnEmptyBrokerCompatibilityResponse() { - kafkaClient.prepareResponse(null); - final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient(); - streamsKafkaClient.checkBrokerCompatibility(false); - } - - @Test(expected = StreamsException.class) - public void shouldThrowStreamsExceptionWhenBrokerCompatibilityResponseInconsistent() { - kafkaClient.prepareResponse(new ProduceResponse(Collections.<TopicPartition, ProduceResponse.PartitionResponse>emptyMap())); - final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient(); - streamsKafkaClient.checkBrokerCompatibility(false); - } - - @Test(expected = StreamsException.class) - public void shouldRequireBrokerVersion0101OrHigherWhenEosDisabled() { - kafkaClient.prepareResponse(new ApiVersionsResponse(Errors.NONE, Collections.singletonList(new ApiVersionsResponse.ApiVersion(ApiKeys.PRODUCE)))); - final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient(); - streamsKafkaClient.checkBrokerCompatibility(false); - } - - @Test(expected = StreamsException.class) - public void shouldRequireBrokerVersions0110OrHigherWhenEosEnabled() { - kafkaClient.prepareResponse(new ApiVersionsResponse(Errors.NONE, Collections.singletonList(new ApiVersionsResponse.ApiVersion(ApiKeys.CREATE_TOPICS)))); - final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient(); - streamsKafkaClient.checkBrokerCompatibility(true); - } - - @Test(expected = StreamsException.class) public void shouldThrowStreamsExceptionOnEmptyFetchMetadataResponse() { kafkaClient.prepareResponse(null); final StreamsKafkaClient streamsKafkaClient = createStreamsKafkaClient(); @@ -213,8 +181,7 @@ public class StreamsKafkaClientTest { } private StreamsKafkaClient createStreamsKafkaClient() { - final StreamsConfig streamsConfig = new StreamsConfig(config); - return new StreamsKafkaClient(StreamsKafkaClient.Config.fromStreamsConfig(streamsConfig), + return new StreamsKafkaClient(StreamsKafkaClient.Config.fromStreamsConfig(config), kafkaClient, reporters, new LogContext()); http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java index 1640f9e..55dcf79 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/TaskManagerTest.java @@ -22,19 +22,28 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.streams.errors.StreamsException; import org.apache.kafka.streams.processor.TaskId; + import org.easymock.EasyMock; import org.easymock.EasyMockRunner; import org.easymock.Mock; import org.easymock.MockType; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TemporaryFolder; import org.junit.runner.RunWith; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Map; import java.util.Set; +import java.util.UUID; +import java.util.regex.Pattern; import static org.easymock.EasyMock.checkOrder; import static org.easymock.EasyMock.expect; @@ -54,9 +63,17 @@ public class TaskManagerTest { private final Set<TopicPartition> taskId0Partitions = Utils.mkSet(t1p0); private final Map<TaskId, Set<TopicPartition>> taskId0Assignment = Collections.singletonMap(taskId0, taskId0Partitions); + @Mock(type = MockType.STRICT) + private InternalTopologyBuilder.SubscriptionUpdates subscriptionUpdates; + @Mock(type = MockType.STRICT) + private InternalTopologyBuilder topologyBuilder; + @Mock(type = MockType.STRICT) + private StateDirectory stateDirectory; @Mock(type = MockType.NICE) private ChangelogReader changeLogReader; @Mock(type = MockType.NICE) + private StreamsMetadataState streamsMetadataState; + @Mock(type = MockType.NICE) private Consumer<byte[], byte[]> restoreConsumer; @Mock(type = MockType.NICE) private Consumer<byte[], byte[]> consumer; @@ -65,7 +82,7 @@ public class TaskManagerTest { @Mock(type = MockType.NICE) private StreamThread.AbstractTaskCreator<StandbyTask> standbyTaskCreator; @Mock(type = MockType.NICE) - private ThreadMetadataProvider threadMetadataProvider; + private StreamsKafkaClient streamsKafkaClient; @Mock(type = MockType.NICE) private StreamTask streamTask; @Mock(type = MockType.NICE) @@ -77,17 +94,35 @@ public class TaskManagerTest { private TaskManager taskManager; + private final String topic1 = "topic1"; + private final String topic2 = "topic2"; + private final TopicPartition t1p1 = new TopicPartition(topic1, 1); + private final TopicPartition t1p2 = new TopicPartition(topic1, 2); + private final TopicPartition t1p3 = new TopicPartition(topic1, 3); + private final TopicPartition t2p1 = new TopicPartition(topic2, 1); + private final TopicPartition t2p2 = new TopicPartition(topic2, 2); + private final TopicPartition t2p3 = new TopicPartition(topic2, 3); + + private final TaskId task01 = new TaskId(0, 1); + private final TaskId task02 = new TaskId(0, 2); + private final TaskId task03 = new TaskId(0, 3); + private final TaskId task11 = new TaskId(1, 1); + + @Rule + public final TemporaryFolder testFolder = new TemporaryFolder(); @Before public void setUp() throws Exception { taskManager = new TaskManager(changeLogReader, + UUID.randomUUID(), "", restoreConsumer, + streamsMetadataState, activeTaskCreator, standbyTaskCreator, + streamsKafkaClient, active, standby); - taskManager.setThreadMetadataProvider(threadMetadataProvider); taskManager.setConsumer(consumer); } @@ -97,18 +132,110 @@ public class TaskManagerTest { consumer, activeTaskCreator, standbyTaskCreator, - threadMetadataProvider, active, standby); } @Test + public void shouldUpdateSubscriptionFromAssignment() { + mockTopologyBuilder(); + expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1)); + topologyBuilder.updateSubscribedTopics(EasyMock.eq(Utils.mkSet(topic1, topic2)), EasyMock.anyString()); + expectLastCall().once(); + + EasyMock.replay(activeTaskCreator, + topologyBuilder, + subscriptionUpdates); + + taskManager.updateSubscriptionsFromAssignment(Utils.mkList(t1p1, t2p1)); + + EasyMock.verify(activeTaskCreator, + topologyBuilder, + subscriptionUpdates); + } + + @Test + public void shouldNotUpdateSubscriptionFromAssignment() { + mockTopologyBuilder(); + expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1, topic2)); + + EasyMock.replay(activeTaskCreator, + topologyBuilder, + subscriptionUpdates); + + taskManager.updateSubscriptionsFromAssignment(Utils.mkList(t1p1)); + + EasyMock.verify(activeTaskCreator, + topologyBuilder, + subscriptionUpdates); + } + + @Test + public void shouldUpdateSubscriptionFromMetadata() { + mockTopologyBuilder(); + expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1)); + topologyBuilder.updateSubscribedTopics(EasyMock.eq(Utils.mkSet(topic1, topic2)), EasyMock.anyString()); + expectLastCall().once(); + + EasyMock.replay(activeTaskCreator, + topologyBuilder, + subscriptionUpdates); + + taskManager.updateSubscriptionsFromMetadata(Utils.mkSet(topic1, topic2)); + + EasyMock.verify(activeTaskCreator, + topologyBuilder, + subscriptionUpdates); + } + + @Test + public void shouldNotUpdateSubscriptionFromMetadata() { + mockTopologyBuilder(); + expect(subscriptionUpdates.getUpdates()).andReturn(Utils.mkSet(topic1)); + + EasyMock.replay(activeTaskCreator, + topologyBuilder, + subscriptionUpdates); + + taskManager.updateSubscriptionsFromMetadata(Utils.mkSet(topic1)); + + EasyMock.verify(activeTaskCreator, + topologyBuilder, + subscriptionUpdates); + } + + @Test + public void shouldReturnCachedTaskIdsFromDirectory() throws IOException { + File[] taskFolders = Utils.mkList(testFolder.newFolder("0_1"), + testFolder.newFolder("0_2"), + testFolder.newFolder("0_3"), + testFolder.newFolder("1_1"), + testFolder.newFolder("dummy")).toArray(new File[0]); + + assertTrue((new File(taskFolders[0], ProcessorStateManager.CHECKPOINT_FILE_NAME)).createNewFile()); + assertTrue((new File(taskFolders[1], ProcessorStateManager.CHECKPOINT_FILE_NAME)).createNewFile()); + assertTrue((new File(taskFolders[3], ProcessorStateManager.CHECKPOINT_FILE_NAME)).createNewFile()); + + expect(activeTaskCreator.stateDirectory()).andReturn(stateDirectory).once(); + expect(stateDirectory.listTaskDirectories()).andReturn(taskFolders).once(); + + EasyMock.replay(activeTaskCreator, stateDirectory); + + Set<TaskId> tasks = taskManager.cachedTasksIds(); + + EasyMock.verify(activeTaskCreator, stateDirectory); + + assertThat(tasks, equalTo(Utils.mkSet(task01, task02, task11))); + } + + @Test public void shouldCloseActiveUnAssignedSuspendedTasksWhenCreatingNewTasks() { mockSingleActiveTask(); active.closeNonAssignedSuspendedTasks(taskId0Assignment); expectLastCall(); replay(); + taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap()); taskManager.createTasks(taskId0Partitions); verify(active); @@ -121,6 +248,7 @@ public class TaskManagerTest { expectLastCall(); replay(); + taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap()); taskManager.createTasks(taskId0Partitions); verify(active); @@ -133,6 +261,7 @@ public class TaskManagerTest { EasyMock.expectLastCall(); replay(); + taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap()); taskManager.createTasks(taskId0Partitions); verify(changeLogReader); } @@ -144,6 +273,7 @@ public class TaskManagerTest { active.addNewTask(EasyMock.same(streamTask)); replay(); + taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap()); taskManager.createTasks(taskId0Partitions); verify(activeTaskCreator, active); @@ -152,10 +282,10 @@ public class TaskManagerTest { @Test public void shouldNotAddResumedActiveTasks() { checkOrder(active, true); - mockThreadMetadataProvider(Collections.<TaskId, Set<TopicPartition>>emptyMap(), taskId0Assignment); EasyMock.expect(active.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true); replay(); + taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap()); taskManager.createTasks(taskId0Partitions); // should be no calls to activeTaskCreator and no calls to active.addNewTasks(..) @@ -169,6 +299,7 @@ public class TaskManagerTest { standby.addNewTask(EasyMock.same(standbyTask)); replay(); + taskManager.setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), taskId0Assignment); taskManager.createTasks(taskId0Partitions); verify(standbyTaskCreator, active); @@ -177,10 +308,10 @@ public class TaskManagerTest { @Test public void shouldNotAddResumedStandbyTasks() { checkOrder(active, true); - mockThreadMetadataProvider(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap()); EasyMock.expect(standby.maybeResumeSuspendedTask(taskId0, taskId0Partitions)).andReturn(true); replay(); + taskManager.setAssignmentMetadata(Collections.<TaskId, Set<TopicPartition>>emptyMap(), taskId0Assignment); taskManager.createTasks(taskId0Partitions); // should be no calls to standbyTaskCreator and no calls to standby.addNewTasks(..) @@ -196,6 +327,7 @@ public class TaskManagerTest { EasyMock.expectLastCall(); replay(); + taskManager.setAssignmentMetadata(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap()); taskManager.createTasks(taskId0Partitions); verify(consumer); } @@ -276,25 +408,6 @@ public class TaskManagerTest { } @Test - public void shouldCloseThreadMetadataProviderOnShutdown() { - threadMetadataProvider.close(); - EasyMock.expectLastCall(); - replay(); - - taskManager.shutdown(true); - verify(threadMetadataProvider); - } - - @Test - public void shouldNotPropagateExceptionsOnShutdown() { - threadMetadataProvider.close(); - EasyMock.expectLastCall().andThrow(new RuntimeException()); - replay(); - - taskManager.shutdown(false); - } - - @Test public void shouldInitializeNewActiveTasks() { EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>()); EasyMock.expect(active.updateRestored(EasyMock.<Collection<TopicPartition>>anyObject())). @@ -471,6 +584,26 @@ public class TaskManagerTest { EasyMock.verify(consumer); } + @Test + public void shouldUpdateTasksFromPartitionAssignment() { + final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>(); + final Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); + + taskManager.setAssignmentMetadata(activeTasks, standbyTasks); + assertTrue(taskManager.assignedActiveTasks().isEmpty()); + + // assign two active tasks with two partitions each + activeTasks.put(task01, new HashSet<>(Arrays.asList(t1p1, t2p1))); + activeTasks.put(task02, new HashSet<>(Arrays.asList(t1p2, t2p2))); + + // assign one standby task with two partitions + standbyTasks.put(task03, new HashSet<>(Arrays.asList(t1p3, t2p3))); + taskManager.setAssignmentMetadata(activeTasks, standbyTasks); + + assertThat(taskManager.assignedActiveTasks(), equalTo(activeTasks)); + assertThat(taskManager.assignedStandbyTasks(), equalTo(standbyTasks)); + } + private void mockAssignStandbyPartitions(final long offset) { final StandbyTask task = EasyMock.createNiceMock(StandbyTask.class); EasyMock.expect(active.initializeNewTasks()).andReturn(new HashSet<TopicPartition>()); @@ -486,31 +619,22 @@ public class TaskManagerTest { } private void mockStandbyTaskExpectations() { - mockThreadMetadataProvider(taskId0Assignment, Collections.<TaskId, Set<TopicPartition>>emptyMap()); expect(standbyTaskCreator.createTasks(EasyMock.<Consumer<byte[], byte[]>>anyObject(), EasyMock.eq(taskId0Assignment))) .andReturn(Collections.singletonList(standbyTask)); } - @SuppressWarnings("unchecked") private void mockSingleActiveTask() { - mockThreadMetadataProvider(Collections.<TaskId, Set<TopicPartition>>emptyMap(), taskId0Assignment); - - expect(activeTaskCreator.createTasks(EasyMock.anyObject(Consumer.class), + expect(activeTaskCreator.createTasks(EasyMock.<Consumer<byte[], byte[]>>anyObject(), EasyMock.eq(taskId0Assignment))) .andReturn(Collections.singletonList(streamTask)); } - private void mockThreadMetadataProvider(final Map<TaskId, Set<TopicPartition>> standbyAssignment, - final Map<TaskId, Set<TopicPartition>> activeAssignment) { - expect(threadMetadataProvider.standbyTasks()) - .andReturn(standbyAssignment) - .anyTimes(); - expect(threadMetadataProvider.activeTasks()) - .andReturn(activeAssignment) - .anyTimes(); + private void mockTopologyBuilder() { + expect(activeTaskCreator.builder()).andReturn(topologyBuilder).anyTimes(); + expect(topologyBuilder.sourceTopicPattern()).andReturn(Pattern.compile("abc")); + expect(topologyBuilder.subscriptionUpdates()).andReturn(subscriptionUpdates); } - } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/5df1eee7/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java index 3908305..598ca8d 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java +++ b/streams/src/test/java/org/apache/kafka/test/MockInternalTopicManager.java @@ -38,7 +38,7 @@ public class MockInternalTopicManager extends InternalTopicManager { private MockConsumer<byte[], byte[]> restoreConsumer; public MockInternalTopicManager(StreamsConfig streamsConfig, MockConsumer<byte[], byte[]> restoreConsumer) { - super(StreamsKafkaClient.create(streamsConfig), 0, 0, new MockTime()); + super(StreamsKafkaClient.create(streamsConfig.originals()), 0, 0, new MockTime()); this.restoreConsumer = restoreConsumer; }