KAFKA-3066: Demo Examples for Kafka Streams Author: Guozhang Wang <[email protected]>
Reviewers: Ewen Cheslack-Postava <[email protected]> Closes #797 from guozhangwang/K3066 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c197113a Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c197113a Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c197113a Branch: refs/heads/trunk Commit: c197113a9c04e2f6c2d1a72161c0d40d5804490e Parents: a19729f Author: Guozhang Wang <[email protected]> Authored: Fri Jan 22 15:25:24 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Fri Jan 22 15:25:24 2016 -0800 ---------------------------------------------------------------------- build.gradle | 25 ++++ settings.gradle | 2 +- .../examples/pageview/JsonPOJODeserializer.java | 66 ++++++++++ .../examples/pageview/JsonPOJOSerializer.java | 60 +++++++++ .../examples/pageview/PageViewTypedJob.java | 127 +++++++++++++++++++ .../examples/pageview/PageViewUnTypedJob.java | 107 ++++++++++++++++ .../kafka/streams/examples/pipe/PipeJob.java | 50 ++++++++ .../examples/wordcount/WordCountJob.java | 103 +++++++++++++++ .../wordcount/WordCountProcessorJob.java | 121 ++++++++++++++++++ .../org/apache/kafka/streams/StreamsConfig.java | 6 +- .../kafka/streams/examples/KStreamJob.java | 84 ------------ .../kafka/streams/examples/ProcessorJob.java | 115 ----------------- .../examples/WallclockTimestampExtractor.java | 28 ---- .../org/apache/kafka/streams/kstream/Count.java | 6 +- .../streams/kstream/KeyValueToDoubleMapper.java | 23 ---- .../streams/kstream/KeyValueToIntMapper.java | 23 ---- .../streams/kstream/KeyValueToLongMapper.java | 23 ---- .../kafka/streams/kstream/TumblingWindows.java | 8 +- .../kafka/streams/kstream/UnlimitedWindows.java | 10 +- .../streams/kstream/internals/KStreamImpl.java | 8 +- .../internals/WindowedStreamPartitioner.java | 52 ++++++++ .../internals/WindowedStreamsPartitioner.java | 52 -------- .../streams/processor/StreamPartitioner.java | 59 +++++++++ 
.../streams/processor/StreamsPartitioner.java | 59 --------- .../streams/processor/TopologyBuilder.java | 26 ++-- .../processor/internals/AbstractTask.java | 3 +- .../processor/internals/RecordCollector.java | 4 +- .../streams/processor/internals/SinkNode.java | 6 +- .../processor/internals/StreamThread.java | 20 ++- .../internals/WallclockTimestampExtractor.java | 28 ++++ .../apache/kafka/streams/StreamsConfigTest.java | 2 +- .../WindowedStreamPartitionerTest.java | 84 ++++++++++++ .../WindowedStreamsPartitionerTest.java | 84 ------------ .../internals/ProcessorTopologyTest.java | 6 +- .../processor/internals/StandbyTaskTest.java | 6 +- .../processor/internals/StreamThreadTest.java | 32 +++-- .../streams/state/KeyValueStoreTestDriver.java | 4 +- .../apache/kafka/test/KStreamTestDriver.java | 4 +- 38 files changed, 972 insertions(+), 554 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 150cac7..0b1dcc4 100644 --- a/build.gradle +++ b/build.gradle @@ -514,6 +514,7 @@ project(':streams') { dependencies { compile project(':clients') + compile project(':connect:json') // this dependency should be removed after we unify data API compile libs.slf4jlog4j compile libs.rocksDBJni compile libs.zkclient // this dependency should be removed after KIP-4 @@ -542,6 +543,30 @@ project(':streams') { } } +project(':streams:examples') { + archivesBaseName = "kafka-streams-examples" + + dependencies { + compile project(':streams') + compile project(':connect:json') // this dependency should be removed after we unify data API + } + + javadoc { + enabled = false + } + + tasks.create(name: "copyDependantLibs", type: Copy) { + from (configurations.runtime) { + exclude('kafka-streams*') + } + into "$buildDir/dependant-libs-${versions.scala}" + } + + jar { + dependsOn 
'copyDependantLibs' + } +} + project(':log4j-appender') { archivesBaseName = "kafka-log4j-appender" http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/settings.gradle ---------------------------------------------------------------------- diff --git a/settings.gradle b/settings.gradle index 097c43b..d430c2f 100644 --- a/settings.gradle +++ b/settings.gradle @@ -13,5 +13,5 @@ // See the License for the specific language governing permissions and // limitations under the License. -include 'core', 'examples', 'clients', 'tools', 'streams', 'log4j-appender', +include 'core', 'examples', 'clients', 'tools', 'streams', 'streams:examples', 'log4j-appender', 'connect:api', 'connect:runtime', 'connect:json', 'connect:file' http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java new file mode 100644 index 0000000..583ec2d --- /dev/null +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java @@ -0,0 +1,66 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.streams.examples.pageview; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; + +/** + * JSON deserializer for Jackson's JsonNode tree model. Using the tree model allows it to work with arbitrarily + * structured data without having associated Java classes. This deserializer also supports Connect schemas. + */ +public class JsonPOJODeserializer<T> implements Deserializer<T> { + private ObjectMapper objectMapper = new ObjectMapper(); + + private Class<T> tClass; + + /** + * Default constructor needed by Kafka + */ + public JsonPOJODeserializer() { + } + + @SuppressWarnings("unchecked") + @Override + public void configure(Map<String, ?> props, boolean isKey) { + tClass = (Class<T>) props.get("JsonPOJOClass"); + } + + @Override + public T deserialize(String topic, byte[] bytes) { + if (bytes == null) + return null; + + T data; + try { + data = objectMapper.readValue(bytes, tClass); + } catch (Exception e) { + throw new SerializationException(e); + } + + return data; + } + + @Override + public void close() { + + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java ---------------------------------------------------------------------- diff --git 
a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java new file mode 100644 index 0000000..bb60327 --- /dev/null +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ +package org.apache.kafka.streams.examples.pageview; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Map; + +public class JsonPOJOSerializer<T> implements Serializer<T> { + private final ObjectMapper objectMapper = new ObjectMapper(); + + private Class<T> tClass; + + /** + * Default constructor needed by Kafka + */ + public JsonPOJOSerializer() { + + } + + @SuppressWarnings("unchecked") + @Override + public void configure(Map<String, ?> props, boolean isKey) { + tClass = (Class<T>) props.get("JsonPOJOClass"); + } + + @Override + public byte[] serialize(String topic, T data) { + if (data == null) + return null; + + try { + return objectMapper.writeValueAsBytes(data); + } catch (Exception e) { + throw new SerializationException("Error serializing JSON message", e); + } + } + + @Override + public void close() { + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java new file mode 100644 index 0000000..c064848 --- /dev/null +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java @@ -0,0 +1,127 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.examples.pageview; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Count; +import org.apache.kafka.streams.kstream.HoppingWindows; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Windowed; + +import java.util.Properties; + +public class PageViewTypedJob { + + // POJO classes + static public class PageView { + public String user; + public String page; + } + + static public class UserProfile { + public String user; + public String region; + } + + static public class PageViewByRegion { + public String user; + public String page; + public String region; + } + + static public class WindowedPageViewByRegion { + public long windowStart; + public String region; + } + + static public class RegionCount { + public long count; + public String region; + } + + public static void main(String[] args) throws Exception { + 
Properties props = new Properties(); + props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class); + + KStreamBuilder builder = new KStreamBuilder(); + + final Serializer<String> stringSerializer = new StringSerializer(); + final Deserializer<String> stringDeserializer = new StringDeserializer(); + final Serializer<Long> longSerializer = new LongSerializer(); + final Deserializer<Long> longDeserializer = new LongDeserializer(); + + + KStream<String, PageView> views = builder.stream("streams-pageview-input"); + + KStream<String, PageView> viewsByUser = views.map((dummy, record) -> new KeyValue<>(record.user, record)); + + KTable<String, UserProfile> users = builder.table("streams-userprofile-input"); + + KStream<WindowedPageViewByRegion, RegionCount> regionCount = viewsByUser + .leftJoin(users, (view, profile) -> { + PageViewByRegion viewByRegion = new PageViewByRegion(); + viewByRegion.user = view.user; + viewByRegion.page = view.page; + viewByRegion.region = profile.region; + + return viewByRegion; + }) + .map((user, viewRegion) -> new KeyValue<>(viewRegion.region, viewRegion)) + .aggregateByKey(new Count<String, PageViewByRegion>(), HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), + stringSerializer, longSerializer, + stringDeserializer, longDeserializer) + .toStream() + .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() { + @Override + public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long 
value) { + WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion(); + wViewByRegion.windowStart = key.window().start(); + wViewByRegion.region = key.value(); + + RegionCount rCount = new RegionCount(); + rCount.region = key.value(); + rCount.count = value; + + return new KeyValue<>(wViewByRegion, rCount); + } + }); + + // write to the result topic + regionCount.to("streams-pageviewstats-output", new JsonPOJOSerializer<>(), new JsonPOJOSerializer<>()); + + KafkaStreams kstream = new KafkaStreams(builder, props); + kstream.start(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java new file mode 100644 index 0000000..1ae02c9 --- /dev/null +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.examples.pageview; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.connect.json.JsonSerializer; +import org.apache.kafka.connect.json.JsonDeserializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Count; +import org.apache.kafka.streams.kstream.HoppingWindows; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.Windowed; + +import java.util.Properties; + +public class PageViewUnTypedJob { + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + + StreamsConfig config = new StreamsConfig(props); + + KStreamBuilder 
builder = new KStreamBuilder(); + + final Serializer<String> stringSerializer = new StringSerializer(); + final Deserializer<String> stringDeserializer = new StringDeserializer(); + final Serializer<Long> longSerializer = new LongSerializer(); + final Deserializer<Long> longDeserializer = new LongDeserializer(); + + + KStream<String, JsonNode> views = builder.stream("streams-pageview-input"); + + KStream<String, JsonNode> viewsByUser = views.map((dummy, record) -> new KeyValue<>(record.get("user").textValue(), record)); + + KTable<String, JsonNode> users = builder.table("streams-userprofile-input"); + + KTable<String, String> userRegions = users.mapValues(record -> record.get("region").textValue()); + + KStream<JsonNode, JsonNode> regionCount = viewsByUser + .leftJoin(userRegions, (view, region) -> { + ObjectNode jNode = JsonNodeFactory.instance.objectNode(); + + return (JsonNode) jNode.put("user", view.get("user").textValue()) + .put("page", view.get("page").textValue()) + .put("region", region); + }) + .map((user, viewRegion) -> new KeyValue<>(viewRegion.get("region").textValue(), viewRegion)) + .aggregateByKey(new Count<String, JsonNode>(), HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), + stringSerializer, longSerializer, + stringDeserializer, longDeserializer) + .toStream() + .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() { + @Override + public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) { + ObjectNode keyNode = JsonNodeFactory.instance.objectNode(); + keyNode.put("window-start", key.window().start()) + .put("region", key.window().start()); + + ObjectNode valueNode = JsonNodeFactory.instance.objectNode(); + keyNode.put("count", value); + + return new KeyValue<JsonNode, JsonNode>((JsonNode) keyNode, (JsonNode) valueNode); + } + }); + + // write to the result topic + regionCount.to("streams-pageviewstats-output"); + + KafkaStreams kstream = new KafkaStreams(builder, config); + 
kstream.start(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java new file mode 100644 index 0000000..4a4f97f --- /dev/null +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.examples.pipe; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; + +import java.util.Properties; + +public class PipeJob { + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pipe"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + // can specify underlying client configs if necessary + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KStreamBuilder builder = new KStreamBuilder(); + + builder.stream("streams-file-input").to("streams-pipe-output"); + + KafkaStreams kstream = new KafkaStreams(builder, props); + kstream.start(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java new file mode 100644 index 0000000..8aa15a4 --- /dev/null +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one 
or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.examples.wordcount; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.connect.json.JsonSerializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.Count; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.UnlimitedWindows; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.kstream.Windowed; + +import java.util.Arrays; 
+import java.util.Properties; + +public class WordCountJob { + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + // can specify underlying client configs if necessary + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KStreamBuilder builder = new KStreamBuilder(); + + final Serializer<String> stringSerializer = new StringSerializer(); + final Deserializer<String> stringDeserializer = new StringDeserializer(); + final Serializer<Long> longSerializer = new LongSerializer(); + final Deserializer<Long> longDeserializer = new LongDeserializer(); + final Serializer<JsonNode> JsonSerializer = new JsonSerializer(); + + KStream<String, String> source = builder.stream("streams-file-input"); + + KStream<String, JsonNode> counts = source + .flatMapValues(new ValueMapper<String, Iterable<String>>() { + @Override + public Iterable<String> apply(String value) { + return Arrays.asList(value.toLowerCase().split(" ")); + } + }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() { + @Override + public KeyValue<String, String> apply(String key, String value) { + return new KeyValue<String, String>(value, value); + } + }) + .aggregateByKey(new Count<>(), UnlimitedWindows.of("Counts").startOn(0L), + stringSerializer, longSerializer, + stringDeserializer, longDeserializer) + .toStream() + .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<String, JsonNode>>() { + 
@Override + public KeyValue<String, JsonNode> apply(Windowed<String> key, Long value) { + ObjectNode jNode = JsonNodeFactory.instance.objectNode(); + + jNode.put("word", key.value()) + .put("count", value); + + return new KeyValue<String, JsonNode>(null, jNode); + } + }); + + counts.to("streams-wordcount-output", stringSerializer, JsonSerializer); + + KafkaStreams kstream = new KafkaStreams(builder, props); + kstream.start(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java new file mode 100644 index 0000000..63692bd --- /dev/null +++ b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.examples.wordcount; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; + +import java.util.Properties; + +public class WordCountProcessorJob { + + private static class MyProcessorSupplier implements ProcessorSupplier<String, String> { + + @Override + public Processor<String, String> get() { + return new Processor<String, String>() { + private ProcessorContext context; + private KeyValueStore<String, Integer> kvStore; + + @Override + @SuppressWarnings("unchecked") + public void init(ProcessorContext context) { + this.context = context; + this.context.schedule(1000); + this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts"); + } + + @Override + public void process(String dummy, String line) { + String words[] = line.toLowerCase().split(" "); + + for (String word : words) { + Integer oldValue = this.kvStore.get(word); + + if (oldValue == null) { + this.kvStore.put(word, 1); + } else { + this.kvStore.put(word, oldValue + 1); + } + } + + context.commit(); + } + + @Override + public void punctuate(long timestamp) { + KeyValueIterator<String, Integer> iter = this.kvStore.all(); + + System.out.println("----------- " + timestamp + "----------- "); + + while (iter.hasNext()) { + KeyValue<String, Integer> entry = iter.next(); + + System.out.println("[" 
+ entry.key + ", " + entry.value + "]"); + + context.forward(entry.key, entry.value.toString()); + } + + iter.close(); + } + + @Override + public void close() { + this.kvStore.close(); + } + }; + } + } + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount-processor"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + // can specify underlying client configs if necessary + props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + TopologyBuilder builder = new TopologyBuilder(); + + builder.addSource("Source", "streams-file-input"); + + builder.addProcessor("Process", new MyProcessorSupplier(), "Source"); + builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process"); + + builder.addSink("Sink", "streams-wordcount-output", "Process"); + + KafkaStreams streams = new KafkaStreams(builder, props); + streams.start(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 3843b1d..16bb06a 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -113,7 +113,7 @@ public class StreamsConfig extends 
AbstractConfig { /** <code>client.id</code> */ public static final String CLIENT_ID_CONFIG = CommonClientConfigs.CLIENT_ID_CONFIG; - private static final String SYSTEM_TEMP_DIRECTORY = System.getProperty("java.io.tmpdir"); + private static final String WALLCLOCK_TIMESTAMP_EXTRACTOR = "org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor"; static { CONFIG = new ConfigDef().define(JOB_ID_CONFIG, // required with no default value @@ -136,8 +136,8 @@ public class StreamsConfig extends AbstractConfig { StreamsConfig.ZOOKEEPER_CONNECT_DOC) .define(STATE_DIR_CONFIG, Type.STRING, - SYSTEM_TEMP_DIRECTORY, - Importance.HIGH, + "/tmp/kafka-streams", + Importance.MEDIUM, STATE_DIR_DOC) .define(KEY_SERIALIZER_CLASS_CONFIG, // required with no default value Type.CLASS, http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java deleted file mode 100644 index a234395..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/examples/KStreamJob.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.examples; - -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.Predicate; - -import java.util.Properties; - -public class KStreamJob { - - public static void main(String[] args) throws Exception { - Properties props = new Properties(); - props.put(StreamsConfig.JOB_ID_CONFIG, "example-kstream"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); - props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); - StreamsConfig config = new StreamsConfig(props); - - KStreamBuilder builder = new KStreamBuilder(); - - KStream<String, String> stream1 = builder.stream("topic1"); - - KStream<String, Integer> stream2 = - stream1.map(new KeyValueMapper<String, String, 
KeyValue<String, Integer>>() { - @Override - public KeyValue<String, Integer> apply(String key, String value) { - return new KeyValue<>(key, new Integer(value)); - } - }).filter(new Predicate<String, Integer>() { - @Override - public boolean test(String key, Integer value) { - return true; - } - }); - - KStream<String, Integer>[] streams = stream2.branch( - new Predicate<String, Integer>() { - @Override - public boolean test(String key, Integer value) { - return (value % 2) == 0; - } - }, - new Predicate<String, Integer>() { - @Override - public boolean test(String key, Integer value) { - return true; - } - } - ); - - streams[0].to("topic2"); - streams[1].to("topic3"); - - KafkaStreams kstream = new KafkaStreams(builder, config); - kstream.start(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java b/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java deleted file mode 100644 index e17c16b..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/examples/ProcessorJob.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.examples; - -import org.apache.kafka.common.serialization.IntegerDeserializer; -import org.apache.kafka.common.serialization.IntegerSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.Stores; - -import java.util.Properties; - -public class ProcessorJob { - - private static class MyProcessorSupplier implements ProcessorSupplier<String, String> { - - @Override - public Processor<String, String> get() { - return new Processor<String, String>() { - private ProcessorContext context; - private KeyValueStore<String, Integer> kvStore; - - @Override - @SuppressWarnings("unchecked") - public void init(ProcessorContext context) { - this.context = context; - this.context.schedule(1000); - this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("LOCAL-STATE"); - } - - @Override - public void process(String key, String value) { - Integer oldValue = this.kvStore.get(key); - Integer newValue = Integer.parseInt(value); - if (oldValue == null) { - this.kvStore.put(key, newValue); - } else { - this.kvStore.put(key, oldValue + newValue); - } - - context.commit(); - } - - @Override - public void punctuate(long timestamp) { - KeyValueIterator<String, Integer> iter = this.kvStore.all(); - - while (iter.hasNext()) { - 
KeyValue<String, Integer> entry = iter.next(); - - System.out.println("[" + entry.key + ", " + entry.value + "]"); - - context.forward(entry.key, entry.value); - } - - iter.close(); - } - - @Override - public void close() { - this.kvStore.close(); - } - }; - } - } - - public static void main(String[] args) throws Exception { - Properties props = new Properties(); - props.put(StreamsConfig.JOB_ID_CONFIG, "example-processor"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); - props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); - props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); - props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class); - StreamsConfig config = new StreamsConfig(props); - - TopologyBuilder builder = new TopologyBuilder(); - - builder.addSource("SOURCE", new StringDeserializer(), new StringDeserializer(), "topic-source"); - - builder.addProcessor("PROCESS", new MyProcessorSupplier(), "SOURCE"); - builder.addStateStore(Stores.create("LOCAL-STATE").withStringKeys().withIntegerValues().inMemory().build(), "PROCESS"); - - builder.addSink("SINK", "topic-sink", new StringSerializer(), new IntegerSerializer(), "PROCESS"); - - KafkaStreams streams = new KafkaStreams(builder, config); - streams.start(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java 
b/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java deleted file mode 100644 index 26281d6..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/examples/WallclockTimestampExtractor.java +++ /dev/null @@ -1,28 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.examples; - -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.streams.processor.TimestampExtractor; - -public class WallclockTimestampExtractor implements TimestampExtractor { - @Override - public long extract(ConsumerRecord<Object, Object> record) { - return System.currentTimeMillis(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java index 3c1ed46..8780cc7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Count.java @@ -17,7 +17,7 @@ package org.apache.kafka.streams.kstream; -public class Count<K> implements Aggregator<K, Long, Long> { +public class Count<K, V> implements Aggregator<K, V, Long> { @Override public Long initialValue(K aggKey) { @@ -25,12 +25,12 @@ public class Count<K> implements Aggregator<K, Long, Long> { } @Override - public Long add(K aggKey, Long value, Long aggregate) { + public Long add(K aggKey, V value, Long aggregate) { return aggregate + 1L; } @Override - public Long remove(K aggKey, Long value, Long aggregate) { + public Long remove(K aggKey, V value, Long aggregate) { return aggregate - 1L; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java deleted file mode 100644 index ae3b858..0000000 --- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToDoubleMapper.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -public interface KeyValueToDoubleMapper<K, V> { - - double apply(K key, V value); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java deleted file mode 100644 index 72e5ee9..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToIntMapper.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.kstream; - -public interface KeyValueToIntMapper<K, V> { - - int apply(K key, V value); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java b/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java deleted file mode 100644 index 3a8d8a8..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/KeyValueToLongMapper.java +++ /dev/null @@ -1,23 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.kstream; - -public interface KeyValueToLongMapper<K, V> { - - long apply(K key, V value); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java index 02ece3a..188fe66 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/TumblingWindows.java @@ -20,7 +20,7 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.streams.kstream.internals.TumblingWindow; -import java.util.Collections; +import java.util.HashMap; import java.util.Map; public class TumblingWindows extends Windows<TumblingWindow> { @@ -53,7 +53,11 @@ public class TumblingWindows extends Windows<TumblingWindow> { public Map<Long, TumblingWindow> windowsFor(long timestamp) { long windowStart = timestamp - timestamp % size; - return Collections.singletonMap(windowStart, new TumblingWindow(windowStart, windowStart + size)); + // we cannot use Collections.singleMap since it does not support remove() call + Map<Long, TumblingWindow> windows = new HashMap<>(); + windows.put(windowStart, new TumblingWindow(windowStart, windowStart + size)); + + return windows; } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java index 6f47253..06882b3 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/UnlimitedWindows.java @@ -19,7 +19,7 @@ package org.apache.kafka.streams.kstream; import org.apache.kafka.streams.kstream.internals.UnlimitedWindow; -import java.util.Collections; +import java.util.HashMap; import java.util.Map; public class UnlimitedWindows extends Windows<UnlimitedWindow> { @@ -48,7 +48,13 @@ public class UnlimitedWindows extends Windows<UnlimitedWindow> { @Override public Map<Long, UnlimitedWindow> windowsFor(long timestamp) { // always return the single unlimited window - return Collections.singletonMap(start, new UnlimitedWindow(start)); + + // we cannot use Collections.singleMap since it does not support remove() call + Map<Long, UnlimitedWindow> windows = new HashMap<>(); + windows.put(start, new UnlimitedWindow(start)); + + + return windows; } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java index 98e50c3..7ebc28c 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImpl.java @@ -36,7 +36,7 @@ import org.apache.kafka.streams.kstream.Window; import org.apache.kafka.streams.kstream.Windowed; import org.apache.kafka.streams.kstream.Windows; import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.StreamsPartitioner; +import org.apache.kafka.streams.processor.StreamPartitioner; import org.apache.kafka.streams.state.internals.RocksDBWindowStoreSupplier; import org.apache.kafka.streams.state.Serdes; 
@@ -217,14 +217,14 @@ public class KStreamImpl<K, V> extends AbstractStream<K> implements KStream<K, V @Override public void to(String topic, Serializer<K> keySerializer, Serializer<V> valSerializer) { String name = topology.newName(SINK_NAME); - StreamsPartitioner<K, V> streamsPartitioner = null; + StreamPartitioner<K, V> streamPartitioner = null; if (keySerializer != null && keySerializer instanceof WindowedSerializer) { WindowedSerializer<Object> windowedSerializer = (WindowedSerializer<Object>) keySerializer; - streamsPartitioner = (StreamsPartitioner<K, V>) new WindowedStreamsPartitioner<Object, V>(windowedSerializer); + streamPartitioner = (StreamPartitioner<K, V>) new WindowedStreamPartitioner<Object, V>(windowedSerializer); } - topology.addSink(name, topic, keySerializer, valSerializer, streamsPartitioner, this.name); + topology.addSink(name, topic, keySerializer, valSerializer, streamPartitioner, this.name); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java new file mode 100644 index 0000000..10e69cc --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamPartitioner.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.kstream.internals; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.processor.StreamPartitioner; + +public class WindowedStreamPartitioner<K, V> implements StreamPartitioner<Windowed<K>, V> { + + private final WindowedSerializer<K> serializer; + + public WindowedStreamPartitioner(WindowedSerializer<K> serializer) { + this.serializer = serializer; + } + + /** + * WindowedStreamPartitioner determines the partition number for a message with the given windowed key and value + * and the current number of partitions. The partition number id determined by the original key of the windowed key + * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key. 
+ * + * @param windowedKey the key of the message + * @param value the value of the message + * @param numPartitions the total number of partitions + * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used + */ + public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) { + byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey); + + // hash the keyBytes to choose a partition + return toPositive(Utils.murmur2(keyBytes)) % numPartitions; + } + + private static int toPositive(int number) { + return number & 0x7fffffff; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java deleted file mode 100644 index ff1fa2c..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/WindowedStreamsPartitioner.java +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.kstream.internals; - -import org.apache.kafka.common.utils.Utils; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.processor.StreamsPartitioner; - -public class WindowedStreamsPartitioner<K, V> implements StreamsPartitioner<Windowed<K>, V> { - - private final WindowedSerializer<K> serializer; - - public WindowedStreamsPartitioner(WindowedSerializer<K> serializer) { - this.serializer = serializer; - } - - /** - * WindowedStreamsPartitioner determines the partition number for a message with the given windowed key and value - * and the current number of partitions. The partition number id determined by the original key of the windowed key - * using the same logic as DefaultPartitioner so that the topic is partitioned by the original key. - * - * @param windowedKey the key of the message - * @param value the value of the message - * @param numPartitions the total number of partitions - * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used - */ - public Integer partition(Windowed<K> windowedKey, V value, int numPartitions) { - byte[] keyBytes = serializer.serializeBaseKey(null, windowedKey); - - // hash the keyBytes to choose a partition - return toPositive(Utils.murmur2(keyBytes)) % numPartitions; - } - - private static int toPositive(int number) { - return number & 0x7fffffff; - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java new file mode 100644 index 0000000..f14d9d9 --- /dev/null 
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/StreamPartitioner.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +/** + * Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's + * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition. + * <p> + * Kafka topics are divided into one or more <i>partitions</i>. Each partition must fit on the servers that host it, so + * using multiple partitions allows the topic to scale beyond a size that will fit on a single machine. Partitions also enable you + * to use multiple instances of your topology to process in parallel all of the messages on the topology's source topics. + * <p> + * When a topology is instantiated, each of its sources is assigned a subset of that topic's partitions. That means that only + * those processors in that topology instance will consume the messages from those partitions. In many cases, Kafka Streams will + * automatically manage these instances, and adjust when new topology instances are added or removed. 
+ * <p> + * Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have + * stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance. + * An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently + * determine to which partition each message should be written. + * <p> + * To do this, create a <code>StreamPartitioner</code> implementation, and when you build your topology specify that custom partitioner + * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) adding a sink} + * for that topic. + * <p> + * All StreamPartitioner implementations should be stateless, pure functions so they can be shared across topic and sink nodes. + * + * @param <K> the type of keys + * @param <V> the type of values + * @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, + * org.apache.kafka.common.serialization.Serializer, StreamPartitioner, String...) + * @see TopologyBuilder#addSink(String, String, StreamPartitioner, String...) + */ +public interface StreamPartitioner<K, V> { + + /** + * Determine the partition number for a message with the given key and value and the current number of partitions. 
+ * + * @param key the key of the message + * @param value the value of the message + * @param numPartitions the total number of partitions + * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used + */ + Integer partition(K key, V value, int numPartitions); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java b/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java deleted file mode 100644 index f8d199d..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/StreamsPartitioner.java +++ /dev/null @@ -1,59 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.kafka.streams.processor; - -/** - * Determine how messages are distributed among the partitions in a Kafka topic. If not specified, the underlying producer's - * {@link org.apache.kafka.clients.producer.internals.DefaultPartitioner} will be used to determine the partition. 
- * <p> - * Kafka topics are divided into one or more <i>partitions</i>. Since each partition must fit on the servers that host it, so - * using multiple partitions allows the topic to scale beyond a size that will fit on a single machine. Partitions also enable you - * to use multiple instances of your topology to process in parallel all of the messages on the topology's source topics. - * <p> - * When a topology is instantiated, each of its sources are assigned a subset of that topic's partitions. That means that only - * those processors in that topology instance will consume the messages from those partitions. In many cases, Kafka Streams will - * automatically manage these instances, and adjust when new topology instances are added or removed. - * <p> - * Some topologies, though, need more control over which messages appear in each partition. For example, some topologies that have - * stateful processors may want all messages within a range of keys to always be delivered to and handled by the same topology instance. - * An upstream topology producing messages to that topic can use a custom <i>stream partitioner</i> to precisely and consistently - * determine to which partition each message should be written. - * <p> - * To do this, create a <code>StreamsPartitioner</code> implementation, and when you build your topology specify that custom partitioner - * when {@link TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, org.apache.kafka.common.serialization.Serializer, StreamsPartitioner, String...) adding a sink} - * for that topic. - * <p> - * All StreamsPartitioner implementations should be stateless and a pure function so they can be shared across topic and sink nodes. - * - * @param <K> the type of keys - * @param <V> the type of values - * @see TopologyBuilder#addSink(String, String, org.apache.kafka.common.serialization.Serializer, - * org.apache.kafka.common.serialization.Serializer, StreamsPartitioner, String...) 
- * @see TopologyBuilder#addSink(String, String, StreamsPartitioner, String...) - */ -public interface StreamsPartitioner<K, V> { - - /** - * Determine the partition number for a message with the given key and value and the current number of partitions. - * - * @param key the key of the message - * @param value the value of the message - * @param numPartitions the total number of partitions - * @return an integer between 0 and {@code numPartitions-1}, or {@code null} if the default partitioning logic should be used - */ - Integer partition(K key, V value, int numPartitions); -} http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index f4e6821..a6b54b7 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -135,9 +135,9 @@ public class TopologyBuilder { public final String topic; private Serializer keySerializer; private Serializer valSerializer; - private final StreamsPartitioner partitioner; + private final StreamPartitioner partitioner; - private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamsPartitioner partitioner) { + private SinkNodeFactory(String name, String[] parents, String topic, Serializer keySerializer, Serializer valSerializer, StreamPartitioner partitioner) { super(name); this.parents = parents.clone(); this.topic = topic; @@ -245,9 +245,9 @@ public class TopologyBuilder { * @param parentNames the name of one or more source or processor nodes whose output message this sink should consume * and write to its topic * @return this builder 
instance so methods can be chained together; never null - * @see #addSink(String, String, StreamsPartitioner, String...) + * @see #addSink(String, String, StreamPartitioner, String...) * @see #addSink(String, String, Serializer, Serializer, String...) - * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...) + * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) */ public final TopologyBuilder addSink(String name, String topic, String... parentNames) { return addSink(name, topic, (Serializer) null, (Serializer) null, parentNames); @@ -260,7 +260,7 @@ public class TopologyBuilder { * {@link org.apache.kafka.streams.StreamsConfig#VALUE_SERIALIZER_CLASS_CONFIG default value serializer} specified in the * {@link org.apache.kafka.streams.StreamsConfig stream configuration}. * <p> - * The sink will also use the specified {@link StreamsPartitioner} to determine how messages are distributed among + * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among * the named Kafka topic's partitions. Such control is often useful with topologies that use * {@link #addStateStore(StateStoreSupplier, String...) state stores} * in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute @@ -274,9 +274,9 @@ public class TopologyBuilder { * @return this builder instance so methods can be chained together; never null * @see #addSink(String, String, String...) * @see #addSink(String, String, Serializer, Serializer, String...) - * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...) + * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) */ - public final TopologyBuilder addSink(String name, String topic, StreamsPartitioner partitioner, String... 
parentNames) { + public final TopologyBuilder addSink(String name, String topic, StreamPartitioner partitioner, String... parentNames) { return addSink(name, topic, (Serializer) null, (Serializer) null, partitioner, parentNames); } @@ -284,7 +284,7 @@ public class TopologyBuilder { * Add a new sink that forwards messages from upstream parent processor and/or source nodes to the named Kafka topic. * The sink will use the specified key and value serializers. * <p> - * The sink will also use the specified {@link StreamsPartitioner} to determine how messages are distributed among + * The sink will also use the specified {@link StreamPartitioner} to determine how messages are distributed among * the named Kafka topic's partitions. Such control is often useful with topologies that use * {@link #addStateStore(StateStoreSupplier, String...) state stores} * in its processors. In most other cases, however, a partitioner need not be specified and Kafka will automatically distribute @@ -302,11 +302,11 @@ public class TopologyBuilder { * and write to its topic * @return this builder instance so methods can be chained together; never null * @see #addSink(String, String, String...) - * @see #addSink(String, String, StreamsPartitioner, String...) - * @see #addSink(String, String, Serializer, Serializer, StreamsPartitioner, String...) + * @see #addSink(String, String, StreamPartitioner, String...) + * @see #addSink(String, String, Serializer, Serializer, StreamPartitioner, String...) */ public final TopologyBuilder addSink(String name, String topic, Serializer keySerializer, Serializer valSerializer, String... 
parentNames) { - return addSink(name, topic, keySerializer, valSerializer, (StreamsPartitioner) null, parentNames); + return addSink(name, topic, keySerializer, valSerializer, (StreamPartitioner) null, parentNames); } /** @@ -326,10 +326,10 @@ public class TopologyBuilder { * and write to its topic * @return this builder instance so methods can be chained together; never null * @see #addSink(String, String, String...) - * @see #addSink(String, String, StreamsPartitioner, String...) + * @see #addSink(String, String, StreamPartitioner, String...) * @see #addSink(String, String, Serializer, Serializer, String...) */ - public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamsPartitioner<K, V> partitioner, String... parentNames) { + public final <K, V> TopologyBuilder addSink(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner, String... parentNames) { if (nodeFactories.containsKey(name)) throw new TopologyException("Processor " + name + " is already added."); http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java index ef4c3c7..68680ab 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/AbstractTask.java @@ -57,7 +57,8 @@ public abstract class AbstractTask { // create the processor state manager try { - File stateFile = new File(config.getString(StreamsConfig.STATE_DIR_CONFIG), id.toString()); + File jobStateDir = StreamThread.makeStateDir(jobId, 
config.getString(StreamsConfig.STATE_DIR_CONFIG)); + File stateFile = new File(jobStateDir.getCanonicalPath(), id.toString()); // if partitions is null, this is a standby task this.stateMgr = new ProcessorStateManager(jobId, id.partition, partitions, stateFile, restoreConsumer, isStandby); } catch (IOException e) { http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java index 25c663d..fe0472e 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollector.java @@ -24,7 +24,7 @@ import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.streams.processor.StreamsPartitioner; +import org.apache.kafka.streams.processor.StreamPartitioner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -72,7 +72,7 @@ public class RecordCollector { } public <K, V> void send(ProducerRecord<K, V> record, Serializer<K> keySerializer, Serializer<V> valueSerializer, - StreamsPartitioner<K, V> partitioner) { + StreamPartitioner<K, V> partitioner) { byte[] keyBytes = keySerializer.serialize(record.topic(), record.key()); byte[] valBytes = valueSerializer.serialize(record.topic(), record.value()); Integer partition = null; http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java ---------------------------------------------------------------------- diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 88b3f56..7ab59ee 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -20,18 +20,18 @@ package org.apache.kafka.streams.processor.internals; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.StreamsPartitioner; +import org.apache.kafka.streams.processor.StreamPartitioner; public class SinkNode<K, V> extends ProcessorNode<K, V> { private final String topic; private Serializer<K> keySerializer; private Serializer<V> valSerializer; - private final StreamsPartitioner<K, V> partitioner; + private final StreamPartitioner<K, V> partitioner; private ProcessorContext context; - public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamsPartitioner<K, V> partitioner) { + public SinkNode(String name, String topic, Serializer<K> keySerializer, Serializer<V> valSerializer, StreamPartitioner<K, V> partitioner) { super(name); this.topic = topic; http://git-wip-us.apache.org/repos/asf/kafka/blob/c197113a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index e5d0922..f118f60 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -104,6 +104,18 @@ 
public class StreamThread extends Thread { private final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> standbyRecords; private boolean processStandbyRecords = false; + static File makeStateDir(String jobId, String baseDirName) { + File baseDir = new File(baseDirName); + if (!baseDir.exists()) + baseDir.mkdir(); + + File stateDir = new File(baseDir, jobId); + if (!stateDir.exists()) + stateDir.mkdir(); + + return stateDir; + } + final ConsumerRebalanceListener rebalanceListener = new ConsumerRebalanceListener() { @Override public void onPartitionsAssigned(Collection<TopicPartition> assignment) { @@ -167,8 +179,7 @@ public class StreamThread extends Thread { this.standbyRecords = new HashMap<>(); // read in task specific config values - this.stateDir = new File(this.config.getString(StreamsConfig.STATE_DIR_CONFIG)); - this.stateDir.mkdir(); + this.stateDir = makeStateDir(this.jobId, this.config.getString(StreamsConfig.STATE_DIR_CONFIG)); this.pollTimeMs = config.getLong(StreamsConfig.POLL_MS_CONFIG); this.commitTimeMs = config.getLong(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG); this.cleanTimeMs = config.getLong(StreamsConfig.STATE_CLEANUP_DELAY_MS_CONFIG); @@ -452,14 +463,15 @@ public class StreamThread extends Thread { if (stateDirs != null) { for (File dir : stateDirs) { try { - TaskId id = TaskId.parse(dir.getName()); + String dirName = dir.getName(); + TaskId id = TaskId.parse(dirName.substring(dirName.lastIndexOf("-") + 1)); // try to acquire the exclusive lock on the state directory FileLock directoryLock = null; try { directoryLock = ProcessorStateManager.lockStateDirectory(dir); if (directoryLock != null) { - log.info("Deleting obsolete state directory {} after delayed {} ms.", dir.getAbsolutePath(), cleanTimeMs); + log.info("Deleting obsolete state directory {} for task {} after delayed {} ms.", dir.getAbsolutePath(), id, cleanTimeMs); Utils.delete(dir); } } catch (IOException e) {
