Repository: kafka Updated Branches: refs/heads/trunk a19729fe6 -> c197113a9
http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java new file mode 100644 index 0000000..60b3b96 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/WallclockTimestampExtractor.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor.internals; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.processor.TimestampExtractor; + +public class WallclockTimestampExtractor implements TimestampExtractor { + @Override + public long extract(ConsumerRecord<Object, Object> record) { + return System.currentTimeMillis(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java index 777fae5..b2af904 100644 --- a/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/StreamsConfigTest.java @@ -22,7 +22,7 @@ import org.apache.kafka.common.serialization.IntegerDeserializer; import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.examples.WallclockTimestampExtractor; +import org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor; import org.apache.kafka.streams.processor.internals.StreamThread; import org.junit.Before; import org.junit.Test; http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java new file mode 100644 index 0000000..1b8cbb8 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitionerTest.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.clients.producer.internals.DefaultPartitioner; +import org.apache.kafka.common.Cluster; +import org.apache.kafka.common.Node; +import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.kstream.Windowed; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.Random; + +import static org.junit.Assert.assertEquals; + +public class WindowedStreamPartitionerTest { + + private String topicName = "topic"; + + private IntegerSerializer keySerializer = new IntegerSerializer(); + private StringSerializer valSerializer = new StringSerializer(); + + private List<PartitionInfo> infos = Arrays.asList( + new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(topicName, 3, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(topicName, 4, Node.noNode(), new Node[0], new Node[0]), + new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0]) + ); + + private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet()); + + @Test + public void testCopartitioning() { + + Random rand = new Random(); + + DefaultPartitioner defaultPartitioner = new DefaultPartitioner(); + + WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer); + WindowedStreamPartitioner<Integer, String> streamPartitioner = new WindowedStreamPartitioner<>(windowedSerializer); + + for (int k = 0; k < 10; k++) { + Integer key = rand.nextInt(); + byte[] keyBytes = keySerializer.serialize(topicName, key); + + String value = key.toString(); + byte[] valueBytes = valSerializer.serialize(topicName, value); + + Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster); + + for (int w = 0; w < 10; w++) { + HoppingWindow window = new HoppingWindow(10 * w, 20 * w); + + Windowed<Integer> windowedKey = new Windowed<>(key, window); + Integer actual = streamPartitioner.partition(windowedKey, value, infos.size()); + + assertEquals(expected, actual); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java deleted file mode 100644 index 18494fd..0000000 --- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitionerTest.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.clients.producer.internals.DefaultPartitioner; -import org.apache.kafka.common.Cluster; -import org.apache.kafka.common.Node; -import org.apache.kafka.common.PartitionInfo; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.kstream.Windowed; -import org.junit.Test; - -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.Random; - -import static org.junit.Assert.assertEquals; - -public class WindowedStreamsPartitionerTest { - - private String topicName = "topic"; - - private IntegerSerializer keySerializer = new IntegerSerializer(); - private StringSerializer valSerializer = new StringSerializer(); - - private List<PartitionInfo> infos = Arrays.asList( - new PartitionInfo(topicName, 0, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo(topicName, 1, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo(topicName, 2, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo(topicName, 3, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo(topicName, 4, Node.noNode(), new Node[0], new Node[0]), - new PartitionInfo(topicName, 5, Node.noNode(), new Node[0], new Node[0]) - ); - - private Cluster cluster = new Cluster(Arrays.asList(Node.noNode()), infos, Collections.<String>emptySet()); - - @Test - public void testCopartitioning() { - - Random rand = new Random(); - - DefaultPartitioner defaultPartitioner = new DefaultPartitioner(); - - WindowedSerializer<Integer> windowedSerializer = new WindowedSerializer<>(keySerializer); - WindowedStreamsPartitioner<Integer, String> streamPartitioner = new WindowedStreamsPartitioner<>(windowedSerializer); - - for (int k = 0; k < 10; k++) { - Integer key = rand.nextInt(); - byte[] keyBytes = keySerializer.serialize(topicName, key); - - String value = key.toString(); - byte[] valueBytes = valSerializer.serialize(topicName, value); - - Integer expected = defaultPartitioner.partition("topic", key, keyBytes, value, valueBytes, cluster); - - for (int w = 0; w < 10; w++) { - HoppingWindow window = new HoppingWindow(10 * w, 20 * w); - - Windowed<Integer> windowedKey = new Windowed<>(key, window); - Integer actual = streamPartitioner.partition(windowedKey, value, infos.size()); - - assertEquals(expected, actual); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index 60bd309..cb6ea05 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -32,7 +32,7 @@ import org.apache.kafka.streams.processor.AbstractProcessor; import org.apache.kafka.streams.processor.Processor; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.StreamsPartitioner; +import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.streams.processor.TopologyBuilder; import org.apache.kafka.streams.state.KeyValueIterator; @@ -194,8 +194,8 @@ public class ProcessorTopologyTest { assertNull(driver.readOutput(topic)); } - protected <K, V> StreamsPartitioner<K, V> constantPartitioner(final Integer partition) { - return new StreamsPartitioner<K, V>() { + protected <K, V> StreamPartitioner<K, V> constantPartitioner(final Integer partition) { + return new StreamPartitioner<K, V>() { @Override public Integer partition(K key, V value, int numPartitions) { return partition; http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java index fd604b6..ffcf9ae 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StandbyTaskTest.java @@ -93,8 +93,8 @@ public class StandbyTaskTest { setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor"); + setProperty(StreamsConfig.JOB_ID_CONFIG, jobId); setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"); - setProperty(StreamsConfig.JOB_ID_CONFIG, "standby-task-test"); setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); setProperty(StreamsConfig.STATE_DIR_CONFIG, baseDir.getCanonicalPath()); } @@ -200,7 +200,7 @@ public class StandbyTaskTest { task.close(); - File taskDir = new File(baseDir, taskId.toString()); + File taskDir = new File(StreamThread.makeStateDir(jobId, baseDir.getCanonicalPath()), taskId.toString()); OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); Map<TopicPartition, Long> offsets = checkpoint.read(); @@ -298,7 +298,7 @@ public class StandbyTaskTest { task.close(); - File taskDir = new File(baseDir, taskId.toString()); + File taskDir = new File(StreamThread.makeStateDir(jobId, baseDir.getCanonicalPath()), taskId.toString()); OffsetCheckpoint checkpoint = new OffsetCheckpoint(new File(taskDir, ProcessorStateManager.CHECKPOINT_FILE_NAME)); Map<TopicPartition, Long> offsets = checkpoint.read(); http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java index 2d531bc..039cb96 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java @@ -59,8 +59,9 @@ import java.util.UUID; public class StreamThreadTest { - private String clientId = "clientId"; - private UUID processId = UUID.randomUUID(); + private final String clientId = "clientId"; + private final String jobId = "stream-thread-test"; + private final UUID processId = UUID.randomUUID(); private TopicPartition t1p1 = new TopicPartition("topic1", 1); private TopicPartition t1p2 = new TopicPartition("topic1", 2); @@ -117,8 +118,8 @@ public class StreamThreadTest { setProperty(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); setProperty(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); setProperty(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.test.MockTimestampExtractor"); + setProperty(StreamsConfig.JOB_ID_CONFIG, jobId); setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:2171"); - setProperty(StreamsConfig.JOB_ID_CONFIG, "stream-thread-test"); setProperty(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG, "3"); } }; @@ -128,13 +129,14 @@ public class StreamThreadTest { public boolean committed = false; public TestStreamTask(TaskId id, + String jobId, Collection<TopicPartition> partitions, ProcessorTopology topology, Consumer<byte[], byte[]> consumer, Producer<byte[], byte[]> producer, Consumer<byte[], byte[]> restoreConsumer, StreamsConfig config) { - super(id, "jobId", partitions, topology, consumer, producer, restoreConsumer, config, null); + super(id, jobId, partitions, topology, consumer, producer, restoreConsumer, config, null); } @Override @@ -161,11 +163,11 @@ public class StreamThreadTest { builder.addSource("source3", "topic3"); builder.addProcessor("processor", new MockProcessorSupplier(), "source2", "source3"); - StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), new SystemTime()) { + StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), new SystemTime()) { @Override protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) { ProcessorTopology topology = builder.build(id.topicGroupId); - return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config); + return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config); } }; @@ -264,10 +266,12 @@ public class StreamThreadTest { StreamsConfig config = new StreamsConfig(props); - File stateDir1 = new File(baseDir, task1.toString()); - File stateDir2 = new File(baseDir, task2.toString()); - File stateDir3 = new File(baseDir, task3.toString()); - File extraDir = new File(baseDir, "X"); + File jobDir = new File(baseDir, jobId); + jobDir.mkdir(); + File stateDir1 = new File(jobDir, task1.toString()); + File stateDir2 = new File(jobDir, task2.toString()); + File stateDir3 = new File(jobDir, task3.toString()); + File extraDir = new File(jobDir, "X"); stateDir1.mkdir(); stateDir2.mkdir(); stateDir3.mkdir(); @@ -281,7 +285,7 @@ public class StreamThreadTest { TopologyBuilder builder = new TopologyBuilder(); builder.addSource("source1", "topic1"); - StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), mockTime) { + StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), mockTime) { @Override public void maybeClean() { super.maybeClean(); @@ -290,7 +294,7 @@ public class StreamThreadTest { @Override protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) { ProcessorTopology topology = builder.build(id.topicGroupId); - return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config); + return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config); } }; @@ -403,7 +407,7 @@ public class StreamThreadTest { TopologyBuilder builder = new TopologyBuilder(); builder.addSource("source1", "topic1"); - StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, "test", clientId, processId, new Metrics(), mockTime) { + StreamThread thread = new StreamThread(builder, config, producer, consumer, mockRestoreConsumer, jobId, clientId, processId, new Metrics(), mockTime) { @Override public void maybeCommit() { super.maybeCommit(); @@ -412,7 +416,7 @@ public class StreamThreadTest { @Override protected StreamTask createStreamTask(TaskId id, Collection<TopicPartition> partitionsForTask) { ProcessorTopology topology = builder.build(id.topicGroupId); - return new TestStreamTask(id, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config); + return new TestStreamTask(id, jobId, partitionsForTask, topology, consumer, producer, mockRestoreConsumer, config); } }; http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java index 36e487b..1e9c3ba 100644 --- a/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/streams/state/KeyValueStoreTestDriver.java @@ -29,7 +29,7 @@ import org.apache.kafka.streams.StreamsMetrics; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateRestoreCallback; import org.apache.kafka.streams.processor.StateStore; -import org.apache.kafka.streams.processor.StreamsPartitioner; +import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.TaskId; import org.apache.kafka.streams.processor.internals.RecordCollector; import org.apache.kafka.test.MockProcessorContext; @@ -249,7 +249,7 @@ public class KeyValueStoreTestDriver<K, V> { } @Override public <K1, V1> void send(ProducerRecord<K1, V1> record, Serializer<K1> keySerializer, Serializer<V1> valueSerializer, - StreamsPartitioner<K1, V1> partitioner) { + StreamPartitioner<K1, V1> partitioner) { recordFlushed(record.key(), record.value()); } }; http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java index 8f8e00f..2dc567e 100644 --- a/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java +++ b/streams/src/test/java/org/apache/kafka/test/KStreamTestDriver.java @@ -24,7 +24,7 @@ import org.apache.kafka.streams.kstream.KStreamBuilder; import org.apache.kafka.streams.processor.ProcessorContext; import org.apache.kafka.streams.processor.StateStore; import org.apache.kafka.streams.processor.StateStoreSupplier; -import org.apache.kafka.streams.processor.StreamsPartitioner; +import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.processor.internals.ProcessorNode; import org.apache.kafka.streams.processor.internals.ProcessorTopology; import org.apache.kafka.streams.processor.internals.RecordCollector; @@ -130,7 +130,7 @@ public class KStreamTestDriver { @Override public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer, - StreamsPartitioner<K, V> partitioner) { + StreamPartitioner<K, V> partitioner) { // The serialization is skipped. process(record.topic(), record.key(), record.value()); }
