Repository: kafka Updated Branches: refs/heads/trunk 91ba074e4 -> 21c6cfe50
http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java new file mode 100644 index 0000000..15b114a --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignorTest.java @@ -0,0 +1,509 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.MockConsumer; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.clients.consumer.internals.PartitionAssignor; +import org.apache.kafka.clients.producer.MockProducer; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.metrics.Metrics; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.processor.internals.assignment.AssignmentInfo; +import org.apache.kafka.streams.processor.internals.assignment.SubscriptionInfo; +import org.apache.kafka.test.MockProcessorSupplier; +import org.apache.kafka.test.MockStateStoreSupplier; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Set; +import java.util.UUID; + +import static org.junit.Assert.assertEquals; + +public class StreamPartitionAssignorTest { + + private TopicPartition t1p0 = new TopicPartition("topic1", 0); + private TopicPartition t1p1 = new TopicPartition("topic1", 1); + private TopicPartition t1p2 = new TopicPartition("topic1", 2); + private TopicPartition t2p0 = new TopicPartition("topic2", 0); + private TopicPartition t2p1 = new TopicPartition("topic2", 1); + private TopicPartition t2p2 = new TopicPartition("topic2", 2); + private TopicPartition t3p0 = new TopicPartition("topic3", 0); + private TopicPartition t3p1 = new TopicPartition("topic3", 1); + private TopicPartition t3p2 = new TopicPartition("topic3", 2); + private TopicPartition t3p3 = new TopicPartition("topic3", 3); + + private Set<String> allTopics = Utils.mkSet("topic1", "topic2"); + + private List<PartitionInfo> infos = Arrays.asList( + new PartitionInfo("topic1", 0, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic1", 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic1", 2, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic2", 0, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic2", 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic2", 2, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic3", 0, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic3", 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic3", 2, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo("topic3", 3, Node.noNode(), new Node[0], new Node[0]) + ); + + private Cluster metadata = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet()); + + private final TaskId task0 = new TaskId(0, 0); + private final TaskId task1 = new TaskId(0, 1); + private final TaskId task2 = new TaskId(0, 2); + private final TaskId task3 = new TaskId(0, 3); + + private Properties configProps() { + return new Properties() { + { + setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor"); + setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-partition-assignor-test"); + setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"); + setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); + } + }; + } + + private ByteArraySerializer serializer = new ByteArraySerializer(); + + @SuppressWarnings("unchecked") + @Test + public void testSubscription() throws Exception { + StreamsConfig config = new StreamsConfig(configProps()); + + MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); + MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + + TopologyBuilder builder = new TopologyBuilder(); + builder.addSource("source1", "topic1"); + builder.addSource("source2", "topic2"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); + + final Set<TaskId> prevTasks = Utils.mkSet( + new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1)); + final Set<TaskId> cachedTasks = Utils.mkSet( + new TaskId(0, 1), new TaskId(1, 1), new TaskId(2, 1), + new TaskId(0, 2), new TaskId(1, 2), new TaskId(2, 2)); + + String clientId = "client-id"; + UUID processId = UUID.randomUUID(); + StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) { + @Override + public Set<TaskId> prevTasks() { + return prevTasks; + } + @Override + public Set<TaskId> cachedTasks() { + return cachedTasks; + } + }; + + StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + partitionAssignor.configure(config.getConsumerConfigs(thread, "test", clientId)); + + PartitionAssignor.Subscription subscription = partitionAssignor.subscription(Utils.mkSet("topic1", "topic2")); + + Collections.sort(subscription.topics()); + assertEquals(Utils.mkList("topic1", "topic2"), subscription.topics()); + + Set<TaskId> standbyTasks = new HashSet<>(cachedTasks); + standbyTasks.removeAll(prevTasks); + + SubscriptionInfo info = new SubscriptionInfo(processId, prevTasks, standbyTasks); + assertEquals(info.encode(), subscription.userData()); + } + + @Test + public void testAssignBasic() throws Exception { + StreamsConfig config = new StreamsConfig(configProps()); + + MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); + MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + + TopologyBuilder builder = new TopologyBuilder(); + builder.addSource("source1", "topic1"); + builder.addSource("source2", "topic2"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); + List<String> topics = Utils.mkList("topic1", "topic2"); + Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); + + final Set<TaskId> prevTasks10 = Utils.mkSet(task0); + final Set<TaskId> prevTasks11 = Utils.mkSet(task1); + final Set<TaskId> prevTasks20 = Utils.mkSet(task2); + final Set<TaskId> standbyTasks10 = Utils.mkSet(task1); + final Set<TaskId> standbyTasks11 = Utils.mkSet(task2); + final Set<TaskId> standbyTasks20 = Utils.mkSet(task0); + + UUID uuid1 = UUID.randomUUID(); + UUID uuid2 = UUID.randomUUID(); + String client1 = "client1"; + String client2 = "client2"; + + StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime()); + + StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); + + Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); + subscriptions.put("consumer10", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode())); + subscriptions.put("consumer11", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode())); + subscriptions.put("consumer20", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode())); + + Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); + + // check assigned partitions + assertEquals(Utils.mkSet(Utils.mkSet(t1p0, t2p0), Utils.mkSet(t1p1, t2p1)), + Utils.mkSet(new HashSet<>(assignments.get("consumer10").partitions()), new HashSet<>(assignments.get("consumer11").partitions()))); + assertEquals(Utils.mkSet(t1p2, t2p2), new HashSet<>(assignments.get("consumer20").partitions())); + + // check assignment info + + Set<TaskId> allActiveTasks = new HashSet<>(); + + // the first consumer + AssignmentInfo info10 = checkAssignment(assignments.get("consumer10")); + allActiveTasks.addAll(info10.activeTasks); + + // the second consumer + AssignmentInfo info11 = checkAssignment(assignments.get("consumer11")); + allActiveTasks.addAll(info11.activeTasks); + + assertEquals(Utils.mkSet(task0, task1), allActiveTasks); + + // the third consumer + AssignmentInfo info20 = checkAssignment(assignments.get("consumer20")); + allActiveTasks.addAll(info20.activeTasks); + + assertEquals(3, allActiveTasks.size()); + assertEquals(allTasks, new HashSet<>(allActiveTasks)); + + assertEquals(3, allActiveTasks.size()); + assertEquals(allTasks, allActiveTasks); + } + + @Test + public void testAssignWithNewTasks() throws Exception { + StreamsConfig config = new StreamsConfig(configProps()); + + MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); + MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + + TopologyBuilder builder = new TopologyBuilder(); + builder.addSource("source1", "topic1"); + builder.addSource("source2", "topic2"); + builder.addSource("source3", "topic3"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2", "source3"); + List<String> topics = Utils.mkList("topic1", "topic2", "topic3"); + Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2, task3); + + // assuming that previous tasks do not have topic3 + final Set<TaskId> prevTasks10 = Utils.mkSet(task0); + final Set<TaskId> prevTasks11 = Utils.mkSet(task1); + final Set<TaskId> prevTasks20 = Utils.mkSet(task2); + + UUID uuid1 = UUID.randomUUID(); + UUID uuid2 = UUID.randomUUID(); + String client1 = "client1"; + String client2 = "client2"; + + StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime()); + + StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); + + Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); + subscriptions.put("consumer10", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, Collections.<TaskId>emptySet()).encode())); + subscriptions.put("consumer11", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, Collections.<TaskId>emptySet()).encode())); + subscriptions.put("consumer20", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, Collections.<TaskId>emptySet()).encode())); + + Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); + + // check assigned partitions: since there is no previous task for topic 3 it will be assigned randomly so we cannot check exact match + // also note that previously assigned partitions / tasks may not stay on the previous host since we may assign the new task first and + // then later ones will be re-assigned to other hosts due to load balancing + Set<TaskId> allActiveTasks = new HashSet<>(); + Set<TopicPartition> allPartitions = new HashSet<>(); + AssignmentInfo info; + + info = AssignmentInfo.decode(assignments.get("consumer10").userData()); + allActiveTasks.addAll(info.activeTasks); + allPartitions.addAll(assignments.get("consumer10").partitions()); + + info = AssignmentInfo.decode(assignments.get("consumer11").userData()); + allActiveTasks.addAll(info.activeTasks); + allPartitions.addAll(assignments.get("consumer11").partitions()); + + info = AssignmentInfo.decode(assignments.get("consumer20").userData()); + allActiveTasks.addAll(info.activeTasks); + allPartitions.addAll(assignments.get("consumer20").partitions()); + + assertEquals(allTasks, allActiveTasks); + assertEquals(Utils.mkSet(t1p0, t1p1, t1p2, t2p0, t2p1, t2p2, t3p0, t3p1, t3p2, t3p3), allPartitions); + } + + @Test + public void testAssignWithStates() throws Exception { + StreamsConfig config = new StreamsConfig(configProps()); + + MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); + MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + + TopologyBuilder builder = new TopologyBuilder(); + + builder.addSource("source1", "topic1"); + builder.addSource("source2", "topic2"); + + builder.addProcessor("processor-1", new MockProcessorSupplier(), "source1"); + builder.addStateStore(new MockStateStoreSupplier("store1", false), "processor-1"); + + builder.addProcessor("processor-2", new MockProcessorSupplier(), "source2"); + builder.addStateStore(new MockStateStoreSupplier("store2", false), "processor-2"); + builder.addStateStore(new MockStateStoreSupplier("store3", false), "processor-2"); + + List<String> topics = Utils.mkList("topic1", "topic2"); + + TaskId task00 = new TaskId(0, 0); + TaskId task01 = new TaskId(0, 1); + TaskId task02 = new TaskId(0, 2); + TaskId task10 = new TaskId(1, 0); + TaskId task11 = new TaskId(1, 1); + TaskId task12 = new TaskId(1, 2); + + UUID uuid1 = UUID.randomUUID(); + UUID uuid2 = UUID.randomUUID(); + String client1 = "client1"; + String client2 = "client2"; + + StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime()); + + StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); + + Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); + subscriptions.put("consumer10", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode())); + subscriptions.put("consumer11", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode())); + subscriptions.put("consumer20", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, Collections.<TaskId>emptySet(), Collections.<TaskId>emptySet()).encode())); + + Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); + + // check assigned partition size: since there is no previous task and there are two sub-topologies the assignment is random so we cannot check exact match + assertEquals(2, assignments.get("consumer10").partitions().size()); + assertEquals(2, assignments.get("consumer11").partitions().size()); + assertEquals(2, assignments.get("consumer20").partitions().size()); + + assertEquals(2, AssignmentInfo.decode(assignments.get("consumer10").userData()).activeTasks.size()); + assertEquals(2, AssignmentInfo.decode(assignments.get("consumer11").userData()).activeTasks.size()); + assertEquals(2, AssignmentInfo.decode(assignments.get("consumer20").userData()).activeTasks.size()); + + // check tasks for state topics + assertEquals(Utils.mkSet(task00, task01, task02), partitionAssignor.tasksForState("store1")); + assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store2")); + assertEquals(Utils.mkSet(task10, task11, task12), partitionAssignor.tasksForState("store3")); + } + + @Test + public void testAssignWithStandbyReplicas() throws Exception { + Properties props = configProps(); + props.setProperty(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, "1"); + StreamsConfig config = new StreamsConfig(props); + + MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); + MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + + TopologyBuilder builder = new TopologyBuilder(); + builder.addSource("source1", "topic1"); + builder.addSource("source2", "topic2"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); + List<String> topics = Utils.mkList("topic1", "topic2"); + Set<TaskId> allTasks = Utils.mkSet(task0, task1, task2); + + + final Set<TaskId> prevTasks10 = Utils.mkSet(task0); + final Set<TaskId> prevTasks11 = Utils.mkSet(task1); + final Set<TaskId> prevTasks20 = Utils.mkSet(task2); + final Set<TaskId> standbyTasks10 = Utils.mkSet(task1); + final Set<TaskId> standbyTasks11 = Utils.mkSet(task2); + final Set<TaskId> standbyTasks20 = Utils.mkSet(task0); + + UUID uuid1 = UUID.randomUUID(); + UUID uuid2 = UUID.randomUUID(); + String client1 = "client1"; + String client2 = "client2"; + + StreamThread thread10 = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid1, new Metrics(), new SystemTime()); + + StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + partitionAssignor.configure(config.getConsumerConfigs(thread10, "test", client1)); + + Map<String, PartitionAssignor.Subscription> subscriptions = new HashMap<>(); + subscriptions.put("consumer10", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks10, standbyTasks10).encode())); + subscriptions.put("consumer11", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid1, prevTasks11, standbyTasks11).encode())); + subscriptions.put("consumer20", + new PartitionAssignor.Subscription(topics, new SubscriptionInfo(uuid2, prevTasks20, standbyTasks20).encode())); + + Map<String, PartitionAssignor.Assignment> assignments = partitionAssignor.assign(metadata, subscriptions); + + Set<TaskId> allActiveTasks = new HashSet<>(); + Set<TaskId> allStandbyTasks = new HashSet<>(); + + // the first consumer + AssignmentInfo info10 = checkAssignment(assignments.get("consumer10")); + allActiveTasks.addAll(info10.activeTasks); + allStandbyTasks.addAll(info10.standbyTasks.keySet()); + + // the second consumer + AssignmentInfo info11 = checkAssignment(assignments.get("consumer11")); + allActiveTasks.addAll(info11.activeTasks); + allStandbyTasks.addAll(info11.standbyTasks.keySet()); + + // check active tasks assigned to the first client + assertEquals(Utils.mkSet(task0, task1), new HashSet<>(allActiveTasks)); + assertEquals(Utils.mkSet(task2), new HashSet<>(allStandbyTasks)); + + // the third consumer + AssignmentInfo info20 = checkAssignment(assignments.get("consumer20")); + allActiveTasks.addAll(info20.activeTasks); + allStandbyTasks.addAll(info20.standbyTasks.keySet()); + + // all task ids are in the active tasks and also in the standby tasks + + assertEquals(3, allActiveTasks.size()); + assertEquals(allTasks, allActiveTasks); + + assertEquals(3, allStandbyTasks.size()); + assertEquals(allTasks, allStandbyTasks); + } + + private AssignmentInfo checkAssignment(PartitionAssignor.Assignment assignment) { + + // This assumed 1) DefaultPartitionGrouper is used, and 2) there is a only one topic group. + + AssignmentInfo info = AssignmentInfo.decode(assignment.userData()); + + // check if the number of assigned partitions == the size of active task id list + assertEquals(assignment.partitions().size(), info.activeTasks.size()); + + // check if active tasks are consistent + List<TaskId> activeTasks = new ArrayList<>(); + Set<String> activeTopics = new HashSet<>(); + for (TopicPartition partition : assignment.partitions()) { + // since default grouper, taskid.partition == partition.partition() + activeTasks.add(new TaskId(0, partition.partition())); + activeTopics.add(partition.topic()); + } + assertEquals(activeTasks, info.activeTasks); + + // check if active partitions cover all topics + assertEquals(allTopics, activeTopics); + + // check if standby tasks are consistent + Set<String> standbyTopics = new HashSet<>(); + for (Map.Entry<TaskId, Set<TopicPartition>> entry : info.standbyTasks.entrySet()) { + TaskId id = entry.getKey(); + Set<TopicPartition> partitions = entry.getValue(); + for (TopicPartition partition : partitions) { + // since default grouper, taskid.partition == partition.partition() + assertEquals(id.partition, partition.partition()); + + standbyTopics.add(partition.topic()); + } + } + + if (info.standbyTasks.size() > 0) + // check if standby partitions cover all topics + assertEquals(allTopics, standbyTopics); + + return info; + } + + @Test + public void testOnAssignment() throws Exception { + StreamsConfig config = new StreamsConfig(configProps()); + + MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); + MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); + MockConsumer<byte[], byte[]> mockRestoreConsumer = new MockConsumer<>(OffsetResetStrategy.LATEST); + + TopicPartition t2p3 = new TopicPartition("topic2", 3); + + TopologyBuilder builder = new TopologyBuilder(); + builder.addSource("source1", "topic1"); + builder.addSource("source2", "topic2"); + builder.addProcessor("processor", new MockProcessorSupplier(), "source1", "source2"); + + UUID uuid = UUID.randomUUID(); + String client1 = "client1"; + + StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", client1, uuid, new Metrics(), new SystemTime()); + + StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); + partitionAssignor.configure(config.getConsumerConfigs(thread, "test", client1)); + + List<TaskId> activeTaskList = Utils.mkList(task0, task3); + Map<TaskId, Set<TopicPartition>> standbyTasks = new HashMap<>(); + standbyTasks.put(task1, Utils.mkSet(new TopicPartition("t1", 0))); + standbyTasks.put(task2, Utils.mkSet(new TopicPartition("t2", 0))); + + AssignmentInfo info = new AssignmentInfo(activeTaskList, standbyTasks); + PartitionAssignor.Assignment assignment = new PartitionAssignor.Assignment(Utils.mkList(t1p0, t2p3), info.encode()); + partitionAssignor.onAssignment(assignment); + + assertEquals(Utils.mkSet(task0), partitionAssignor.tasksForPartition(t1p0)); + assertEquals(Utils.mkSet(task3), partitionAssignor.tasksForPartition(t2p3)); + assertEquals(standbyTasks, partitionAssignor.standbyTasks()); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java index 1847e85..bf3b3b1 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java @@ -28,7 +28,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.StreamingConfig; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.test.MockSourceNode; @@ -69,17 +69,18 @@ public class StreamTaskTest { Collections.<StateStoreSupplier>emptyList() ); - private StreamingConfig createConfig(final File baseDir) throws Exception { - return new StreamingConfig(new Properties() { + private StreamsConfig createConfig(final File baseDir) throws Exception { + return new StreamsConfig(new Properties() { { - setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor"); - setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"); - setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); - setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); + setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor"); + setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-task-test"); + setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"); + setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); + setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); } }); } @@ -102,7 +103,7 @@ public class StreamTaskTest { public void testProcessOrder() throws Exception { File baseDir = Files.createTempDirectory("test").toFile(); try { - StreamingConfig config = createConfig(baseDir); + StreamsConfig config = createConfig(baseDir); StreamTask task = new StreamTask(new TaskId(0, 0), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null); task.addRecords(partition1, records( @@ -153,7 +154,7 @@ public class StreamTaskTest { public void testPauseResume() throws Exception { File baseDir = Files.createTempDirectory("test").toFile(); try { - StreamingConfig config = createConfig(baseDir); + StreamsConfig config = createConfig(baseDir); StreamTask task = new StreamTask(new TaskId(1, 1), "jobId", partitions, topology, consumer, producer, restoreStateConsumer, config, null); task.addRecords(partition1, records( http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 5f0347d..2d531bc 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -37,7 +37,7 @@ import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.utils.MockTime; import org.apache.kafka.common.utils.SystemTime; import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.StreamingConfig; +import org.apache.kafka.streams.StreamsConfig; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.test.MockProcessorSupplier; @@ -112,13 +112,14 @@ public class StreamThreadTest { private Properties configProps() { return new Properties() { { - setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); - setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); - setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor"); - setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"); - setProperty(StreamingConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); + setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor"); + setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"); + setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-thread-test"); + setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); } }; } @@ -132,7 +133,7 @@ public class StreamThreadTest { Consumer<byte[], byte[]> consumer, Producer<byte[], byte[]> producer, Consumer<byte[], byte[]> restoreConsumer, - StreamingConfig config) { + StreamsConfig config) { super(id, "jobId", partitions, topology, consumer, producer, restoreConsumer, config, null); } @@ -148,7 +149,7 @@ public class StreamThreadTest { @SuppressWarnings("unchecked") @Test public void testPartitionAssignmentChange() throws Exception { - StreamingConfig config = new StreamingConfig(configProps()); + StreamsConfig config = new StreamsConfig(configProps()); MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); @@ -258,10 +259,10 @@ public class StreamThreadTest { try { final long cleanupDelay = 1000L; Properties props = configProps(); - props.setProperty(StreamingConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(cleanupDelay)); - props.setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); + props.setProperty(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG, Long.toString(cleanupDelay)); + props.setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); - StreamingConfig config = new StreamingConfig(props); + StreamsConfig config = new StreamsConfig(props); File stateDir1 = new File(baseDir, task1.toString()); File stateDir2 = new File(baseDir, task2.toString()); @@ -389,10 +390,10 @@ public class StreamThreadTest { try { final long commitInterval = 1000L; Properties props = configProps(); - props.setProperty(StreamingConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); - props.setProperty(StreamingConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval)); + props.setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); + props.setProperty(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, Long.toString(commitInterval)); - StreamingConfig config = new StreamingConfig(props); + StreamsConfig config = new StreamsConfig(props); MockProducer<byte[], byte[]> producer = new MockProducer<>(true, serializer, serializer); MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST); @@ -468,9 +469,9 @@ public class StreamThreadTest { } } - private void initPartitionGrouper(StreamingConfig config, StreamThread thread) { + private void initPartitionGrouper(StreamsConfig config, StreamThread thread) { - KafkaStreamingPartitionAssignor partitionAssignor = new KafkaStreamingPartitionAssignor(); + StreamPartitionAssignor partitionAssignor = new StreamPartitionAssignor(); partitionAssignor.configure(config.getConsumerConfigs(thread, thread.jobId, thread.clientId)); http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index b0c9bd7..36e487b 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -23,12 +23,13 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.StreamingConfig; -import org.apache.kafka.streams.StreamingMetrics; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StreamPartitioner; +import org.apache.kafka.streams.processor.StreamsPartitioner; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.test.MockProcessorContext; @@ -221,10 +222,10 @@ public class KeyValueStoreTestDriver<K, V> { private final Serdes<K, V> serdes; private final Map<K, V> flushedEntries = new HashMap<>(); private final Set<K> flushedRemovals = new HashSet<>(); - private final List<Entry<K, V>> restorableEntries = new LinkedList<>(); + private final List<KeyValue<K, V>> restorableEntries = new LinkedList<>(); private final MockProcessorContext context; private final Map<String, StateStore> storeMap = new HashMap<>(); - private final StreamingMetrics metrics = new StreamingMetrics() { + private final StreamsMetrics metrics = new StreamsMetrics() { @Override public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) { return null; @@ -248,7 +249,7 @@ public class KeyValueStoreTestDriver<K, V> { } @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer, - StreamPartitioner<K1, V1> partitioner) { + StreamsPartitioner<K1, V1> partitioner) { recordFlushed(record.key(), record.value()); } }; @@ -256,12 +257,12 @@ public class KeyValueStoreTestDriver<K, V> { this.stateDir.mkdirs(); Properties props = new Properties(); - props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); - props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, serdes.keySerializer().getClass()); - props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, serdes.keyDeserializer().getClass()); - props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, serdes.valueSerializer().getClass()); - props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serdes.valueDeserializer().getClass()); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, MockTimestampExtractor.class); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, serdes.keySerializer().getClass()); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, serdes.keyDeserializer().getClass()); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, serdes.valueSerializer().getClass()); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, serdes.valueDeserializer().getClass()); this.context = new MockProcessorContext(null, this.stateDir, serdes.keySerializer(), serdes.keyDeserializer(), serdes.valueSerializer(), serdes.valueDeserializer(), recordCollector) { @@ -287,7 +288,7 @@ public class KeyValueStoreTestDriver<K, V> { } @Override - public StreamingMetrics metrics() { + public StreamsMetrics metrics() { return metrics; } @@ -313,10 +314,10 @@ public class KeyValueStoreTestDriver<K, V> { } private void restoreEntries(StateRestoreCallback func) { - for (Entry<K, V> entry : restorableEntries) { + for (KeyValue<K, V> entry : restorableEntries) { if (entry != null) { - byte[] rawKey = serdes.rawKey(entry.key()); - byte[] rawValue = serdes.rawValue(entry.value()); + byte[] rawKey = serdes.rawKey(entry.key); + byte[] rawValue = serdes.rawValue(entry.value); func.restore(rawKey, rawValue); } } @@ -352,7 +353,7 @@ public class KeyValueStoreTestDriver<K, V> { * @see #checkForRestoredEntries(KeyValueStore) */ public void addEntryToRestoreLog(K key, V value) { - restorableEntries.add(new Entry<K, V>(key, value)); + restorableEntries.add(new KeyValue<K, V>(key, value)); } /** @@ -376,7 +377,7 @@ public class KeyValueStoreTestDriver<K, V> { * * @return the restore entries; never null but possibly a null iterator */ - public Iterable<Entry<K, V>> restoredEntries() { + public Iterable<KeyValue<K, V>> restoredEntries() { return restorableEntries; } @@ -390,10 +391,10 @@ public class KeyValueStoreTestDriver<K, V> { */ public int checkForRestoredEntries(KeyValueStore<K, V> store) { int missing = 0; - for (Entry<K, V> entry : restorableEntries) { - if (entry != null) { - V value = store.get(entry.key()); - if (!Objects.equals(value, entry.value())) { + for (KeyValue<K, V> kv : restorableEntries) { + if (kv != null) { + V value = store.get(kv.key); + if (!Objects.equals(value, kv.value)) { ++missing; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java index 2ed698c..8effd77 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/AbstractKeyValueStoreTest.java @@ -21,8 +21,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; import static org.junit.Assert.fail; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.state.Entry; import org.apache.kafka.streams.state.KeyValueIterator; import org.apache.kafka.streams.state.KeyValueStore; import org.apache.kafka.streams.state.KeyValueStoreTestDriver; @@ -73,11 +73,11 @@ public abstract class AbstractKeyValueStoreTest { // Check range iteration ... try (KeyValueIterator<Integer, String> iter = store.range(2, 4)) { while (iter.hasNext()) { - Entry<Integer, String> entry = iter.next(); - if (entry.key().equals(2)) - assertEquals("two", entry.value()); - else if (entry.key().equals(4)) - assertEquals("four", entry.value()); + KeyValue<Integer, String> entry = iter.next(); + if (entry.key.equals(2)) + assertEquals("two", entry.value); + else if (entry.key.equals(4)) + assertEquals("four", entry.value); else fail("Unexpected entry: " + entry); } @@ -86,11 +86,11 @@ public abstract class AbstractKeyValueStoreTest { // Check range iteration ... try (KeyValueIterator<Integer, String> iter = store.range(2, 6)) { while (iter.hasNext()) { - Entry<Integer, String> entry = iter.next(); - if (entry.key().equals(2)) - assertEquals("two", entry.value()); - else if (entry.key().equals(4)) - assertEquals("four", entry.value()); + KeyValue<Integer, String> entry = iter.next(); + if (entry.key.equals(2)) + assertEquals("two", entry.value); + else if (entry.key.equals(4)) + assertEquals("four", entry.value); else fail("Unexpected entry: " + entry); } http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java index 80ad67f..45448e5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/internals/RocksDBWindowStoreTest.java @@ -24,10 +24,10 @@ import org.apache.kafka.common.serialization.ByteArrayDeserializer; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStoreSupplier; import org.apache.kafka.streams.processor.internals.RecordCollector; -import org.apache.kafka.streams.state.Entry; import org.apache.kafka.streams.state.Serdes; import org.apache.kafka.streams.state.WindowStore; import org.apache.kafka.streams.state.WindowStoreIterator; @@ -69,12 +69,12 @@ public class RocksDBWindowStoreTest { public void testPutAndFetch() throws IOException { File baseDir = Files.createTempDirectory("test").toFile(); try { - final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>(); + final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); RecordCollector recordCollector = new RecordCollector(producer) { @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { - changeLog.add(new Entry<>( + changeLog.add(new KeyValue<>( keySerializer.serialize(record.topic(), record.key()), valueSerializer.serialize(record.topic(), record.value())) ); @@ -165,12 +165,12 @@ public class RocksDBWindowStoreTest { public void testPutAndFetchBefore() throws IOException { File baseDir = Files.createTempDirectory("test").toFile(); try { - final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>(); + final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); RecordCollector recordCollector = new RecordCollector(producer) { @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { - changeLog.add(new Entry<>( + changeLog.add(new KeyValue<>( keySerializer.serialize(record.topic(), record.key()), valueSerializer.serialize(record.topic(), record.value())) ); @@ -261,12 +261,12 @@ public class RocksDBWindowStoreTest { public void testPutAndFetchAfter() throws IOException { File baseDir = Files.createTempDirectory("test").toFile(); try { - final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>(); + final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); RecordCollector recordCollector = new RecordCollector(producer) { @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { - changeLog.add(new Entry<>( + changeLog.add(new KeyValue<>( keySerializer.serialize(record.topic(), record.key()), valueSerializer.serialize(record.topic(), record.value())) ); @@ -357,12 +357,12 @@ public class RocksDBWindowStoreTest { public void testPutSameKeyTimestamp() throws IOException { File baseDir = Files.createTempDirectory("test").toFile(); try { - final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>(); + final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); RecordCollector recordCollector = new RecordCollector(producer) { @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { - changeLog.add(new Entry<>( + changeLog.add(new KeyValue<>( keySerializer.serialize(record.topic(), record.key()), valueSerializer.serialize(record.topic(), record.value())) ); @@ -416,12 +416,12 @@ public class RocksDBWindowStoreTest { public void testRolling() throws IOException { File baseDir = Files.createTempDirectory("test").toFile(); try { - final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>(); + final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); Producer<byte[], byte[]> producer = new MockProducer<>(true, byteArraySerializer, byteArraySerializer); RecordCollector recordCollector = new RecordCollector(producer) { @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { - changeLog.add(new Entry<>( + changeLog.add(new KeyValue<>( keySerializer.serialize(record.topic(), record.key()), valueSerializer.serialize(record.topic(), record.value())) ); @@ -528,7 +528,7 @@ public class RocksDBWindowStoreTest { @Test public void testRestore() throws IOException { - final List<Entry<byte[], byte[]>> changeLog = new ArrayList<>(); + final List<KeyValue<byte[], byte[]>> changeLog = new ArrayList<>(); long startTime = segmentSize * 2; long incr = segmentSize / 2; @@ -538,7 +538,7 @@ public class RocksDBWindowStoreTest { RecordCollector recordCollector = new RecordCollector(producer) { @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { - changeLog.add(new Entry<>( + changeLog.add(new KeyValue<>( keySerializer.serialize(record.topic(), record.key()), valueSerializer.serialize(record.topic(), record.value())) ); @@ -587,7 +587,7 @@ public class RocksDBWindowStoreTest { RecordCollector recordCollector = new RecordCollector(producer) { @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer) { - changeLog.add(new Entry<>( + changeLog.add(new KeyValue<>( keySerializer.serialize(record.topic(), record.key()), valueSerializer.serialize(record.topic(), record.value())) ); @@ -655,13 +655,13 @@ public class RocksDBWindowStoreTest { return set; } - private Map<Integer, Set<String>> entriesByKey(List<Entry<byte[], byte[]>> changeLog, long startTime) { + private Map<Integer, Set<String>> entriesByKey(List<KeyValue<byte[], byte[]>> changeLog, long startTime) { HashMap<Integer, Set<String>> entriesByKey = new HashMap<>(); - for (Entry<byte[], byte[]> entry : changeLog) { - long timestamp = WindowStoreUtil.timestampFromBinaryKey(entry.key()); - Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key(), serdes); - String value = entry.value() == null ? null : serdes.valueFrom(entry.value()); + for (KeyValue<byte[], byte[]> entry : changeLog) { + long timestamp = WindowStoreUtil.timestampFromBinaryKey(entry.key); + Integer key = WindowStoreUtil.keyFromBinaryKey(entry.key, serdes); + String value = entry.value == null ? null : serdes.valueFrom(entry.value); Set<String> entries = entriesByKey.get(key); if (entries == null) { http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index 2dc567e..8f8e00f 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; -import org.apache.kafka.streams.processor.StreamPartitioner; +import org.apache.kafka.streams.processor.StreamsPartitioner; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.RecordCollector; @@ -130,7 +130,7 @@ public class KStreamTestDriver { @Override public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer, - StreamPartitioner<K, V> partitioner) { + StreamsPartitioner<K, V> partitioner) { // The serialization is skipped. process(record.topic(), record.key(), record.value()); } http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java index a6a29cd..cb7a95c 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java +++ b/streams/src/test/java/org/apache/kafka/test/MockProcessorContext.java @@ -18,7 +18,8 @@ package org.apache.kafka.test; import org.apache.kafka.common.metrics.Sensor; -import org.apache.kafka.streams.StreamingMetrics; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; @@ -26,7 +27,6 @@ import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.RecordCollector; -import org.apache.kafka.streams.state.Entry; import java.io.File; import java.util.Collections; @@ -123,8 +123,8 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S } @Override - public StreamingMetrics metrics() { - return new StreamingMetrics() { + public StreamsMetrics metrics() { + return new StreamsMetrics() { @Override public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) { return null; @@ -192,10 +192,10 @@ public class MockProcessorContext implements ProcessorContext, RecordCollector.S return Collections.unmodifiableMap(storeMap); } - public void restore(String storeName, List<Entry<byte[], byte[]>> changeLog) { + public void restore(String storeName, List<KeyValue<byte[], byte[]>> changeLog) { StateRestoreCallback restoreCallback = restoreFuncs.get(storeName); - for (Entry<byte[], byte[]> entry : changeLog) { - restoreCallback.restore(entry.key(), entry.value()); + for (KeyValue<byte[], byte[]> entry : changeLog) { + restoreCallback.restore(entry.key, entry.value); } } } http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java b/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java index 98aa0d4..828b5ae 100644 --- a/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java +++ b/streams/src/test/java/org/apache/kafka/test/NoOpKeyValueMapper.java @@ -17,7 +17,7 @@ package org.apache.kafka.test; -import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.kstream.KeyValueMapper; public class NoOpKeyValueMapper<K, V> implements KeyValueMapper<K, V, KeyValue<K, V>> { http://git-wip-us.apache.org/repos/asf/kafka/blob/21c6cfe5/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java index eaeed09..af6d51b 100644 --- a/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/ProcessorTopologyTestDriver.java @@ -27,8 +27,8 @@ import org.apache.kafka.common.metrics.Sensor; import org.apache.kafka.common.serialization.ByteArraySerializer; import org.apache.kafka.common.serialization.Deserializer; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.StreamingConfig; -import org.apache.kafka.streams.StreamingMetrics; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.TopologyBuilder; @@ -54,7 +54,7 @@ import java.util.concurrent.atomic.AtomicLong; * can use and test code you already have that uses a builder to create topologies. Best of all, the class works without a real * Kafka broker, so the tests execute very quickly with very little overhead. * <p> - * Using the ProcessorTopologyTestDriver in tests is easy: simply instantiate the driver with a {@link StreamingConfig} and a + * Using the ProcessorTopologyTestDriver in tests is easy: simply instantiate the driver with a {@link StreamsConfig} and a * TopologyBuilder, use the driver to supply an input message to the topology, and then use the driver to read and verify any * messages output by the topology. * <p> @@ -65,7 +65,7 @@ import java.util.concurrent.atomic.AtomicLong; * * <h2>Driver setup</h2> * <p> - * In order to create a ProcessorTopologyTestDriver instance, you need a TopologyBuilder and a {@link StreamingConfig}. The + * In order to create a ProcessorTopologyTestDriver instance, you need a TopologyBuilder and a {@link StreamsConfig}. The * configuration needs to be representative of what you'd supply to the real topology, so that means including several key * properties. For example, the following code fragment creates a configuration that specifies a local Kafka broker list * (which is needed but not used), a timestamp extractor, and default serializers and deserializers for string keys and values: @@ -74,13 +74,13 @@ import java.util.concurrent.atomic.AtomicLong; * StringSerializer strSerializer = new StringSerializer(); * StringDeserializer strDeserializer = new StringDeserializer(); * Properties props = new Properties(); - * props.setProperty(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091"); - * props.setProperty(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName()); - * props.setProperty(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName()); - * props.setProperty(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName()); - * props.setProperty(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName()); - * props.setProperty(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName()); - * StreamingConfig config = new StreamingConfig(props); + * props.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9091"); + * props.setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, CustomTimestampExtractor.class.getName()); + * props.setProperty(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName()); + * props.setProperty(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName()); + * props.setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, strSerializer.getClass().getName()); + * props.setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, strDeserializer.getClass().getName()); + * StreamsConfig config = new StreamsConfig(props); * TopologyBuilder builder = ... * ProcessorTopologyTestDriver driver = new ProcessorTopologyTestDriver(config, builder); * </pre> @@ -139,11 +139,11 @@ public class ProcessorTopologyTestDriver { /** * Create a new test driver instance. - * @param config the streaming configuration for the topology + * @param config the stream configuration for the topology * @param builder the topology builder that will be used to create the topology instance * @param storeNames the optional names of the state stores that are used by the topology */ - public ProcessorTopologyTestDriver(StreamingConfig config, TopologyBuilder builder, String... storeNames) { + public ProcessorTopologyTestDriver(StreamsConfig config, TopologyBuilder builder, String... storeNames) { id = new TaskId(0, 0); topology = builder.build(null); @@ -173,7 +173,7 @@ public class ProcessorTopologyTestDriver { producer, restoreStateConsumer, config, - new StreamingMetrics() { + new StreamsMetrics() { @Override public Sensor addLatencySensor(String scopeName, String entityName, String operationName, String... tags) { return null; @@ -265,7 +265,7 @@ public class ProcessorTopologyTestDriver { /** * Get the {@link StateStore} with the given name. The name should have been supplied via - * {@link #ProcessorTopologyTestDriver(StreamingConfig, TopologyBuilder, String...) this object's constructor}, and is + * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder, String...) this object's constructor}, and is * presumed to be used by a Processor within the topology. * <p> * This is often useful in test cases to pre-populate the store before the test case instructs the topology to @@ -281,7 +281,7 @@ public class ProcessorTopologyTestDriver { /** * Get the {@link KeyValueStore} with the given name. The name should have been supplied via - * {@link #ProcessorTopologyTestDriver(StreamingConfig, TopologyBuilder, String...) this object's constructor}, and is + * {@link #ProcessorTopologyTestDriver(StreamsConfig, TopologyBuilder, String...) this object's constructor}, and is * presumed to be used by a Processor within the topology. * <p> * This is often useful in test cases to pre-populate the store before the test case instructs the topology to
