Repository: kafka Updated Branches: refs/heads/trunk 79662cc7c -> edeb11bc5
MINOR: Move streams-examples source files under the src folder. Also remove some unused imports. Author: Guozhang Wang <[email protected]> Reviewers: Ismael Juma <[email protected]>, Ewen Cheslack-Postava <[email protected]> Closes #992 from guozhangwang/KSExamples Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/edeb11bc Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/edeb11bc Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/edeb11bc Branch: refs/heads/trunk Commit: edeb11bc561c0721855b34a4506dc7f1c76445c6 Parents: 79662cc Author: Guozhang Wang <[email protected]> Authored: Tue Mar 1 18:53:58 2016 -0800 Committer: Ewen Cheslack-Postava <[email protected]> Committed: Tue Mar 1 18:53:58 2016 -0800 ---------------------------------------------------------------------- bin/kafka-run-class.sh | 5 + build.gradle | 3 +- checkstyle/import-control.xml | 5 + .../examples/pageview/JsonPOJODeserializer.java | 61 ------ .../examples/pageview/JsonPOJOSerializer.java | 60 ------ .../pageview/JsonTimestampExtractor.java | 46 ----- .../examples/pageview/PageViewTypedJob.java | 186 ------------------- .../examples/pageview/PageViewUnTypedJob.java | 142 -------------- .../kafka/streams/examples/pipe/PipeJob.java | 66 ------- .../examples/wordcount/WordCountJob.java | 99 ---------- .../wordcount/WordCountProcessorJob.java | 138 -------------- .../examples/pageview/JsonPOJODeserializer.java | 61 ++++++ .../examples/pageview/JsonPOJOSerializer.java | 60 ++++++ .../pageview/JsonTimestampExtractor.java | 46 +++++ .../examples/pageview/PageViewTypedJob.java | 185 ++++++++++++++++++ .../examples/pageview/PageViewUntypedJob.java | 141 ++++++++++++++ .../kafka/streams/examples/pipe/PipeJob.java | 65 +++++++ .../examples/wordcount/WordCountJob.java | 98 ++++++++++ .../wordcount/WordCountProcessorJob.java | 137 ++++++++++++++ 19 files changed, 805 insertions(+), 799 deletions(-) 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/bin/kafka-run-class.sh ---------------------------------------------------------------------- diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh index 344e47f..f45d8d4 100755 --- a/bin/kafka-run-class.sh +++ b/bin/kafka-run-class.sh @@ -52,6 +52,11 @@ do CLASSPATH=$CLASSPATH:$file done +for file in $base_dir/streams/examples/build/libs/kafka-streams-examples*.jar; +do + CLASSPATH=$CLASSPATH:$file +done + for file in $base_dir/streams/build/dependant-libs-${SCALA_VERSION}/rocksdb*.jar; do CLASSPATH=$CLASSPATH:$file http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/build.gradle ---------------------------------------------------------------------- diff --git a/build.gradle b/build.gradle index 5953f57..daf468f 100644 --- a/build.gradle +++ b/build.gradle @@ -259,7 +259,7 @@ for ( sv in ['2_10', '2_11'] ) { } def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file'] -def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams'] + connectPkgs +def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:examples'] + connectPkgs tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {} tasks.create(name: "jarAll", dependsOn: ['jar_core_2_10', 'jar_core_2_11'] + pkgs.collect { it + ":jar" }) { } @@ -374,6 +374,7 @@ project(':core') { from(project(':connect:file').jar) { into("libs/") } from(project(':connect:file').configurations.runtime) { into("libs/") } from(project(':streams').jar) { into("libs/") } + from(project(':streams:examples').jar) { into("libs/") } } jar { http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index b183b3d..051c8d1 100644 --- 
a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -127,6 +127,11 @@ <allow pkg="org.apache.kafka.streams"/> + <subpackage name="examples"> + <allow pkg="com.fasterxml.jackson.databind" /> + <allow pkg="org.apache.kafka.connect.json" /> + </subpackage> + <subpackage name="state"> <allow pkg="org.rocksdb" /> </subpackage> http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java deleted file mode 100644 index 5fcd1f3..0000000 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- **/ -package org.apache.kafka.streams.examples.pageview; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.Deserializer; - -import java.util.Map; - -public class JsonPOJODeserializer<T> implements Deserializer<T> { - private ObjectMapper objectMapper = new ObjectMapper(); - - private Class<T> tClass; - - /** - * Default constructor needed by Kafka - */ - public JsonPOJODeserializer() { - } - - @SuppressWarnings("unchecked") - @Override - public void configure(Map<String, ?> props, boolean isKey) { - tClass = (Class<T>) props.get("JsonPOJOClass"); - } - - @Override - public T deserialize(String topic, byte[] bytes) { - if (bytes == null) - return null; - - T data; - try { - data = objectMapper.readValue(bytes, tClass); - } catch (Exception e) { - throw new SerializationException(e); - } - - return data; - } - - @Override - public void close() { - - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java deleted file mode 100644 index bb60327..0000000 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * <p/> - * http://www.apache.org/licenses/LICENSE-2.0 - * <p/> - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ -package org.apache.kafka.streams.examples.pageview; - - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.kafka.common.errors.SerializationException; -import org.apache.kafka.common.serialization.Serializer; - -import java.util.Map; - -public class JsonPOJOSerializer<T> implements Serializer<T> { - private final ObjectMapper objectMapper = new ObjectMapper(); - - private Class<T> tClass; - - /** - * Default constructor needed by Kafka - */ - public JsonPOJOSerializer() { - - } - - @SuppressWarnings("unchecked") - @Override - public void configure(Map<String, ?> props, boolean isKey) { - tClass = (Class<T>) props.get("JsonPOJOClass"); - } - - @Override - public byte[] serialize(String topic, T data) { - if (data == null) - return null; - - try { - return objectMapper.writeValueAsBytes(data); - } catch (Exception e) { - throw new SerializationException("Error serializing JSON message", e); - } - } - - @Override - public void close() { - } - -} http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java deleted file mode 100644 index 6443193..0000000 --- 
a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.examples.pageview; - -import com.fasterxml.jackson.databind.JsonNode; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.streams.processor.TimestampExtractor; - -/** - * A timestamp extractor implementation that tries to extract event time from - * the "timestamp" field in the Json formatted message. 
- */ -public class JsonTimestampExtractor implements TimestampExtractor { - - @Override - public long extract(ConsumerRecord<Object, Object> record) { - if (record.value() instanceof PageViewTypedJob.PageView) { - return ((PageViewTypedJob.PageView) record.value()).timestamp; - } - - if (record.value() instanceof PageViewTypedJob.UserProfile) { - return ((PageViewTypedJob.UserProfile) record.value()).timestamp; - } - - if (record.value() instanceof JsonNode) { - return ((JsonNode) record.value()).get("timestamp").longValue(); - } - - throw new IllegalArgumentException("JsonTimestampExtractor cannot recognize the record value " + record.value()); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java deleted file mode 100644 index f7266e3..0000000 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java +++ /dev/null @@ -1,186 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.examples.pageview; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.HoppingWindows; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.StreamsConfig; - -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -/** - * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation, - * using specific data types (here: JSON POJO; but can also be Avro specific bindings, etc.) for serdes - * in Kafka Streams. - * - * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input" - * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format - * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region. - * - * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) - * and write some data to it (e.g. via bin-kafka-console-producer.sh). 
Otherwise you won't see any data arriving in the output topic. - */ -public class PageViewTypedJob { - - // POJO classes - static public class PageView { - public String user; - public String page; - public Long timestamp; - } - - static public class UserProfile { - public String region; - public Long timestamp; - } - - static public class PageViewByRegion { - public String user; - public String page; - public String region; - } - - static public class WindowedPageViewByRegion { - public long windowStart; - public String region; - } - - static public class RegionCount { - public long count; - public String region; - } - - public static void main(String[] args) throws Exception { - Properties props = new Properties(); - props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-typed"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); - props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class); - props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class); - props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); - - // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data - props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - KStreamBuilder builder = new KStreamBuilder(); - - final Serializer<String> stringSerializer = new StringSerializer(); - final Deserializer<String> stringDeserializer = new StringDeserializer(); - final Serializer<Long> longSerializer = new LongSerializer(); - final Deserializer<Long> longDeserializer = new LongDeserializer(); - - // TODO: the following can be removed with a serialization factory - Map<String, Object> serdeProps = new HashMap<>(); - - 
final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>(); - serdeProps.put("JsonPOJOClass", PageView.class); - pageViewDeserializer.configure(serdeProps, false); - - final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>(); - serdeProps.put("JsonPOJOClass", UserProfile.class); - userProfileDeserializer.configure(serdeProps, false); - - final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>(); - serdeProps.put("JsonPOJOClass", UserProfile.class); - userProfileSerializer.configure(serdeProps, false); - - final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>(); - serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class); - wPageViewByRegionSerializer.configure(serdeProps, false); - - final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>(); - serdeProps.put("JsonPOJOClass", RegionCount.class); - regionCountSerializer.configure(serdeProps, false); - - KStream<String, PageView> views = builder.stream(stringDeserializer, pageViewDeserializer, "streams-pageview-input"); - - KTable<String, UserProfile> users = builder.table(stringSerializer, userProfileSerializer, stringDeserializer, userProfileDeserializer, "streams-userprofile-input"); - - KStream<WindowedPageViewByRegion, RegionCount> regionCount = views - .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() { - @Override - public PageViewByRegion apply(PageView view, UserProfile profile) { - PageViewByRegion viewByRegion = new PageViewByRegion(); - viewByRegion.user = view.user; - viewByRegion.page = view.page; - - if (profile != null) { - viewByRegion.region = profile.region; - } else { - viewByRegion.region = "UNKNOWN"; - } - return viewByRegion; - } - }) - .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() { - @Override - public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion 
viewRegion) { - return new KeyValue<>(viewRegion.region, viewRegion); - } - }) - .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), - stringSerializer, longSerializer, - stringDeserializer, longDeserializer) - // TODO: we can merge ths toStream().map(...) with a single toStream(...) - .toStream() - .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() { - @Override - public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) { - WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion(); - wViewByRegion.windowStart = key.window().start(); - wViewByRegion.region = key.value(); - - RegionCount rCount = new RegionCount(); - rCount.region = key.value(); - rCount.count = value; - - return new KeyValue<>(wViewByRegion, rCount); - } - }); - - // write to the result topic - regionCount.to("streams-pageviewstats-typed-output", wPageViewByRegionSerializer, regionCountSerializer); - - KafkaStreams streams = new KafkaStreams(builder, props); - streams.start(); - - // usually the streaming job would be ever running, - // in this example we just let it run for some time and stop since the input data is finite. 
- Thread.sleep(5000L); - - streams.close(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java deleted file mode 100644 index 3241b8f..0000000 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/pageview/PageViewUnTypedJob.java +++ /dev/null @@ -1,142 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.examples.pageview; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.connect.json.JsonSerializer; -import org.apache.kafka.connect.json.JsonDeserializer; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.HoppingWindows; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.kstream.ValueMapper; -import org.apache.kafka.streams.kstream.Windowed; - -import java.util.Properties; - -/** - * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation, - * using general data types (here: JSON; but can also be Avro generic bindings, etc.) for serdes - * in Kafka Streams. - * - * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input" - * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format - * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region. 
- * - * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) - * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. - */ -public class PageViewUntypedJob { - - public static void main(String[] args) throws Exception { - Properties props = new Properties(); - props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-untyped"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); - props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); - props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); - props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); - - // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data - props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - KStreamBuilder builder = new KStreamBuilder(); - - final Serializer<String> stringSerializer = new StringSerializer(); - final Deserializer<String> stringDeserializer = new StringDeserializer(); - final Serializer<Long> longSerializer = new LongSerializer(); - final Deserializer<Long> longDeserializer = new LongDeserializer(); - final Serializer<JsonNode> jsonSerializer = new JsonSerializer(); - final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer(); - - KStream<String, JsonNode> views = builder.stream(stringDeserializer, jsonDeserializer, "streams-pageview-input"); - - KTable<String, JsonNode> users = builder.table(stringSerializer, jsonSerializer, stringDeserializer, jsonDeserializer, "streams-userprofile-input"); - - KTable<String, String> userRegions = users.mapValues(new 
ValueMapper<JsonNode, String>() { - @Override - public String apply(JsonNode record) { - return record.get("region").textValue(); - } - }); - - KStream<JsonNode, JsonNode> regionCount = views - .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() { - @Override - public JsonNode apply(JsonNode view, String region) { - ObjectNode jNode = JsonNodeFactory.instance.objectNode(); - - return jNode.put("user", view.get("user").textValue()) - .put("page", view.get("page").textValue()) - .put("region", region == null ? "UNKNOWN" : region); - } - }) - .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() { - @Override - public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) { - return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion); - } - }) - .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), - stringSerializer, longSerializer, - stringDeserializer, longDeserializer) - // TODO: we can merge ths toStream().map(...) with a single toStream(...) - .toStream() - .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() { - @Override - public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) { - ObjectNode keyNode = JsonNodeFactory.instance.objectNode(); - keyNode.put("window-start", key.window().start()) - .put("region", key.value()); - - ObjectNode valueNode = JsonNodeFactory.instance.objectNode(); - valueNode.put("count", value); - - return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode); - } - }); - - // write to the result topic - regionCount.to("streams-pageviewstats-untyped-output", jsonSerializer, jsonSerializer); - - KafkaStreams streams = new KafkaStreams(builder, props); - streams.start(); - - // usually the streaming job would be ever running, - // in this example we just let it run for some time and stop since the input data is finite. 
- Thread.sleep(5000L); - - streams.close(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java deleted file mode 100644 index 79649d1..0000000 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java +++ /dev/null @@ -1,66 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.examples.pipe; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; - -import java.util.Properties; - -/** - * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to - * write data to a sink (output) topic. 
- * - * In this example, we implement a simple "pipe" program that reads from a source topic "streams-file-input" - * and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output". - * - * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) - * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. - */ -public class PipeJob { - - public static void main(String[] args) throws Exception { - Properties props = new Properties(); - props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pipe"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - - // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data - props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - KStreamBuilder builder = new KStreamBuilder(); - - builder.stream("streams-file-input").to("streams-pipe-output"); - - KafkaStreams streams = new KafkaStreams(builder, props); - streams.start(); - - // usually the streaming job would be ever running, - // in this example we just let it run for some time and stop since the input data is finite. 
- Thread.sleep(5000L); - - streams.close(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java deleted file mode 100644 index 965eb79..0000000 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java +++ /dev/null @@ -1,99 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.examples.wordcount; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.ValueMapper; - -import java.util.Arrays; -import java.util.Properties; - -/** - * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program - * that computes a simple word occurrence histogram from an input text. - * - * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages - * represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record - * is an updated count of a single word. - * - * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) - * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. 
- */ -public class WordCountJob { - - public static void main(String[] args) throws Exception { - Properties props = new Properties(); - props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); - props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - - // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data - props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - KStreamBuilder builder = new KStreamBuilder(); - - final Serializer<String> stringSerializer = new StringSerializer(); - final Deserializer<String> stringDeserializer = new StringDeserializer(); - final Serializer<Long> longSerializer = new LongSerializer(); - final Deserializer<Long> longDeserializer = new LongDeserializer(); - - KStream<String, String> source = builder.stream("streams-file-input"); - - KTable<String, Long> counts = source - .flatMapValues(new ValueMapper<String, Iterable<String>>() { - @Override - public Iterable<String> apply(String value) { - return Arrays.asList(value.toLowerCase().split(" ")); - } - }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() { - @Override - public KeyValue<String, String> apply(String key, String value) { - return new KeyValue<String, String>(value, value); - } - }) - .countByKey(stringSerializer, longSerializer, stringDeserializer, longDeserializer, "Counts"); - - counts.to("streams-wordcount-output", stringSerializer, longSerializer); - - KafkaStreams streams = new KafkaStreams(builder, props); - streams.start(); - - // usually the streaming job would be ever 
running, - // in this example we just let it run for some time and stop since the input data is finite. - Thread.sleep(5000L); - - streams.close(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java b/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java deleted file mode 100644 index f5dd775..0000000 --- a/streams/examples/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java +++ /dev/null @@ -1,138 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.examples.wordcount; - -import org.apache.kafka.clients.consumer.ConsumerConfig; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.Stores; - -import java.util.Properties; - -/** - * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program - * that computes a simple word occurrence histogram from an input text. - * - * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages - * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record - * is an updated count of a single word. - * - * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) - * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. 
- */ -public class WordCountProcessorJob { - - private static class MyProcessorSupplier implements ProcessorSupplier<String, String> { - - @Override - public Processor<String, String> get() { - return new Processor<String, String>() { - private ProcessorContext context; - private KeyValueStore<String, Integer> kvStore; - - @Override - @SuppressWarnings("unchecked") - public void init(ProcessorContext context) { - this.context = context; - this.context.schedule(1000); - this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts"); - } - - @Override - public void process(String dummy, String line) { - String words[] = line.toLowerCase().split(" "); - - for (String word : words) { - Integer oldValue = this.kvStore.get(word); - - if (oldValue == null) { - this.kvStore.put(word, 1); - } else { - this.kvStore.put(word, oldValue + 1); - } - } - - context.commit(); - } - - @Override - public void punctuate(long timestamp) { - KeyValueIterator<String, Integer> iter = this.kvStore.all(); - - System.out.println("----------- " + timestamp + " ----------- "); - - while (iter.hasNext()) { - KeyValue<String, Integer> entry = iter.next(); - - System.out.println("[" + entry.key + ", " + entry.value + "]"); - - context.forward(entry.key, entry.value.toString()); - } - - iter.close(); - } - - @Override - public void close() { - this.kvStore.close(); - } - }; - } - } - - public static void main(String[] args) throws Exception { - Properties props = new Properties(); - props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount-processor"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); - props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - 
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - - // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data - props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - TopologyBuilder builder = new TopologyBuilder(); - - builder.addSource("Source", "streams-file-input"); - - builder.addProcessor("Process", new MyProcessorSupplier(), "Source"); - builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process"); - - builder.addSink("Sink", "streams-wordcount-processor-output", "Process"); - - KafkaStreams streams = new KafkaStreams(builder, props); - streams.start(); - - // usually the streaming job would be ever running, - // in this example we just let it run for some time and stop since the input data is finite. - Thread.sleep(5000L); - - streams.close(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java new file mode 100644 index 0000000..5fcd1f3 --- /dev/null +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJODeserializer.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.streams.examples.pageview; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Deserializer; + +import java.util.Map; + +public class JsonPOJODeserializer<T> implements Deserializer<T> { + private ObjectMapper objectMapper = new ObjectMapper(); + + private Class<T> tClass; + + /** + * Default constructor needed by Kafka + */ + public JsonPOJODeserializer() { + } + + @SuppressWarnings("unchecked") + @Override + public void configure(Map<String, ?> props, boolean isKey) { + tClass = (Class<T>) props.get("JsonPOJOClass"); + } + + @Override + public T deserialize(String topic, byte[] bytes) { + if (bytes == null) + return null; + + T data; + try { + data = objectMapper.readValue(bytes, tClass); + } catch (Exception e) { + throw new SerializationException(e); + } + + return data; + } + + @Override + public void close() { + + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java new file mode 100644 index 0000000..bb60327 --- /dev/null +++ 
b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonPOJOSerializer.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ +package org.apache.kafka.streams.examples.pageview; + + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Map; + +public class JsonPOJOSerializer<T> implements Serializer<T> { + private final ObjectMapper objectMapper = new ObjectMapper(); + + private Class<T> tClass; + + /** + * Default constructor needed by Kafka + */ + public JsonPOJOSerializer() { + + } + + @SuppressWarnings("unchecked") + @Override + public void configure(Map<String, ?> props, boolean isKey) { + tClass = (Class<T>) props.get("JsonPOJOClass"); + } + + @Override + public byte[] serialize(String topic, T data) { + if (data == null) + return null; + + try { + return objectMapper.writeValueAsBytes(data); + } catch (Exception e) { + throw new SerializationException("Error serializing JSON message", e); + } + } + + @Override + public void close() { + } + +} 
http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java new file mode 100644 index 0000000..6443193 --- /dev/null +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.examples.pageview; + +import com.fasterxml.jackson.databind.JsonNode; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.processor.TimestampExtractor; + +/** + * A timestamp extractor implementation that tries to extract event time from + * the "timestamp" field in the Json formatted message. 
+ */ +public class JsonTimestampExtractor implements TimestampExtractor { + + @Override + public long extract(ConsumerRecord<Object, Object> record) { + if (record.value() instanceof PageViewTypedJob.PageView) { + return ((PageViewTypedJob.PageView) record.value()).timestamp; + } + + if (record.value() instanceof PageViewTypedJob.UserProfile) { + return ((PageViewTypedJob.UserProfile) record.value()).timestamp; + } + + if (record.value() instanceof JsonNode) { + return ((JsonNode) record.value()).get("timestamp").longValue(); + } + + throw new IllegalArgumentException("JsonTimestampExtractor cannot recognize the record value " + record.value()); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java new file mode 100644 index 0000000..6a105fd --- /dev/null +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java @@ -0,0 +1,185 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.examples.pageview; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.HoppingWindows; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.StreamsConfig; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation, + * using specific data types (here: JSON POJO; but can also be Avro specific bindings, etc.) for serdes + * in Kafka Streams. + * + * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input" + * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format + * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region. + * + * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) + * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. 
+ */ +public class PageViewTypedJob { + + // POJO classes + static public class PageView { + public String user; + public String page; + public Long timestamp; + } + + static public class UserProfile { + public String region; + public Long timestamp; + } + + static public class PageViewByRegion { + public String user; + public String page; + public String region; + } + + static public class WindowedPageViewByRegion { + public long windowStart; + public String region; + } + + static public class RegionCount { + public long count; + public String region; + } + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-typed"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class); + props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); + + // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data + props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KStreamBuilder builder = new KStreamBuilder(); + + final Serializer<String> stringSerializer = new StringSerializer(); + final Deserializer<String> stringDeserializer = new StringDeserializer(); + final Serializer<Long> longSerializer = new LongSerializer(); + final Deserializer<Long> longDeserializer = new LongDeserializer(); + + // TODO: the following can be removed with a serialization factory + Map<String, Object> serdeProps = new HashMap<>(); + + final Deserializer<PageView> pageViewDeserializer = new 
JsonPOJODeserializer<>(); + serdeProps.put("JsonPOJOClass", PageView.class); + pageViewDeserializer.configure(serdeProps, false); + + final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>(); + serdeProps.put("JsonPOJOClass", UserProfile.class); + userProfileDeserializer.configure(serdeProps, false); + + final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>(); + serdeProps.put("JsonPOJOClass", UserProfile.class); + userProfileSerializer.configure(serdeProps, false); + + final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>(); + serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class); + wPageViewByRegionSerializer.configure(serdeProps, false); + + final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>(); + serdeProps.put("JsonPOJOClass", RegionCount.class); + regionCountSerializer.configure(serdeProps, false); + + KStream<String, PageView> views = builder.stream(stringDeserializer, pageViewDeserializer, "streams-pageview-input"); + + KTable<String, UserProfile> users = builder.table(stringSerializer, userProfileSerializer, stringDeserializer, userProfileDeserializer, "streams-userprofile-input"); + + KStream<WindowedPageViewByRegion, RegionCount> regionCount = views + .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() { + @Override + public PageViewByRegion apply(PageView view, UserProfile profile) { + PageViewByRegion viewByRegion = new PageViewByRegion(); + viewByRegion.user = view.user; + viewByRegion.page = view.page; + + if (profile != null) { + viewByRegion.region = profile.region; + } else { + viewByRegion.region = "UNKNOWN"; + } + return viewByRegion; + } + }) + .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() { + @Override + public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) { + return new KeyValue<>(viewRegion.region, 
viewRegion); + } + }) + .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), + stringSerializer, longSerializer, + stringDeserializer, longDeserializer) + // TODO: we can merge ths toStream().map(...) with a single toStream(...) + .toStream() + .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() { + @Override + public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) { + WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion(); + wViewByRegion.windowStart = key.window().start(); + wViewByRegion.region = key.value(); + + RegionCount rCount = new RegionCount(); + rCount.region = key.value(); + rCount.count = value; + + return new KeyValue<>(wViewByRegion, rCount); + } + }); + + // write to the result topic + regionCount.to("streams-pageviewstats-typed-output", wPageViewByRegionSerializer, regionCountSerializer); + + KafkaStreams streams = new KafkaStreams(builder, props); + streams.start(); + + // usually the streaming job would be ever running, + // in this example we just let it run for some time and stop since the input data is finite. + Thread.sleep(5000L); + + streams.close(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java new file mode 100644 index 0000000..e890589 --- /dev/null +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.examples.pageview; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.connect.json.JsonSerializer; +import org.apache.kafka.connect.json.JsonDeserializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.HoppingWindows; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.kstream.Windowed; + +import 
java.util.Properties; + +/** + * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation, + * using general data types (here: JSON; but can also be Avro generic bindings, etc.) for serdes + * in Kafka Streams. + * + * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input" + * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format + * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region. + * + * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) + * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. + */ +public class PageViewUntypedJob { + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-untyped"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); + + // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data + props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KStreamBuilder builder = new KStreamBuilder(); + + final Serializer<String> stringSerializer = new StringSerializer(); + final Deserializer<String> stringDeserializer = new 
StringDeserializer(); + final Serializer<Long> longSerializer = new LongSerializer(); + final Deserializer<Long> longDeserializer = new LongDeserializer(); + final Serializer<JsonNode> jsonSerializer = new JsonSerializer(); + final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer(); + + KStream<String, JsonNode> views = builder.stream(stringDeserializer, jsonDeserializer, "streams-pageview-input"); + + KTable<String, JsonNode> users = builder.table(stringSerializer, jsonSerializer, stringDeserializer, jsonDeserializer, "streams-userprofile-input"); + + KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() { + @Override + public String apply(JsonNode record) { + return record.get("region").textValue(); + } + }); + + KStream<JsonNode, JsonNode> regionCount = views + .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() { + @Override + public JsonNode apply(JsonNode view, String region) { + ObjectNode jNode = JsonNodeFactory.instance.objectNode(); + + return jNode.put("user", view.get("user").textValue()) + .put("page", view.get("page").textValue()) + .put("region", region == null ? "UNKNOWN" : region); + } + }) + .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() { + @Override + public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) { + return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion); + } + }) + .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), + stringSerializer, longSerializer, + stringDeserializer, longDeserializer) + // TODO: we can merge ths toStream().map(...) with a single toStream(...) 
+ .toStream() + .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() { + @Override + public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) { + ObjectNode keyNode = JsonNodeFactory.instance.objectNode(); + keyNode.put("window-start", key.window().start()) + .put("region", key.value()); + + ObjectNode valueNode = JsonNodeFactory.instance.objectNode(); + valueNode.put("count", value); + + return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode); + } + }); + + // write to the result topic + regionCount.to("streams-pageviewstats-untyped-output", jsonSerializer, jsonSerializer); + + KafkaStreams streams = new KafkaStreams(builder, props); + streams.start(); + + // usually the streaming job would be ever running, + // in this example we just let it run for some time and stop since the input data is finite. + Thread.sleep(5000L); + + streams.close(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java new file mode 100644 index 0000000..8885ca2 --- /dev/null +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.examples.pipe; + +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; + +import java.util.Properties; + +/** + * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to + * write data to a sink (output) topic. + * + * In this example, we implement a simple "pipe" program that reads from a source topic "streams-file-input" + * and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output". + * + * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) + * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */ +public class PipeJob { + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pipe"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data + props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KStreamBuilder builder = new KStreamBuilder(); + + builder.stream("streams-file-input").to("streams-pipe-output"); + + KafkaStreams streams = new KafkaStreams(builder, props); + streams.start(); + + // usually the streaming job would be ever running, + // in this example we just let it run for some time and stop since the input data is finite. + Thread.sleep(5000L); + + streams.close(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java new file mode 100644 index 0000000..82d216e --- /dev/null +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.examples.wordcount; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.ValueMapper; + +import java.util.Arrays; +import java.util.Properties; + +/** + * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program + * that computes a simple word occurrence histogram from an input text. + * + * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages + * represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record + * is an updated count of a single word. 
+ * + * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) + * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. + */ +public class WordCountJob { + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data + props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KStreamBuilder builder = new KStreamBuilder(); + + final Serializer<String> stringSerializer = new StringSerializer(); + final Deserializer<String> stringDeserializer = new StringDeserializer(); + final Serializer<Long> longSerializer = new LongSerializer(); + final Deserializer<Long> longDeserializer = new LongDeserializer(); + + KStream<String, String> source = builder.stream("streams-file-input"); + + KTable<String, Long> counts = source + .flatMapValues(new ValueMapper<String, Iterable<String>>() { + @Override + public Iterable<String> apply(String value) { + return Arrays.asList(value.toLowerCase().split(" ")); + } + }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() { + @Override + public KeyValue<String, String> apply(String key, String value) { + return new KeyValue<String, String>(value, value); + } + }) + .countByKey(stringSerializer, longSerializer, 
stringDeserializer, longDeserializer, "Counts"); + + counts.to("streams-wordcount-output", stringSerializer, longSerializer); + + KafkaStreams streams = new KafkaStreams(builder, props); + streams.start(); + + // usually the streaming job would be ever running, + // in this example we just let it run for some time and stop since the input data is finite. + Thread.sleep(5000L); + + streams.close(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/edeb11bc/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java new file mode 100644 index 0000000..cb82656 --- /dev/null +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.examples.wordcount; + +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; + +import java.util.Properties; + +/** + * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program + * that computes a simple word occurrence histogram from an input text. + * + * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages + * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record + * is an updated count of a single word. + * + * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) + * and write some data to it (e.g. via bin/kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic.
+ */ +public class WordCountProcessorJob { + + private static class MyProcessorSupplier implements ProcessorSupplier<String, String> { + + @Override + public Processor<String, String> get() { + return new Processor<String, String>() { + private ProcessorContext context; + private KeyValueStore<String, Integer> kvStore; + + @Override + @SuppressWarnings("unchecked") + public void init(ProcessorContext context) { + this.context = context; + this.context.schedule(1000); + this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts"); + } + + @Override + public void process(String dummy, String line) { + String[] words = line.toLowerCase().split(" "); + + for (String word : words) { + Integer oldValue = this.kvStore.get(word); + + if (oldValue == null) { + this.kvStore.put(word, 1); + } else { + this.kvStore.put(word, oldValue + 1); + } + } + + context.commit(); + } + + @Override + public void punctuate(long timestamp) { + KeyValueIterator<String, Integer> iter = this.kvStore.all(); + + System.out.println("----------- " + timestamp + " ----------- "); + + while (iter.hasNext()) { + KeyValue<String, Integer> entry = iter.next(); + + System.out.println("[" + entry.key + ", " + entry.value + "]"); + + context.forward(entry.key, entry.value.toString()); + } + + iter.close(); + } + + @Override + public void close() { + this.kvStore.close(); + } + }; + } + } + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount-processor"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + 
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data + props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + TopologyBuilder builder = new TopologyBuilder(); + + builder.addSource("Source", "streams-file-input"); + + builder.addProcessor("Process", new MyProcessorSupplier(), "Source"); + builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process"); + + builder.addSink("Sink", "streams-wordcount-processor-output", "Process"); + + KafkaStreams streams = new KafkaStreams(builder, props); + streams.start(); + + // usually the streaming job would be ever running, + // in this example we just let it run for some time and stop since the input data is finite. + Thread.sleep(5000L); + + streams.close(); + } +}
