KIP-28: Add a processor client for Kafka Streaming

This work has been contributed by Jesse Anderson, Randall Hauch, Yasuhiro Matsuda and Guozhang Wang. The detailed design can be found at https://cwiki.apache.org/confluence/display/KAFKA/KIP-28+-+Add+a+processor+client.
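For readers skimming the patch, the two entry points it adds can be condensed as follows. This is a minimal sketch drawn from the KStreamJob and ProcessorJob examples included in this patch; the topic names, MyProcessorDef and the props map are illustrative placeholders, not part of the API:

    // High-level KStream DSL: declare transformations on a KStreamBuilder
    KStreamBuilder builder = new KStreamBuilder();
    KStream<String, String> source = builder.from("topic1");
    source.mapValues(value -> value.length()).to("topic2");

    // Low-level Processor API: wire sources, processors and sinks by name
    TopologyBuilder topology = new TopologyBuilder();
    topology.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source");
    topology.addProcessor("PROCESS", new MyProcessorDef(), "SOURCE"); // MyProcessorDef supplies a Processor<String, String>
    topology.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS");

    // Either builder is handed to KafkaStreaming, which owns the stream threads
    KafkaStreaming streaming = new KafkaStreaming(builder, new StreamingConfig(props));
    streaming.start();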
Author: Guozhang Wang <[email protected]>
Author: Yasuhiro Matsuda <[email protected]>
Author: ymatsuda <[email protected]>
Author: Randall Hauch <[email protected]>
Author: Jesse Anderson <[email protected]>
Author: Ismael Juma <[email protected]>

Reviewers: Ismael Juma, Randall Hauch, Edward Ribeiro, Gwen Shapira, Jun Rao, Jay Kreps, Yasuhiro Matsuda, Guozhang Wang

Closes #130 from guozhangwang/streaming

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/263c10ab
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/263c10ab
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/263c10ab

Branch: refs/heads/trunk
Commit: 263c10ab7c8e8fde9d3566bf59dccaa454ee2605
Parents: ad120d5
Author: Guozhang Wang <[email protected]>
Authored: Fri Sep 25 17:27:58 2015 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Fri Sep 25 17:27:58 2015 -0700

----------------------------------------------------------------------
 bin/kafka-run-class.sh                          |    5 +
 build.gradle                                    |   76 ++-
 checkstyle/checkstyle.xml                       |    8 +-
 checkstyle/import-control.xml                   |   16 +-
 .../kafka/clients/consumer/ConsumerConfig.java  |    4 +-
 .../kafka/clients/consumer/KafkaConsumer.java   |    2 +
 .../kafka/clients/consumer/MockConsumer.java    |   18 +-
 .../kafka/clients/producer/KafkaProducer.java   |    2 +
 .../kafka/clients/producer/ProducerConfig.java  |    4 +-
 .../kafka/common/config/AbstractConfig.java     |    4 +
 .../common/serialization/LongDeserializer.java  |   44 ++
 .../common/serialization/LongSerializer.java    |   42 ++
 .../org/apache/kafka/common/utils/Utils.java    |   41 ++
 settings.gradle                                 |    4 +-
 .../apache/kafka/streams/KafkaStreaming.java    |  125 +++++
 .../apache/kafka/streams/StreamingConfig.java   |  201 ++++++++
 .../kafka/streams/examples/KStreamJob.java      |   84 ++++
 .../kafka/streams/examples/ProcessorJob.java    |  112 +++++
 .../examples/WallclockTimestampExtractor.java   |   28 ++
 .../apache/kafka/streams/kstream/KStream.java   |  156 ++++++
 .../kafka/streams/kstream/KStreamBuilder.java   |   65 +++
 .../kafka/streams/kstream/KStreamWindowed.java  |   38 ++
 .../apache/kafka/streams/kstream/KeyValue.java  |   34 ++
 .../kafka/streams/kstream/KeyValueMapper.java   |   23 +
 .../apache/kafka/streams/kstream/Predicate.java |   24 +
 .../kafka/streams/kstream/SlidingWindowDef.java |  265 +++++++++++
 .../kafka/streams/kstream/ValueJoiner.java      |   23 +
 .../kafka/streams/kstream/ValueMapper.java      |   23 +
 .../apache/kafka/streams/kstream/Window.java    |   36 ++
 .../apache/kafka/streams/kstream/WindowDef.java |   25 +
 .../kstream/internals/FilteredIterator.java     |   63 +++
 .../kstream/internals/KStreamBranch.java        |   52 ++
 .../kstream/internals/KStreamFilter.java        |   48 ++
 .../kstream/internals/KStreamFlatMap.java       |   47 ++
 .../kstream/internals/KStreamFlatMapValues.java |   47 ++
 .../streams/kstream/internals/KStreamImpl.java  |  201 ++++++++
 .../streams/kstream/internals/KStreamJoin.java  |   96 ++++
 .../streams/kstream/internals/KStreamMap.java   |   46 ++
 .../kstream/internals/KStreamMapValues.java     |   45 ++
 .../kstream/internals/KStreamPassThrough.java   |   37 ++
 .../kstream/internals/KStreamWindow.java        |   68 +++
 .../kstream/internals/KStreamWindowedImpl.java  |   54 +++
 .../kstream/internals/WindowSupport.java        |  159 +++++++
 .../streams/processor/AbstractProcessor.java    |   71 +++
 .../kafka/streams/processor/Processor.java      |   59 +++
 .../streams/processor/ProcessorContext.java     |  106 +++++
 .../kafka/streams/processor/ProcessorDef.java   |   23 +
 .../kafka/streams/processor/RestoreFunc.java    |   27 ++
 .../kafka/streams/processor/StateStore.java     |   52 ++
 .../streams/processor/TimestampExtractor.java   |   34 ++
 .../streams/processor/TopologyBuilder.java      |  293 ++++++++++++
 .../streams/processor/TopologyException.java    |   38 ++
 .../internals/MinTimestampTracker.java          |   67 +++
 .../processor/internals/PartitionGroup.java     |  165 +++++++
 .../internals/ProcessorContextImpl.java         |  214 +++++++++
 .../processor/internals/ProcessorNode.java      |   70 +++
 .../internals/ProcessorStateManager.java        |  232 +++++++++
 .../processor/internals/ProcessorTopology.java  |   53 +++
 .../processor/internals/PunctuationQueue.java   |   56 +++
 .../internals/PunctuationSchedule.java          |   41 ++
 .../streams/processor/internals/Punctuator.java |   24 +
 .../processor/internals/RecordCollector.java    |   80 ++++
 .../processor/internals/RecordQueue.java        |  140 ++++++
 .../streams/processor/internals/SinkNode.java   |   64 +++
 .../streams/processor/internals/SourceNode.java |   64 +++
 .../streams/processor/internals/Stamped.java    |   38 ++
 .../processor/internals/StampedRecord.java      |   52 ++
 .../streams/processor/internals/StreamTask.java |  352 ++++++++++++++
 .../processor/internals/StreamThread.java       |  477 +++++++++++++++++++
 .../processor/internals/TimestampTracker.java   |   58 +++
 .../org/apache/kafka/streams/state/Entry.java   |   42 ++
 .../streams/state/InMemoryKeyValueStore.java    |  145 ++++++
 .../kafka/streams/state/KeyValueIterator.java   |   29 ++
 .../kafka/streams/state/KeyValueStore.java      |   86 ++++
 .../streams/state/MeteredKeyValueStore.java     |  273 +++++++++++
 .../kafka/streams/state/OffsetCheckpoint.java   |  172 +++++++
 .../streams/state/RocksDBKeyValueStore.java     |  276 +++++++++++
 .../streams/kstream/KStreamBuilderTest.java     |   34 ++
 .../kstream/internals/FilteredIteratorTest.java |   94 ++++
 .../kstream/internals/KStreamBranchTest.java    |   90 ++++
 .../kstream/internals/KStreamFilterTest.java    |   85 ++++
 .../kstream/internals/KStreamFlatMapTest.java   |   80 ++++
 .../internals/KStreamFlatMapValuesTest.java     |   77 +++
 .../kstream/internals/KStreamImplTest.java      |  138 ++++++
 .../kstream/internals/KStreamJoinTest.java      |  164 +++++++
 .../kstream/internals/KStreamMapTest.java       |   73 +++
 .../kstream/internals/KStreamMapValuesTest.java |   71 +++
 .../kstream/internals/KStreamWindowedTest.java  |   91 ++++
 .../streams/processor/TopologyBuilderTest.java  |   99 ++++
 .../internals/MinTimestampTrackerTest.java      |   93 ++++
 .../processor/internals/PartitionGroupTest.java |  102 ++++
 .../internals/ProcessorStateManagerTest.java    |  449 +++++++++++++++++
 .../internals/ProcessorTopologyTest.java        |  326 +++++++++++++
 .../internals/PunctuationQueueTest.java         |   85 ++++
 .../processor/internals/RecordQueueTest.java    |  116 +++++
 .../processor/internals/StreamTaskTest.java     |  186 ++++++++
 .../processor/internals/StreamThreadTest.java   |  389 +++++++++++++++
 .../apache/kafka/test/KStreamTestDriver.java    |   95 ++++
 .../apache/kafka/test/MockProcessorContext.java |  143 ++++++
 .../org/apache/kafka/test/MockProcessorDef.java |   58 +++
 .../org/apache/kafka/test/MockSourceNode.java   |   46 ++
 .../kafka/test/MockTimestampExtractor.java      |   30 ++
 .../kafka/test/ProcessorTopologyTestDriver.java |  317 ++++++++++++
 .../apache/kafka/test/UnlimitedWindowDef.java   |  104 ++++
 104 files changed, 10083 insertions(+), 20 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index dd37df4..d8a111e 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -56,6 +56,11 @@ do
   CLASSPATH=$CLASSPATH:$file
 done
 
+for file in $base_dir/streams/build/libs/kafka-streams*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
 for file in $base_dir/tools/build/libs/kafka-tools*.jar;
 do
   CLASSPATH=$CLASSPATH:$file

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index fecc3eb..02b1db5 100644
--- a/build.gradle
+++ b/build.gradle
@@ -215,25 +215,22 @@ for ( sv in ['2_10_5', '2_11_7'] ) {
 }
 
 def copycatPkgs = ['copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file']
-def pkgs = ['clients', 'examples', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'log4j-appender', 'tools'] + copycatPkgs
+def pkgs = ['clients', 'examples', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'log4j-appender', 'tools', 'streams'] + copycatPkgs
 
 tasks.create(name: "jarCopycat", dependsOn: copycatPkgs.collect { it + ":jar" }) {}
-tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7'] + pkgs.collect { it + ":jar" }) {
-}
+tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10_5', 'jar_core_2_11_7'] + pkgs.collect { it + ":jar" }) { }
 
 tasks.create(name: "srcJarAll", dependsOn: ['srcJar_2_10_5', 'srcJar_2_11_7'] + pkgs.collect { it + ":srcJar" }) { }
 
 tasks.create(name: "docsJarAll", dependsOn: ['docsJar_2_10_5', 'docsJar_2_11_7'] + pkgs.collect { it + ":docsJar" }) { }
 
 tasks.create(name: "testCopycat", dependsOn: copycatPkgs.collect { it + ":test" }) {}
-tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7'] + pkgs.collect { it + ":test" }) {
-}
+tasks.create(name: "testAll", dependsOn: ['test_core_2_10_5', 'test_core_2_11_7'] + pkgs.collect { it + ":test" }) { }
 
 tasks.create(name: "releaseTarGzAll", dependsOn: ['releaseTarGz_2_10_5', 'releaseTarGz_2_11_7']) { }
 
-tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7'] + pkgs.collect { it + ":uploadArchives" }) {
-}
+tasks.create(name: "uploadArchivesAll", dependsOn: ['uploadCoreArchives_2_10_5', 'uploadCoreArchives_2_11_7'] + pkgs.collect { it + ":uploadArchives" }) { }
 
 project(':core') {
   println "Building project 'core' with Scala version $scalaVersion"
@@ -518,6 +515,71 @@ project(':tools') {
     dependsOn 'copyDependantLibs'
   }
 
+  artifacts {
+    archives testJar
+  }
+
+  configurations {
+    archives.extendsFrom (testCompile)
+  }
+
+  checkstyle {
+    configFile = new File(rootDir, "checkstyle/checkstyle.xml")
+  }
+  test.dependsOn('checkstyleMain', 'checkstyleTest')
+}
+
+project(':streams') {
+  apply plugin: 'checkstyle'
+  archivesBaseName = "kafka-streams"
+
+  dependencies {
+    compile project(':clients')
+    compile "$slf4jlog4j"
+    compile 'org.rocksdb:rocksdbjni:3.10.1'
+
+    testCompile "$junit"
+    testCompile project(path: ':clients', configuration: 'archives')
+  }
+
+  task testJar(type: Jar) {
+    classifier = 'test'
+    from sourceSets.test.output
+  }
+
+  test {
+    testLogging {
+      events "passed", "skipped", "failed"
+      exceptionFormat = 'full'
+    }
+  }
+
+  javadoc {
+    include "**/org/apache/kafka/streams/*"
+  }
+
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.testRuntime) {
+      include('slf4j-log4j12*')
+    }
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
+    }
+    into "$buildDir/dependant-libs-${scalaVersion}"
+  }
+
+  jar {
+    dependsOn 'copyDependantLibs'
+  }
+
+  artifacts {
+    archives testJar
+  }
+
+
configurations { + archives.extendsFrom (testCompile) + } + checkstyle { configFile = new File(rootDir, "checkstyle/checkstyle.xml") } http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/checkstyle/checkstyle.xml ---------------------------------------------------------------------- diff --git a/checkstyle/checkstyle.xml b/checkstyle/checkstyle.xml index a215ff3..999fd6c 100644 --- a/checkstyle/checkstyle.xml +++ b/checkstyle/checkstyle.xml @@ -53,9 +53,13 @@ </module> <module name="LocalVariableName"/> <module name="LocalFinalVariableName"/> - <module name="ClassTypeParameterName"/> <module name="MemberName"/> - <module name="MethodTypeParameterName"/> + <module name="ClassTypeParameterName"> + <property name="format" value="^[A-Z0-9]*$"/> + </module> + <module name="MethodTypeParameterName"> + <property name="format" value="^[A-Z0-9]*$"/> + </module> <module name="PackageName"/> <module name="ParameterName"/> <module name="StaticVariableName"/> http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index d58c472..7b748ec 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -90,8 +90,8 @@ </subpackage> <subpackage name="clients"> - <allow pkg="org.apache.kafka.common" /> <allow pkg="org.slf4j" /> + <allow pkg="org.apache.kafka.common" /> <allow pkg="org.apache.kafka.clients" exact-match="true"/> <allow pkg="org.apache.kafka.test" /> @@ -111,6 +111,20 @@ </subpackage> </subpackage> + <subpackage name="streams"> + <allow pkg="org.apache.kafka.common"/> + <allow pkg="org.apache.kafka.test"/> + <allow pkg="org.apache.kafka.clients"/> + <allow pkg="org.apache.kafka.clients.producer" exact-match="true"/> + <allow pkg="org.apache.kafka.clients.consumer" exact-match="true"/> + + <allow pkg="org.apache.kafka.streams"/> + + <subpackage name="state"> + <allow pkg="org.rocksdb" /> + </subpackage> + </subpackage> + <subpackage name="log4jappender"> <allow pkg="org.apache.log4j" /> <allow pkg="org.apache.kafka.clients" /> http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java index b9a2d4e..347a5bc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/ConsumerConfig.java @@ -150,11 +150,11 @@ public class ConsumerConfig extends AbstractConfig { /** <code>key.deserializer</code> */ public static final String KEY_DESERIALIZER_CLASS_CONFIG = "key.deserializer"; - private static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>Deserializer</code> interface."; + public static final String KEY_DESERIALIZER_CLASS_DOC = "Deserializer class for key that implements the <code>Deserializer</code> interface."; /** <code>value.deserializer</code> */ public static final String VALUE_DESERIALIZER_CLASS_CONFIG = "value.deserializer"; - private static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer class for value that implements the <code>Deserializer</code> interface."; + public static final String VALUE_DESERIALIZER_CLASS_DOC = "Deserializer 
class for value that implements the <code>Deserializer</code> interface."; /** <code>connections.max.idle.ms</code> */ public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG; http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 2a3c763..68f61bf 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -550,6 +550,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { Deserializer.class); this.keyDeserializer.configure(config.originals(), true); } else { + config.ignore(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG); this.keyDeserializer = keyDeserializer; } if (valueDeserializer == null) { @@ -557,6 +558,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> { Deserializer.class); this.valueDeserializer.configure(config.originals(), false); } else { + config.ignore(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG); this.valueDeserializer = valueDeserializer; } this.fetcher = new Fetcher<K, V>(this.client, http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 1f802a8..3c0f261 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -46,6 +46,7 @@ public class MockConsumer<K, V> implements Consumer<K, V> { private final Map<String, List<PartitionInfo>> partitions; private final SubscriptionState subscriptions; private Map<TopicPartition, List<ConsumerRecord<K, V>>> records; + private Set<TopicPartition> paused; private boolean closed; private final Map<TopicPartition, Long> beginningOffsets; private final Map<TopicPartition, Long> endOffsets; @@ -57,8 +58,9 @@ public class MockConsumer<K, V> implements Consumer<K, V> { public MockConsumer(OffsetResetStrategy offsetResetStrategy) { this.subscriptions = new SubscriptionState(offsetResetStrategy); - this.partitions = new HashMap<String, List<PartitionInfo>>(); - this.records = new HashMap<TopicPartition, List<ConsumerRecord<K, V>>>(); + this.partitions = new HashMap<>(); + this.records = new HashMap<>(); + this.paused = new HashSet<>(); this.closed = false; this.beginningOffsets = new HashMap<>(); this.endOffsets = new HashMap<>(); @@ -288,14 +290,18 @@ public class MockConsumer<K, V> implements Consumer<K, V> { @Override public void pause(TopicPartition... partitions) { - for (TopicPartition partition : partitions) + for (TopicPartition partition : partitions) { subscriptions.pause(partition); + paused.add(partition); + } } @Override public void resume(TopicPartition... 
partitions) { - for (TopicPartition partition : partitions) + for (TopicPartition partition : partitions) { subscriptions.resume(partition); + paused.remove(partition); + } } @Override @@ -332,6 +338,10 @@ public class MockConsumer<K, V> implements Consumer<K, V> { } } + public Set<TopicPartition> paused() { + return Collections.unmodifiableSet(new HashSet<>(paused)); + } + private void ensureNotClosed() { if (this.closed) throw new IllegalStateException("This consumer has already been closed."); http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java index 804d569..3a783ec 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java @@ -259,6 +259,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { Serializer.class); this.keySerializer.configure(config.originals(), true); } else { + config.ignore(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG); this.keySerializer = keySerializer; } if (valueSerializer == null) { @@ -266,6 +267,7 @@ public class KafkaProducer<K, V> implements Producer<K, V> { Serializer.class); this.valueSerializer.configure(config.originals(), false); } else { + config.ignore(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG); this.valueSerializer = valueSerializer; } config.logUnused(); http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java index 06f00a9..6969f61 100644 --- a/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java +++ b/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java @@ -164,11 +164,11 @@ public class ProducerConfig extends AbstractConfig { /** <code>key.serializer</code> */ public static final String KEY_SERIALIZER_CLASS_CONFIG = "key.serializer"; - private static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>Serializer</code> interface."; + public static final String KEY_SERIALIZER_CLASS_DOC = "Serializer class for key that implements the <code>Serializer</code> interface."; /** <code>value.serializer</code> */ public static final String VALUE_SERIALIZER_CLASS_CONFIG = "value.serializer"; - private static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface."; + public static final String VALUE_SERIALIZER_CLASS_DOC = "Serializer class for value that implements the <code>Serializer</code> interface."; /** <code>connections.max.idle.ms</code> */ public static final String CONNECTIONS_MAX_IDLE_MS_CONFIG = CommonClientConfigs.CONNECTIONS_MAX_IDLE_MS_CONFIG; http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java 
b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java index 12a1927..2961e09 100644 --- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java +++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java @@ -62,6 +62,10 @@ public class AbstractConfig { return values.get(key); } + public void ignore(String key) { + used.add(key); + } + public Short getShort(String key) { return (Short) get(key); } http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java new file mode 100644 index 0000000..37983e4 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/LongDeserializer.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import org.apache.kafka.common.errors.SerializationException; + +import java.util.Map; + +public class LongDeserializer implements Deserializer<Long> { + + public void configure(Map<String, ?> configs, boolean isKey) { + // nothing to do + } + + public Long deserialize(String topic, byte[] data) { + if (data == null) + return null; + if (data.length != 8) { + throw new SerializationException("Size of data received by LongDeserializer is " + + "not 8"); + } + + long value = 0; + for (byte b : data) { + value <<= 8; + value |= b & 0xFF; + } + return value; + } + + public void close() { + // nothing to do + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java new file mode 100644 index 0000000..3100529 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/LongSerializer.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE + * file distributed with this work for additional information regarding copyright ownership. The ASF licenses this file + * to You under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the + * License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package org.apache.kafka.common.serialization;
+
+import java.util.Map;
+
+public class LongSerializer implements Serializer<Long> {
+
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        // nothing to do
+    }
+
+    public byte[] serialize(String topic, Long data) {
+        if (data == null)
+            return null;
+
+        return new byte[] {
+            (byte) (data >>> 56),
+            (byte) (data >>> 48),
+            (byte) (data >>> 40),
+            (byte) (data >>> 32),
+            (byte) (data >>> 24),
+            (byte) (data >>> 16),
+            (byte) (data >>> 8),
+            data.byteValue()
+        };
+    }
+
+    public void close() {
+        // nothing to do
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index c58b741..fa7c92f 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -25,7 +25,10 @@ import java.nio.ByteBuffer;
 import java.nio.MappedByteBuffer;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import java.util.Properties;
@@ -524,4 +527,42 @@ public class Utils {
         return existingBuffer;
     }
 
+    /**
+     * Creates a set from the given elements
+     * @param elems the elements
+     * @param <T> the type of element
+     * @return a Set containing the given elements
+     */
+    public static <T> HashSet<T> mkSet(T... elems) {
+        return new HashSet<>(Arrays.asList(elems));
+    }
+
+    /**
+     * Recursively delete the given file/directory and any subfiles (if any exist)
+     *
+     * @param file The root file at which to begin deleting
+     */
+    public static void delete(File file) {
+        if (file == null) {
+            return;
+        } else if (file.isDirectory()) {
+            File[] files = file.listFiles();
+            if (files != null) {
+                for (File f : files)
+                    delete(f);
+            }
+            file.delete();
+        } else {
+            file.delete();
+        }
+    }
+
+    /**
+     * Returns an empty list if the given list is null
+     * @param other the list to guard against null
+     * @return {@code other}, or an empty list if {@code other} is null
+     */
+    public static <T> List<T> safe(List<T> other) {
+        return other == null ? Collections.<T>emptyList() : other;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/settings.gradle
----------------------------------------------------------------------
diff --git a/settings.gradle b/settings.gradle
index 9c7fea5..357305b 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -14,5 +14,5 @@
 // limitations under the License.
apply from: file('scala.gradle') -include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'log4j-appender', - 'copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file' +include 'core', 'contrib:hadoop-consumer', 'contrib:hadoop-producer', 'examples', 'clients', 'tools', 'streams', 'log4j-appender', + 'copycat:api', 'copycat:runtime', 'copycat:json', 'copycat:file' \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java new file mode 100644 index 0000000..f3a99e0 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreaming.java @@ -0,0 +1,125 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams; + +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.processor.internals.StreamThread; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Kafka Streaming allows for performing continuous computation on input coming from one or more input topics and + * sends output to zero or more output topics. + * <p> + * This processing is defined by using the {@link TopologyBuilder} class or its superclass KStreamBuilder to specify + * the transformation. + * The {@link KafkaStreaming} instance will be responsible for the lifecycle of these processors. It will instantiate and + * start one or more of these processors to process the Kafka partitions assigned to this particular instance. + * <p> + * This streaming instance will co-ordinate with any other instances (whether in this same process, on other processes + * on this machine, or on remote machines). These processes will divide up the work so that all partitions are being + * consumed. If instances are added or die, the corresponding {@link StreamThread} instances will be shutdown or + * started in the appropriate processes to balance processing load. + * <p> + * Internally the {@link KafkaStreaming} instance contains a normal {@link org.apache.kafka.clients.producer.KafkaProducer KafkaProducer} + * and {@link org.apache.kafka.clients.consumer.KafkaConsumer KafkaConsumer} instance that is used for reading input and writing output. 
+ * <p>
+ * A simple example might look like this:
+ * <pre>
+ *    Map<String, Object> props = new HashMap<>();
+ *    props.put("bootstrap.servers", "localhost:4242");
+ *    props.put("key.deserializer", StringDeserializer.class);
+ *    props.put("value.deserializer", StringDeserializer.class);
+ *    props.put("key.serializer", StringSerializer.class);
+ *    props.put("value.serializer", IntegerSerializer.class);
+ *    props.put("timestamp.extractor", MyTimestampExtractor.class);
+ *    StreamingConfig config = new StreamingConfig(props);
+ *
+ *    KStreamBuilder builder = new KStreamBuilder();
+ *    builder.from("topic1").mapValues(value -> value.length()).to("topic2");
+ *
+ *    KafkaStreaming streaming = new KafkaStreaming(builder, config);
+ *    streaming.start();
+ * </pre>
+ *
+ */
+public class KafkaStreaming {
+
+    private static final Logger log = LoggerFactory.getLogger(KafkaStreaming.class);
+
+    // Container States
+    private static final int CREATED = 0;
+    private static final int RUNNING = 1;
+    private static final int STOPPED = 2;
+    private int state = CREATED;
+
+    private final StreamThread[] threads;
+
+    public KafkaStreaming(TopologyBuilder builder, StreamingConfig config) throws Exception {
+        this.threads = new StreamThread[config.getInt(StreamingConfig.NUM_STREAM_THREADS_CONFIG)];
+        for (int i = 0; i < this.threads.length; i++) {
+            this.threads[i] = new StreamThread(builder, config);
+        }
+    }
+
+    /**
+     * Start the stream process by starting all its threads
+     */
+    public synchronized void start() {
+        log.debug("Starting Kafka Stream process");
+
+        if (state == CREATED) {
+            for (StreamThread thread : threads)
+                thread.start();
+
+            state = RUNNING;
+
+            log.info("Started Kafka Stream process");
+        } else {
+            throw new IllegalStateException("This process was already started.");
+        }
+    }
+
+    /**
+     * Shutdown this stream process by signaling the threads to stop,
+     * wait for them to join and clean up the process instance.
+     */
+    public synchronized void close() {
+        log.debug("Stopping Kafka Stream process");
+
+        if (state == RUNNING) {
+            // signal the threads to stop and wait
+            for (StreamThread thread : threads)
+                thread.close();
+
+            for (StreamThread thread : threads) {
+                try {
+                    thread.join();
+                } catch (InterruptedException ex) {
+                    Thread.interrupted();
+                }
+            }
+
+            state = STOPPED;
+
+            log.info("Stopped Kafka Stream process");
+        } else {
+            throw new IllegalStateException("This process has not started yet.");
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
new file mode 100644
index 0000000..dce69b6
--- /dev/null
+++ b/streams/src/main/java/org/apache/kafka/streams/StreamingConfig.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; + +import java.util.Map; + +public class StreamingConfig extends AbstractConfig { + + private static final ConfigDef CONFIG; + + /** <code>state.dir</code> */ + public static final String STATE_DIR_CONFIG = "state.dir"; + private static final String STATE_DIR_DOC = "Directory location for state store."; + + /** <code>commit.interval.ms</code> */ + public static final String COMMIT_INTERVAL_MS_CONFIG = "commit.interval.ms"; + private static final String COMMIT_INTERVAL_MS_DOC = "The frequency with which to save the position of the processor."; + + /** <code>poll.ms</code> */ + public static final String POLL_MS_CONFIG = "poll.ms"; + private static final String POLL_MS_DOC = "The amount of time in milliseconds to block waiting for input."; + + /** <code>num.stream.threads</code> */ + public static final String NUM_STREAM_THREADS_CONFIG = "num.stream.threads"; + private static final String NUM_STREAM_THREADS_DOC = "The number of threads to execute stream processing."; + + /** <code>buffered.records.per.partition</code> */ + public static final String BUFFERED_RECORDS_PER_PARTITION_CONFIG = "buffered.records.per.partition"; + private static final String BUFFERED_RECORDS_PER_PARTITION_DOC = "The maximum number of records to buffer per partition."; + + /** <code>state.cleanup.delay</code> */ + public static final String STATE_CLEANUP_DELAY_MS_CONFIG = "state.cleanup.delay.ms"; + private static final String STATE_CLEANUP_DELAY_MS_DOC = "The amount of time in milliseconds to wait before deleting state when a partition has migrated."; + + /** <code>total.records.to.process</code> */ + public static final String TOTAL_RECORDS_TO_PROCESS = "total.records.to.process"; + private static final String TOTAL_RECORDS_TO_DOC = "Exit after processing this many records."; + + /** <code>window.time.ms</code> */ + public static final String WINDOW_TIME_MS_CONFIG = "window.time.ms"; + private static final String WINDOW_TIME_MS_DOC = "Setting this to a non-negative value will cause the processor to get called " + + "with this frequency even if there is no message."; + + /** <code>timestamp.extractor</code> */ + public static final String TIMESTAMP_EXTRACTOR_CLASS_CONFIG = "timestamp.extractor"; + private static final String TIMESTAMP_EXTRACTOR_CLASS_DOC = "Timestamp extractor class that implements the <code>TimestampExtractor</code> interface."; + + /** <code>client.id</code> */ + public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; + + /** <code>key.serializer</code> */ + public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; + + /** <code>value.serializer</code> */ + public static final 
String VALUE_SERIALIZER_CLASS_CONFIG = ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG; + + /** <code>key.deserializer</code> */ + public static final String KEY_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG; + + /** <code>value.deserializer</code> */ + public static final String VALUE_DESERIALIZER_CLASS_CONFIG = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG; + + /** + * <code>bootstrap.servers</code> + */ + public static final String BOOTSTRAP_SERVERS_CONFIG = CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG; + + private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir"); + + static { + CONFIG = new ConfigDef().define(CLIENT_ID_CONFIG, + Type.STRING, + "", + Importance.MEDIUM, + CommonClientConfigs.CLIENT_ID_DOC) + .define(STATE_DIR_CONFIG, + Type.STRING, + SYSTEM_TEMP_DIRECTORY, + Importance.MEDIUM, + STATE_DIR_DOC) + .define(COMMIT_INTERVAL_MS_CONFIG, + Type.LONG, + 30000, + Importance.HIGH, + COMMIT_INTERVAL_MS_DOC) + .define(POLL_MS_CONFIG, + Type.LONG, + 100, + Importance.LOW, + POLL_MS_DOC) + .define(NUM_STREAM_THREADS_CONFIG, + Type.INT, + 1, + Importance.LOW, + NUM_STREAM_THREADS_DOC) + .define(BUFFERED_RECORDS_PER_PARTITION_CONFIG, + Type.INT, + 1000, + Importance.LOW, + BUFFERED_RECORDS_PER_PARTITION_DOC) + .define(STATE_CLEANUP_DELAY_MS_CONFIG, + Type.LONG, + 60000, + Importance.LOW, + STATE_CLEANUP_DELAY_MS_DOC) + .define(TOTAL_RECORDS_TO_PROCESS, + Type.LONG, + -1L, + Importance.LOW, + TOTAL_RECORDS_TO_DOC) + .define(WINDOW_TIME_MS_CONFIG, + Type.LONG, + -1L, + Importance.MEDIUM, + WINDOW_TIME_MS_DOC) + .define(KEY_SERIALIZER_CLASS_CONFIG, + Type.CLASS, + Importance.HIGH, + ProducerConfig.KEY_SERIALIZER_CLASS_DOC) + .define(VALUE_SERIALIZER_CLASS_CONFIG, + Type.CLASS, + Importance.HIGH, + ProducerConfig.VALUE_SERIALIZER_CLASS_DOC) + .define(KEY_DESERIALIZER_CLASS_CONFIG, + Type.CLASS, + Importance.HIGH, + ConsumerConfig.KEY_DESERIALIZER_CLASS_DOC) + .define(VALUE_DESERIALIZER_CLASS_CONFIG, + Type.CLASS, + Importance.HIGH, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_DOC) + .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, + Type.CLASS, + Importance.HIGH, + TIMESTAMP_EXTRACTOR_CLASS_DOC) + .define(BOOTSTRAP_SERVERS_CONFIG, + Type.STRING, + Importance.HIGH, + CommonClientConfigs.BOOSTRAP_SERVERS_DOC); + } + + public StreamingConfig(Map<?, ?> props) { + super(CONFIG, props); + } + + public Map<String, Object> getConsumerConfigs() { + Map<String, Object> props = this.originals(); + + // set consumer default property values + props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); + props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, "range"); + + // remove properties that are not required for consumers + props.remove(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG); + props.remove(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG); + props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG); + + return props; + } + + public Map<String, Object> getProducerConfigs() { + Map<String, Object> props = this.originals(); + + // set producer default property values + props.put(ProducerConfig.LINGER_MS_CONFIG, "100"); + + // remove properties that are not required for producers + props.remove(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG); + props.remove(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG); + props.remove(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG); + + return props; + } + + public static void main(String[] args) { + System.out.println(CONFIG.toHtmlTable()); + } +} 
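One detail worth calling out from StreamingConfig above: the embedded clients never see the raw user properties; getConsumerConfigs() and getProducerConfigs() hand each client a tailored view. A short sketch of the behavior defined above (the props map is assumed to carry the serializers, deserializers, timestamp extractor and bootstrap.servers):

    StreamingConfig config = new StreamingConfig(props);

    // Consumer view: enable.auto.commit forced to "false" and the range assignor selected;
    // serializer and timestamp.extractor entries are removed since consumers do not need them.
    Map<String, Object> consumerProps = config.getConsumerConfigs();

    // Producer view: linger.ms defaulted to "100";
    // deserializer and timestamp.extractor entries are removed since producers do not need them.
    Map<String, Object> producerProps = config.getProducerConfigs();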
http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java new file mode 100644 index 0000000..feb4ee7 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.examples; + +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.StreamingConfig; +import org.apache.kafka.streams.KafkaStreaming; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KeyValue; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Predicate; + +import java.util.Properties; + +public class KStreamJob { + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-KStream-Job"); + props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); + StreamingConfig config = new StreamingConfig(props); + + KStreamBuilder builder = new KStreamBuilder(); + + KStream<String, String> stream1 = builder.from("topic1"); + + KStream<String, Integer> stream2 = + stream1.map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() { + @Override + public KeyValue<String, Integer> apply(String key, String value) { + return new KeyValue<>(key, new Integer(value)); + } + }).filter(new Predicate<String, Integer>() { + @Override + public boolean apply(String key, Integer value) { + return true; + } + }); + + KStream<String, Integer>[] streams = stream2.branch( + new Predicate<String, Integer>() { + @Override + public boolean apply(String key, Integer value) { + return (value % 2) == 0; + } + }, + new Predicate<String, Integer>() { + @Override + public boolean apply(String key, Integer value) { + return true; + } + 
} + ); + + streams[0].to("topic2"); + streams[1].to("topic3"); + + KafkaStreaming kstream = new KafkaStreaming(builder, config); + kstream.start(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java new file mode 100644 index 0000000..0b3aba8 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.examples; + +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.KafkaStreaming; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorDef; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.StreamingConfig; +import org.apache.kafka.streams.state.Entry; +import org.apache.kafka.streams.state.InMemoryKeyValueStore; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; + +import java.util.Properties; + +public class ProcessorJob { + + private static class MyProcessorDef implements ProcessorDef { + + @Override + public Processor<String, String> instance() { + return new Processor<String, String>() { + private ProcessorContext context; + private KeyValueStore<String, Integer> kvStore; + + @Override + public void init(ProcessorContext context) { + this.context = context; + this.context.schedule(1000); + this.kvStore = new InMemoryKeyValueStore<>("local-state", context); + } + + @Override + public void process(String key, String value) { + Integer oldValue = this.kvStore.get(key); + Integer newValue = Integer.parseInt(value); + if (oldValue == null) { + this.kvStore.put(key, newValue); + } else { + this.kvStore.put(key, oldValue + newValue); + } + + context.commit(); + } + + @Override + public void punctuate(long timestamp) { + KeyValueIterator<String, Integer> iter = this.kvStore.all(); + + while (iter.hasNext()) { + Entry<String, Integer> entry = iter.next(); + + System.out.println("[" + entry.key() + ", " + entry.value() + "]"); + + context.forward(entry.key(), entry.value()); + } + + iter.close(); + } + + @Override + public void close() { + 
this.kvStore.close(); + } + }; + } + } + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamingConfig.CLIENT_ID_CONFIG, "Example-Processor-Job"); + props.put(StreamingConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamingConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamingConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + props.put(StreamingConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamingConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + props.put(StreamingConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); + StreamingConfig config = new StreamingConfig(props); + + TopologyBuilder builder = new TopologyBuilder(); + + builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source"); + + builder.addProcessor("PROCESS", new MyProcessorDef(), "SOURCE"); + + builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS"); + + KafkaStreaming streaming = new KafkaStreaming(builder, config); + streaming.start(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java new file mode 100644 index 0000000..26281d6 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java @@ -0,0 +1,28 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.examples; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.processor.TimestampExtractor; + +public class WallclockTimestampExtractor implements TimestampExtractor { + @Override + public long extract(ConsumerRecord<Object, Object> record) { + return System.currentTimeMillis(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java new file mode 100644 index 0000000..7f101ab --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.processor.ProcessorDef; + +/** + * KStream is an abstraction of a stream of key-value pairs. 
+ */
+public interface KStream<K, V> {
+
+    /**
+     * Creates a new stream consisting of all elements of this stream which satisfy a predicate
+     *
+     * @param predicate the instance of Predicate
+     * @return KStream
+     */
+    KStream<K, V> filter(Predicate<K, V> predicate);
+
+    /**
+     * Creates a new stream consisting of all elements of this stream which do not satisfy a predicate
+     *
+     * @param predicate the instance of Predicate
+     * @return KStream
+     */
+    KStream<K, V> filterOut(Predicate<K, V> predicate);
+
+    /**
+     * Creates a new stream by transforming each key-value pair of this stream with a mapper
+     *
+     * @param mapper the instance of KeyValueMapper
+     * @param <K1> the key type of the new stream
+     * @param <V1> the value type of the new stream
+     * @return KStream
+     */
+    <K1, V1> KStream<K1, V1> map(KeyValueMapper<K, V, KeyValue<K1, V1>> mapper);
+
+    /**
+     * Creates a new stream by transforming each value of this stream with a mapper
+     *
+     * @param mapper the instance of ValueMapper
+     * @param <V1> the value type of the new stream
+     * @return KStream
+     */
+    <V1> KStream<K, V1> mapValues(ValueMapper<V, V1> mapper);
+
+    /**
+     * Creates a new stream by applying a mapper to all elements of this stream and using the values in the resulting Iterable
+     *
+     * @param mapper the instance of KeyValueMapper
+     * @param <K1> the key type of the new stream
+     * @param <V1> the value type of the new stream
+     * @return KStream
+     */
+    <K1, V1> KStream<K1, V1> flatMap(KeyValueMapper<K, V, Iterable<KeyValue<K1, V1>>> mapper);
+
+    /**
+     * Creates a new stream by applying a mapper to all values of this stream and using the values in the resulting Iterable
+     *
+     * @param processor the instance of ValueMapper
+     * @param <V1> the value type of the new stream
+     * @return KStream
+     */
+    <V1> KStream<K, V1> flatMapValues(ValueMapper<V, Iterable<V1>> processor);
+
+    /**
+     * Creates a new windowed stream using a specified window instance.
+     *
+     * @param windowDef the instance of WindowDef
+     * @return KStreamWindowed
+     */
+    KStreamWindowed<K, V> with(WindowDef<K, V> windowDef);
+
+    /**
+     * Creates an array of streams from this stream. Each stream in the array corresponds to a predicate in
+     * the supplied predicates in the same order. Predicates are evaluated in order. An element is streamed to
+     * the stream corresponding to the first predicate that evaluates to true.
+     * An element will be dropped if none of the predicates evaluate true.
+     *
+     * @param predicates Instances of Predicate
+     * @return KStream
+     */
+    KStream<K, V>[] branch(Predicate<K, V>... predicates);
+
+    /**
+     * Sends key-value pairs to a topic and also creates a new stream from that topic.
+     * This is equivalent to calling to(topic) and from(topic).
+     *
+     * @param topic the topic name
+     * @param <K1> the key type of the new stream
+     * @param <V1> the value type of the new stream
+     * @return KStream
+     */
+    <K1, V1> KStream<K1, V1> through(String topic);
+
+    /**
+     * Sends key-value pairs to a topic and also creates a new stream from that topic.
+     * This is equivalent to calling to(topic) and from(topic).
+ * + * @param topic the topic name + * @param keySerializer key serializer used to send key-value pairs, + * if not specified the default serializer defined in the configs will be used + * @param valSerializer value serializer used to send key-value pairs, + * if not specified the default serializer defined in the configs will be used + * @param keyDeserializer key deserializer used to create the new KStream, + * if not specified the default deserializer defined in the configs will be used + * @param valDeserializer value deserializer used to create the new KStream, + * if not specified the default deserializer defined in the configs will be used + * @param <K1> the key type of the new stream + * @param <V1> the value type of the new stream + * @return KStream + */ + <K1, V1> KStream<K1, V1> through(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, Deserializer<K1> keyDeserializer, Deserializer<V1> valDeserializer); + + /** + * Sends key-value to a topic using default serializers specified in the config. + * + * @param topic the topic name + */ + void to(String topic); + + /** + * Sends key-value to a topic. + * + * @param topic the topic name + * @param keySerializer key serializer used to send key-value pairs, + * if not specified the default serializer defined in the configs will be used + * @param valSerializer value serializer used to send key-value pairs, + * if not specified the default serializer defined in the configs will be used + */ + void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer); + + /** + * Processes all elements in this stream by applying a processor. + * + * @param processorDef the class of ProcessorDef + */ + <K1, V1> KStream<K1, V1> process(ProcessorDef processorDef); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java new file mode 100644 index 0000000..2d4dcc7 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamBuilder.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.streams.kstream.internals.KStreamImpl; +import org.apache.kafka.streams.processor.TopologyBuilder; + +/** + * KStreamBuilder is the class to create KStream instances. 
+ */ +public class KStreamBuilder extends TopologyBuilder { + + public KStreamBuilder() { + super(); + } + + /** + * Creates a KStream instance for the specified topics. The stream is added to the default synchronization group. + * The default deserializers specified in the config are used. + * + * @param topics the topic names; if empty, defaults to all the topics in the config + * @return KStream + */ + public <K, V> KStream<K, V> from(String... topics) { + String name = KStreamImpl.SOURCE_NAME + KStreamImpl.INDEX.getAndIncrement(); + + addSource(name, topics); + + return new KStreamImpl<>(this, name); + } + + /** + * Creates a KStream instance for the specified topics. The stream is added to the default synchronization group. + * + * @param keyDeserializer key deserializer used to read this source KStream, + * if not specified, the default deserializer defined in the configs will be used + * @param valDeserializer value deserializer used to read this source KStream, + * if not specified, the default deserializer defined in the configs will be used + * @param topics the topic names; if empty, defaults to all the topics in the config + * @return KStream + */ + public <K, V> KStream<K, V> from(Deserializer<? extends K> keyDeserializer, Deserializer<? extends V> valDeserializer, String... topics) { + String name = KStreamImpl.SOURCE_NAME + KStreamImpl.INDEX.getAndIncrement(); + + addSource(name, keyDeserializer, valDeserializer, topics); + + return new KStreamImpl<>(this, name); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java new file mode 100644 index 0000000..4d73128 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KStreamWindowed.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +/** + * KStreamWindowed is an abstraction of a stream of key-value pairs with a window. + */ +public interface KStreamWindowed<K, V> extends KStream<K, V> { + + /** + * Creates a new stream by joining this windowed stream with the other windowed stream. + * Each element arriving from either stream is joined with the elements in the window of the other stream. + * The resulting values are computed by applying the joiner.
+ * + * @param other the other windowed stream + * @param joiner the instance of ValueJoiner + * @param <V1> the value type of the other stream + * @param <V2> the value type of the new stream + * @return KStream + */ + <V1, V2> KStream<K, V2> join(KStreamWindowed<K, V1> other, ValueJoiner<V, V1, V2> joiner); + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java new file mode 100644 index 0000000..f633f6e --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValue.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +public class KeyValue<K, V> { + + public final K key; + public final V value; + + public KeyValue(K key, V value) { + this.key = key; + this.value = value; + } + + public static <K, V> KeyValue<K, V> pair(K key, V value) { + return new KeyValue<>(key, value); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java new file mode 100644 index 0000000..62b07f6 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueMapper.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.kafka.streams.kstream; + +public interface KeyValueMapper<K, V, R> { + + R apply(K key, V value); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java new file mode 100644 index 0000000..9cdb3bc --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Predicate.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +public interface Predicate<K, V> { + + boolean apply(K key, V value); + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java new file mode 100644 index 0000000..cc03541 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/SlidingWindowDef.java @@ -0,0 +1,265 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream; + +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.streams.kstream.internals.FilteredIterator; +import org.apache.kafka.streams.kstream.internals.WindowSupport; +import org.apache.kafka.streams.processor.internals.ProcessorContextImpl; +import org.apache.kafka.streams.processor.internals.RecordCollector; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.RestoreFunc; +import org.apache.kafka.streams.processor.internals.Stamped; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.Map; + +public class SlidingWindowDef<K, V> implements WindowDef<K, V> { + private final String name; + private final long duration; + private final int maxCount; + private final Serializer<K> keySerializer; + private final Serializer<V> valueSerializer; + private final Deserializer<K> keyDeserializer; + private final Deserializer<V> valueDeserializer; + + public SlidingWindowDef( + String name, + long duration, + int maxCount, + Serializer<K> keySerializer, + Serializer<V> valueSerializer, + Deserializer<K> keyDeserializer, + Deserializer<V> valueDeserializer) { + this.name = name; + this.duration = duration; + this.maxCount = maxCount; + this.keySerializer = keySerializer; + this.valueSerializer = valueSerializer; + this.keyDeserializer = keyDeserializer; + this.valueDeserializer = valueDeserializer; + } + + @Override + public String name() { + return name; + } + + @Override + public Window<K, V> instance() { + return new SlidingWindow(); + } + + public class SlidingWindow extends WindowSupport implements Window<K, V> { + private final Object lock = new Object(); + private ProcessorContext context; + private int slotNum; // used as a key for Kafka log compaction + private LinkedList<K> list = new LinkedList<>(); + private HashMap<K, ValueList<V>> map = new HashMap<>(); + + @Override + public void init(ProcessorContext context) { + this.context = context; + RestoreFuncImpl restoreFunc = new RestoreFuncImpl(); + context.register(this, restoreFunc); + + for (ValueList<V> valueList : map.values()) { + valueList.clearDirtyValues(); + } + this.slotNum = restoreFunc.slotNum; + } + + @Override + public Iterator<V> findAfter(K key, final long timestamp) { + return find(key, timestamp, timestamp + duration); + } + + @Override + public Iterator<V> findBefore(K key, final long timestamp) { + return find(key, timestamp - duration, timestamp); + } + + @Override + public Iterator<V> find(K key, final long timestamp) { + return find(key, timestamp - duration, timestamp + duration); + } + + /* + * finds items in the window between startTime and endTime (both inclusive) + */ + private Iterator<V> find(K key, final long startTime, final long endTime) { + final ValueList<V> values = map.get(key); + + if (values == null) { + return Collections.emptyIterator(); + } else { + return new FilteredIterator<V, Value<V>>(values.iterator()) { + @Override + protected V filter(Value<V> item) { + if (startTime <= item.timestamp && item.timestamp <= endTime) + return item.value; + else + return null; + } + }; + } + } + 
+ @Override + public void put(K key, V value, long timestamp) { + synchronized (lock) { + slotNum++; + + list.offerLast(key); + + ValueList<V> values = map.get(key); + if (values == null) { + values = new ValueList<>(); + map.put(key, values); + } + + values.add(slotNum, value, timestamp); + } + evictExcess(); + evictExpired(timestamp - duration); + } + + // evicts the oldest entries while the window holds more than maxCount entries + private void evictExcess() { + while (list.size() > maxCount) { + K oldestKey = list.pollFirst(); + + ValueList<V> values = map.get(oldestKey); + values.removeFirst(); + + if (values.isEmpty()) map.remove(oldestKey); + } + } + + // evicts entries with a timestamp older than cutoffTime, stopping at the first entry still inside the window + private void evictExpired(long cutoffTime) { + while (true) { + K oldestKey = list.peekFirst(); + + ValueList<V> values = map.get(oldestKey); + Stamped<V> oldestValue = values.first(); + + if (oldestValue.timestamp < cutoffTime) { + list.pollFirst(); + values.removeFirst(); + + if (values.isEmpty()) map.remove(oldestKey); + } else { + break; + } + } + } + + @Override + public String name() { + return name; + } + + // sends the dirty (not yet flushed) values to this window's changelog topic, keyed by slot number for log compaction + @Override + public void flush() { + IntegerSerializer intSerializer = new IntegerSerializer(); + ByteArraySerializer byteArraySerializer = new ByteArraySerializer(); + + RecordCollector collector = ((ProcessorContextImpl) context).recordCollector(); + + for (Map.Entry<K, ValueList<V>> entry : map.entrySet()) { + ValueList<V> values = entry.getValue(); + if (values.hasDirtyValues()) { + K key = entry.getKey(); + + byte[] keyBytes = keySerializer.serialize(name, key); + + Iterator<Value<V>> iterator = values.dirtyValueIterator(); + while (iterator.hasNext()) { + Value<V> dirtyValue = iterator.next(); + byte[] slot = intSerializer.serialize("", dirtyValue.slotNum); + byte[] valBytes = valueSerializer.serialize(name, dirtyValue.value); + + // serialized layout: [timestamp (8 bytes)][key length (4 bytes)][key][value length (4 bytes)][value] + byte[] combined = new byte[8 + 4 + keyBytes.length + 4 + valBytes.length]; + + int offset = 0; + offset += putLong(combined, offset, dirtyValue.timestamp); + offset += puts(combined, offset, keyBytes); + offset += puts(combined, offset, valBytes); + + if (offset != combined.length) + throw new IllegalStateException("serialized length does not match"); + + collector.send(new ProducerRecord<>(name, context.id(), slot, combined), byteArraySerializer, byteArraySerializer); + } + values.clearDirtyValues(); + } + } + } + + @Override + public void close() { + // TODO + } + + @Override + public boolean persistent() { + // TODO: should not be persistent, right? 
+ return false; + } + + private class RestoreFuncImpl implements RestoreFunc { + + final IntegerDeserializer intDeserializer; + int slotNum = 0; + + RestoreFuncImpl() { + intDeserializer = new IntegerDeserializer(); + } + + @Override + public void apply(byte[] slot, byte[] bytes) { + + slotNum = intDeserializer.deserialize("", slot); + + int offset = 0; + // timestamp + long timestamp = getLong(bytes, offset); + offset += 8; + // key + int length = getInt(bytes, offset); + offset += 4; + K key = deserialize(bytes, offset, length, name, keyDeserializer); + offset += length; + // value + length = getInt(bytes, offset); + offset += 4; + V value = deserialize(bytes, offset, length, name, valueDeserializer); + + put(key, value, timestamp); + } + } + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java new file mode 100644 index 0000000..93fc359 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueJoiner.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.kstream; + +public interface ValueJoiner<V1, V2, R> { + + R apply(V1 value1, V2 value2); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/263c10ab/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java new file mode 100644 index 0000000..a32423d --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/ValueMapper.java @@ -0,0 +1,23 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.kstream; + +public interface ValueMapper<V1, V2> { + + V2 apply(V1 value); +}
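As a rough usage sketch of the KStream DSL above (the topic names and the String/Integer record types are illustrative assumptions, and the StreamingConfig / KafkaStreaming startup wiring is omitted), a simple filter-and-map topology could be assembled as follows:

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KeyValue;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Predicate;

public class KStreamDslSketch {

    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        // "text-input" is a hypothetical topic; the default deserializers
        // from the config are used since none are given here.
        KStream<String, String> source = builder.from("text-input");

        KStream<String, Integer> lengths = source
            // keep only records with a non-empty value
            .filter(new Predicate<String, String>() {
                @Override
                public boolean apply(String key, String value) {
                    return value != null && !value.isEmpty();
                }
            })
            // map each record to (key, length of the value)
            .map(new KeyValueMapper<String, String, KeyValue<String, Integer>>() {
                @Override
                public KeyValue<String, Integer> apply(String key, String value) {
                    return KeyValue.pair(key, value.length());
                }
            });

        // write the result using the default serializers from the config
        lengths.to("text-lengths");
    }
}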
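The branch() contract (predicates evaluated in order, first match wins, non-matching elements dropped) can likewise be sketched; again the topic names are hypothetical:

import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.Predicate;

public class BranchSketch {

    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();
        KStream<String, String> events = builder.from("events");

        // Predicates are evaluated in order; each record goes to the stream of the
        // first predicate it satisfies and is dropped if it satisfies none.
        KStream<String, String>[] branches = events.branch(
            new Predicate<String, String>() {
                @Override
                public boolean apply(String key, String value) {
                    return value.startsWith("A");
                }
            },
            new Predicate<String, String>() {
                @Override
                public boolean apply(String key, String value) {
                    return value.startsWith("B");
                }
            });

        branches[0].to("a-events");
        branches[1].to("b-events");
    }
}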
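Finally, a windowed join using SlidingWindowDef might be wired as below. The window names, duration, and maxCount values are illustrative; duration is interpreted in the same units as the record timestamps (typically milliseconds), and each window name is also used as the topic for that window's changelog, as seen in SlidingWindow.flush() above:

import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KStreamBuilder;
import org.apache.kafka.streams.kstream.KStreamWindowed;
import org.apache.kafka.streams.kstream.SlidingWindowDef;
import org.apache.kafka.streams.kstream.ValueJoiner;

public class WindowedJoinSketch {

    public static void main(String[] args) {
        KStreamBuilder builder = new KStreamBuilder();

        // hypothetical input topics
        KStream<String, String> left = builder.from("left-input");
        KStream<String, String> right = builder.from("right-input");

        StringSerializer ser = new StringSerializer();
        StringDeserializer deser = new StringDeserializer();

        // Window each stream: 60000L is the window duration and 1000 caps
        // the number of entries retained per window.
        KStreamWindowed<String, String> leftWindowed =
            left.with(new SlidingWindowDef<String, String>("left-window", 60000L, 1000, ser, ser, deser, deser));
        KStreamWindowed<String, String> rightWindowed =
            right.with(new SlidingWindowDef<String, String>("right-window", 60000L, 1000, ser, ser, deser, deser));

        // Join records with the same key whose timestamps fall within each
        // other's window, combining the two values into one.
        KStream<String, String> joined = leftWindowed.join(rightWindowed,
            new ValueJoiner<String, String, String>() {
                @Override
                public String apply(String value1, String value2) {
                    return value1 + "," + value2;
                }
            });

        joined.to("joined-output");
    }
}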
