Repository: kafka Updated Branches: refs/heads/trunk 04ef9c354 -> 9c4c5ae1c
MINOR: Add unit test for internal topics Author: Guozhang Wang <[email protected]> Reviewers: Yasuhiro Matsuda <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #1047 from guozhangwang/KInternal Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9c4c5ae1 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9c4c5ae1 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9c4c5ae1 Branch: refs/heads/trunk Commit: 9c4c5ae1cd15aa0afe3156e572362fbb40130573 Parents: 04ef9c3 Author: Guozhang Wang <[email protected]> Authored: Thu Mar 10 14:54:47 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Thu Mar 10 14:54:47 2016 -0800 ---------------------------------------------------------------------- .../internals/InternalTopicManager.java | 13 ++-- .../internals/StreamPartitionAssignor.java | 6 +- .../internals/StreamPartitionAssignorTest.java | 66 ++++++++++++++++++++ 3 files changed, 80 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9c4c5ae1/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java index ce95bb0..3725c4c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopicManager.java @@ -73,6 +73,11 @@ public class InternalTopicManager { } } + public InternalTopicManager() { + this.zkClient = null; + this.replicationFactor = 0; + } + public InternalTopicManager(String zkConnect, int replicationFactor) { this.zkClient = new ZkClient(zkConnect, 30 * 1000, 30 * 1000, new ZKStringSerializer()); this.replicationFactor = replicationFactor; @@ -125,7 +130,7 @@ public class InternalTopicManager { } @SuppressWarnings("unchecked") - public Map<Integer, List<Integer>> getTopicMetadata(String topic) { + private Map<Integer, List<Integer>> getTopicMetadata(String topic) { String data = zkClient.readData(ZK_TOPIC_PATH + "/" + topic, true); if (data == null) return null; @@ -147,7 +152,7 @@ public class InternalTopicManager { } } - public void createTopic(String topic, int numPartitions, int replicationFactor) throws ZkNodeExistsException { + private void createTopic(String topic, int numPartitions, int replicationFactor) throws ZkNodeExistsException { log.debug("Creating topic {} with {} partitions from ZK in partition assignor.", topic, numPartitions); List<Integer> brokers = getBrokers(); @@ -183,13 +188,13 @@ public class InternalTopicManager { } } - public void deleteTopic(String topic) throws ZkNodeExistsException { + private void deleteTopic(String topic) throws ZkNodeExistsException { log.debug("Deleting topic {} from ZK in partition assignor.", topic); zkClient.createPersistent(ZK_DELETE_TOPIC_PATH + "/" + topic, "", ZooDefs.Ids.OPEN_ACL_UNSAFE); } - public void addPartitions(String topic, int numPartitions, int replicationFactor, Map<Integer, List<Integer>> existingAssignment) { + private void addPartitions(String topic, int numPartitions, int replicationFactor, Map<Integer, List<Integer>> existingAssignment) { log.debug("Adding {} partitions topic {} from ZK with existing partitions assigned as {} in partition assignor.", topic, numPartitions, existingAssignment); List<Integer> brokers = getBrokers(); http://git-wip-us.apache.org/repos/asf/kafka/blob/9c4c5ae1/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java index 1b3bf10..13f269b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java @@ -231,7 +231,7 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable log.debug("Starting to validate internal source topics in partition assignor."); for (Map.Entry<String, Set<TaskId>> entry : internalSourceTopicToTaskIds.entrySet()) { - String topic = streamThread.jobId + "-" + entry.getKey(); + String topic = entry.getKey(); // should have size 1 only int numPartitions = -1; @@ -455,4 +455,8 @@ public class StreamPartitionAssignor implements PartitionAssignor, Configurable public Map<TaskId, Set<TopicPartition>> standbyTasks() { return standbyTasks; } + + public void setInternalTopicManager(InternalTopicManager internalTopicManager) { + this.internalTopicManager = internalTopicManager; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9c4c5ae1/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java index 15b114a..9ff0af0 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -506,4 +506,70 @@ public class StreamPartitionAssignorTest { assertEquals(standbyTasks, partitionAssignor.standbyTasks()); } + @Test + public void testAssignWithInternalTopics() throws Exception { + StreamsConfig config = new StreamsConfig(configProps()); + + MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); + MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + + TopologyBuilder builder = new TopologyBuilder(); + builder.addInternalTopic("topicX"); + builder.addSource("source1", "topic1"); + builder.addProcessor("processor1", new MockProcessorSupplier(), "source1"); + builder.addSink("sink1", "topicX", "processor1"); + builder.addSource("source2", "topicX"); + builder.addProcessor("processor2", new MockProcessorSupplier(), "source2"); + List<String> topics = Utils.mkList("topic1", "topicX"); + Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); + + UUID uuid1 = UUID.randomUUID(); + UUID uuid2 = UUID.randomUUID(); + String client1 = "client1"; + + StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime()); + + StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); + MockInternalTopicManager internalTopicManager = new MockInternalTopicManager(mockRestoreConsumer); + partitionAssignor.setInternalTopicManager(internalTopicManager); + + Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); + Set<TaskId> emptyTasks = Collections.<TaskId>emptySet(); + subscriptions.put("consumer10", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, emptyTasks, emptyTasks).encode())); + + Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); + + // check prepared internal topics + // TODO: we need to change it to 1 after fixing the prefix + assertEquals(2, internalTopicManager.readyTopics.size()); + assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("topicX")); + assertEquals(allTasks.size(), (long) internalTopicManager.readyTopics.get("test-topicX")); + } + + private class MockInternalTopicManager extends InternalTopicManager { + + public Map<String, Integer> readyTopics = new HashMap<>(); + public MockConsumer<byte[], byte[]> restoreConsumer; + + public MockInternalTopicManager(MockConsumer<byte[], byte[]> restoreConsumer) { + super(); + + this.restoreConsumer = restoreConsumer; + } + + @Override + public void makeReady(String topic, int numPartitions) { + readyTopics.put(topic, numPartitions); + + List<PartitionInfo> partitions = new ArrayList<>(); + for (int i = 0; i < numPartitions; i++) { + partitions.add(new PartitionInfo(topic, i, null, null, null)); + } + + restoreConsumer.updatePartitions(topic, partitions); + } + } }
