KAFKA-3411: Streams: stop using "job" terminology, rename job.id to application.id
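
At a glance, the user-facing part of this rename is the Streams config key: the identifier formerly set via "job.id" is now set via "application.id". A minimal sketch, modeled on the PipeDemo example added in this patch (the class name below is only illustrative and not part of the diff):

    import java.util.Properties;

    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.kafka.common.serialization.StringSerializer;
    import org.apache.kafka.streams.KafkaStreams;
    import org.apache.kafka.streams.StreamsConfig;
    import org.apache.kafka.streams.kstream.KStreamBuilder;

    public class ApplicationIdRenameSketch {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            // before this change the instance was identified via "job.id":
            //     props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pipe");
            // after this change the same setting is named "application.id":
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
            props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
            props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

            KStreamBuilder builder = new KStreamBuilder();
            // pipe records unmodified from the input topic to the output topic
            builder.stream("streams-file-input").to("streams-pipe-output");

            KafkaStreams streams = new KafkaStreams(builder, props);
            streams.start();

            // run briefly and shut down, as the demo examples in this patch do
            Thread.sleep(5000L);
            streams.close();
        }
    }
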
guozhangwang ymatsuda : please review.

Author: Michael G. Noll <[email protected]>
Reviewers: Guozhang Wang <[email protected]>

Closes #1081 from miguno/KAFKA-3411

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/958e10c8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/958e10c8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/958e10c8

Branch: refs/heads/trunk
Commit: 958e10c87ce293c3bf59bb9840eaaae915eff25e
Parents: 9a836d0
Author: Michael G. Noll <[email protected]>
Authored: Thu Mar 17 10:41:48 2016 -0700
Committer: Guozhang Wang <[email protected]>
Committed: Thu Mar 17 10:41:48 2016 -0700

----------------------------------------------------------------------
 .../pageview/JsonTimestampExtractor.java        |   8 +-
 .../examples/pageview/PageViewTypedDemo.java    | 180 ++++++++++++++++++
 .../examples/pageview/PageViewTypedJob.java     | 184 -------------------
 .../examples/pageview/PageViewUntypedDemo.java  | 136 ++++++++++++++
 .../examples/pageview/PageViewUntypedJob.java   | 140 --------------
 .../kafka/streams/examples/pipe/PipeDemo.java   |  65 +++++++
 .../kafka/streams/examples/pipe/PipeJob.java    |  65 -------
 .../examples/wordcount/WordCountDemo.java       |  96 ++++++++++
 .../examples/wordcount/WordCountJob.java        |  96 ----------
 .../wordcount/WordCountProcessorDemo.java       | 137 ++++++++++++++
 .../wordcount/WordCountProcessorJob.java        | 137 --------------
 .../org/apache/kafka/streams/KafkaStreams.java  |  12 +-
 .../org/apache/kafka/streams/StreamsConfig.java |  14 +-
 .../streams/processor/PartitionGrouper.java     |   8 +-
 .../streams/processor/ProcessorContext.java     |   6 +-
 .../streams/processor/TopologyBuilder.java      |  42 ++---
 .../processor/internals/AbstractTask.java       |  16 +-
 .../internals/ProcessorContextImpl.java         |   9 +-
 .../internals/ProcessorStateManager.java        |  14 +-
 .../processor/internals/StandbyContextImpl.java |  14 +-
 .../processor/internals/StandbyTask.java        |  12 +-
 .../internals/StreamPartitionAssignor.java      |   4 +-
 .../streams/processor/internals/StreamTask.java |   6 +-
 .../processor/internals/StreamThread.java       |  28 +--
 .../state/internals/StoreChangeLogger.java      |   2 +-
 .../apache/kafka/streams/StreamsConfigTest.java |   7 +-
 .../internals/ProcessorStateManagerTest.java    |  28 +--
 .../internals/ProcessorTopologyTest.java        |   2 +-
 .../processor/internals/StandbyTaskTest.java    |  20 +-
 .../internals/StreamPartitionAssignorTest.java  |   2 +-
 .../processor/internals/StreamTaskTest.java     |   6 +-
 .../processor/internals/StreamThreadTest.java   |  34 ++--
 .../streams/smoketest/SmokeTestClient.java      |   2 +-
 .../apache/kafka/test/MockProcessorContext.java |   4 +-
 .../kafka/test/ProcessorTopologyTestDriver.java |   6 +-
 35 files changed, 762 insertions(+), 780 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
----------------------------------------------------------------------
diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
index 6443193..63e8377 100644
--- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
+++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java
@@ -29,12 +29,12 @@ public class
JsonTimestampExtractor implements TimestampExtractor { @Override public long extract(ConsumerRecord<Object, Object> record) { - if (record.value() instanceof PageViewTypedJob.PageView) { - return ((PageViewTypedJob.PageView) record.value()).timestamp; + if (record.value() instanceof PageViewTypedDemo.PageView) { + return ((PageViewTypedDemo.PageView) record.value()).timestamp; } - if (record.value() instanceof PageViewTypedJob.UserProfile) { - return ((PageViewTypedJob.UserProfile) record.value()).timestamp; + if (record.value() instanceof PageViewTypedDemo.UserProfile) { + return ((PageViewTypedDemo.UserProfile) record.value()).timestamp; } if (record.value() instanceof JsonNode) { http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java new file mode 100644 index 0000000..4f9de29 --- /dev/null +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedDemo.java @@ -0,0 +1,180 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.examples.pageview; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.kstream.HoppingWindows; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.kstream.Windowed; +import org.apache.kafka.streams.StreamsConfig; + +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +/** + * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation, + * using specific data types (here: JSON POJO; but can also be Avro specific bindings, etc.) for serdes + * in Kafka Streams. 
+ * + * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input" + * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format + * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region. + * + * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) + * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. + */ +public class PageViewTypedDemo { + + // POJO classes + static public class PageView { + public String user; + public String page; + public Long timestamp; + } + + static public class UserProfile { + public String region; + public Long timestamp; + } + + static public class PageViewByRegion { + public String user; + public String page; + public String region; + } + + static public class WindowedPageViewByRegion { + public long windowStart; + public String region; + } + + static public class RegionCount { + public long count; + public String region; + } + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-typed"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class); + props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); + + // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data + props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KStreamBuilder builder = new KStreamBuilder(); + + final Serializer<String> stringSerializer = new StringSerializer(); + final Deserializer<String> stringDeserializer = new StringDeserializer(); + + // TODO: the following can be removed with a serialization factory + Map<String, Object> serdeProps = new HashMap<>(); + + final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>(); + serdeProps.put("JsonPOJOClass", PageView.class); + pageViewDeserializer.configure(serdeProps, false); + + final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>(); + serdeProps.put("JsonPOJOClass", UserProfile.class); + userProfileDeserializer.configure(serdeProps, false); + + final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>(); + serdeProps.put("JsonPOJOClass", UserProfile.class); + userProfileSerializer.configure(serdeProps, false); + + final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>(); + serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class); + wPageViewByRegionSerializer.configure(serdeProps, false); + + final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>(); + serdeProps.put("JsonPOJOClass", RegionCount.class); + regionCountSerializer.configure(serdeProps, false); + + KStream<String, PageView> views = builder.stream(stringDeserializer, pageViewDeserializer, "streams-pageview-input"); + + 
KTable<String, UserProfile> users = builder.table(stringSerializer, userProfileSerializer, stringDeserializer, userProfileDeserializer, "streams-userprofile-input"); + + KStream<WindowedPageViewByRegion, RegionCount> regionCount = views + .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() { + @Override + public PageViewByRegion apply(PageView view, UserProfile profile) { + PageViewByRegion viewByRegion = new PageViewByRegion(); + viewByRegion.user = view.user; + viewByRegion.page = view.page; + + if (profile != null) { + viewByRegion.region = profile.region; + } else { + viewByRegion.region = "UNKNOWN"; + } + return viewByRegion; + } + }) + .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() { + @Override + public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) { + return new KeyValue<>(viewRegion.region, viewRegion); + } + }) + .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), + stringSerializer, stringDeserializer) + // TODO: we can merge ths toStream().map(...) with a single toStream(...) + .toStream() + .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() { + @Override + public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) { + WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion(); + wViewByRegion.windowStart = key.window().start(); + wViewByRegion.region = key.value(); + + RegionCount rCount = new RegionCount(); + rCount.region = key.value(); + rCount.count = value; + + return new KeyValue<>(wViewByRegion, rCount); + } + }); + + // write to the result topic + regionCount.to("streams-pageviewstats-typed-output", wPageViewByRegionSerializer, regionCountSerializer); + + KafkaStreams streams = new KafkaStreams(builder, props); + streams.start(); + + // usually the stream application would be running forever, + // in this example we just let it run for some time and stop since the input data is finite. + Thread.sleep(5000L); + + streams.close(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java deleted file mode 100644 index 1fcb403..0000000 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewTypedJob.java +++ /dev/null @@ -1,184 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.kafka.streams.examples.pageview; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.kstream.HoppingWindows; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.kstream.Windowed; -import org.apache.kafka.streams.StreamsConfig; - -import java.util.HashMap; -import java.util.Map; -import java.util.Properties; - -/** - * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation, - * using specific data types (here: JSON POJO; but can also be Avro specific bindings, etc.) for serdes - * in Kafka Streams. - * - * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input" - * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format - * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region. - * - * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) - * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. 
- */ -public class PageViewTypedJob { - - // POJO classes - static public class PageView { - public String user; - public String page; - public Long timestamp; - } - - static public class UserProfile { - public String region; - public Long timestamp; - } - - static public class PageViewByRegion { - public String user; - public String page; - public String region; - } - - static public class WindowedPageViewByRegion { - public long windowStart; - public String region; - } - - static public class RegionCount { - public long count; - public String region; - } - - public static void main(String[] args) throws Exception { - Properties props = new Properties(); - props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-typed"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); - props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonPOJOSerializer.class); - props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonPOJODeserializer.class); - props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); - - // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data - props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - KStreamBuilder builder = new KStreamBuilder(); - - final Serializer<String> stringSerializer = new StringSerializer(); - final Deserializer<String> stringDeserializer = new StringDeserializer(); - final Serializer<Long> longSerializer = new LongSerializer(); - final Deserializer<Long> longDeserializer = new LongDeserializer(); - - // TODO: the following can be removed with a serialization factory - Map<String, Object> serdeProps = new HashMap<>(); - - final Deserializer<PageView> pageViewDeserializer = new JsonPOJODeserializer<>(); - serdeProps.put("JsonPOJOClass", PageView.class); - pageViewDeserializer.configure(serdeProps, false); - - final Deserializer<UserProfile> userProfileDeserializer = new JsonPOJODeserializer<>(); - serdeProps.put("JsonPOJOClass", UserProfile.class); - userProfileDeserializer.configure(serdeProps, false); - - final Serializer<UserProfile> userProfileSerializer = new JsonPOJOSerializer<>(); - serdeProps.put("JsonPOJOClass", UserProfile.class); - userProfileSerializer.configure(serdeProps, false); - - final Serializer<WindowedPageViewByRegion> wPageViewByRegionSerializer = new JsonPOJOSerializer<>(); - serdeProps.put("JsonPOJOClass", WindowedPageViewByRegion.class); - wPageViewByRegionSerializer.configure(serdeProps, false); - - final Serializer<RegionCount> regionCountSerializer = new JsonPOJOSerializer<>(); - serdeProps.put("JsonPOJOClass", RegionCount.class); - regionCountSerializer.configure(serdeProps, false); - - KStream<String, PageView> views = builder.stream(stringDeserializer, pageViewDeserializer, "streams-pageview-input"); - - KTable<String, UserProfile> users = builder.table(stringSerializer, userProfileSerializer, stringDeserializer, userProfileDeserializer, "streams-userprofile-input"); - - KStream<WindowedPageViewByRegion, RegionCount> regionCount = views - .leftJoin(users, new ValueJoiner<PageView, UserProfile, PageViewByRegion>() { - @Override - public PageViewByRegion apply(PageView view, UserProfile profile) { - PageViewByRegion viewByRegion = new PageViewByRegion(); - viewByRegion.user = 
view.user; - viewByRegion.page = view.page; - - if (profile != null) { - viewByRegion.region = profile.region; - } else { - viewByRegion.region = "UNKNOWN"; - } - return viewByRegion; - } - }) - .map(new KeyValueMapper<String, PageViewByRegion, KeyValue<String, PageViewByRegion>>() { - @Override - public KeyValue<String, PageViewByRegion> apply(String user, PageViewByRegion viewRegion) { - return new KeyValue<>(viewRegion.region, viewRegion); - } - }) - .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), - stringSerializer, stringDeserializer) - // TODO: we can merge ths toStream().map(...) with a single toStream(...) - .toStream() - .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<WindowedPageViewByRegion, RegionCount>>() { - @Override - public KeyValue<WindowedPageViewByRegion, RegionCount> apply(Windowed<String> key, Long value) { - WindowedPageViewByRegion wViewByRegion = new WindowedPageViewByRegion(); - wViewByRegion.windowStart = key.window().start(); - wViewByRegion.region = key.value(); - - RegionCount rCount = new RegionCount(); - rCount.region = key.value(); - rCount.count = value; - - return new KeyValue<>(wViewByRegion, rCount); - } - }); - - // write to the result topic - regionCount.to("streams-pageviewstats-typed-output", wPageViewByRegionSerializer, regionCountSerializer); - - KafkaStreams streams = new KafkaStreams(builder, props); - streams.start(); - - // usually the streaming job would be ever running, - // in this example we just let it run for some time and stop since the input data is finite. - Thread.sleep(5000L); - - streams.close(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java new file mode 100644 index 0000000..9377095 --- /dev/null +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedDemo.java @@ -0,0 +1,136 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.examples.pageview; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.connect.json.JsonSerializer; +import org.apache.kafka.connect.json.JsonDeserializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.HoppingWindows; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.ValueJoiner; +import org.apache.kafka.streams.kstream.ValueMapper; +import org.apache.kafka.streams.kstream.Windowed; + +import java.util.Properties; + +/** + * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation, + * using general data types (here: JSON; but can also be Avro generic bindings, etc.) for serdes + * in Kafka Streams. + * + * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input" + * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format + * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region. + * + * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) + * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. 
+ */ +public class PageViewUntypedDemo { + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pageview-untyped"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); + props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); + + // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data + props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KStreamBuilder builder = new KStreamBuilder(); + + final Serializer<String> stringSerializer = new StringSerializer(); + final Deserializer<String> stringDeserializer = new StringDeserializer(); + final Serializer<JsonNode> jsonSerializer = new JsonSerializer(); + final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer(); + + KStream<String, JsonNode> views = builder.stream(stringDeserializer, jsonDeserializer, "streams-pageview-input"); + + KTable<String, JsonNode> users = builder.table(stringSerializer, jsonSerializer, stringDeserializer, jsonDeserializer, "streams-userprofile-input"); + + KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() { + @Override + public String apply(JsonNode record) { + return record.get("region").textValue(); + } + }); + + KStream<JsonNode, JsonNode> regionCount = views + .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() { + @Override + public JsonNode apply(JsonNode view, String region) { + ObjectNode jNode = JsonNodeFactory.instance.objectNode(); + + return jNode.put("user", view.get("user").textValue()) + .put("page", view.get("page").textValue()) + .put("region", region == null ? "UNKNOWN" : region); + } + }) + .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() { + @Override + public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) { + return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion); + } + }) + .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), + stringSerializer, stringDeserializer) + // TODO: we can merge ths toStream().map(...) with a single toStream(...) + .toStream() + .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() { + @Override + public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) { + ObjectNode keyNode = JsonNodeFactory.instance.objectNode(); + keyNode.put("window-start", key.window().start()) + .put("region", key.value()); + + ObjectNode valueNode = JsonNodeFactory.instance.objectNode(); + valueNode.put("count", value); + + return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode); + } + }); + + // write to the result topic + regionCount.to("streams-pageviewstats-untyped-output", jsonSerializer, jsonSerializer); + + KafkaStreams streams = new KafkaStreams(builder, props); + streams.start(); + + // usually the stream application would be running forever, + // in this example we just let it run for some time and stop since the input data is finite. 
+ Thread.sleep(5000L); + + streams.close(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java deleted file mode 100644 index fb1a55d..0000000 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/PageViewUntypedJob.java +++ /dev/null @@ -1,140 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.examples.pageview; - -import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.JsonNodeFactory; -import com.fasterxml.jackson.databind.node.ObjectNode; -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.LongDeserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.connect.json.JsonSerializer; -import org.apache.kafka.connect.json.JsonDeserializer; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.HoppingWindows; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.ValueJoiner; -import org.apache.kafka.streams.kstream.ValueMapper; -import org.apache.kafka.streams.kstream.Windowed; - -import java.util.Properties; - -/** - * Demonstrates how to perform a join between a KStream and a KTable, i.e. an example of a stateful computation, - * using general data types (here: JSON; but can also be Avro generic bindings, etc.) for serdes - * in Kafka Streams. - * - * In this example, we join a stream of pageviews (aka clickstreams) that reads from a topic named "streams-pageview-input" - * with a user profile table that reads from a topic named "streams-userprofile-input", where the data format - * is JSON string representing a record in the stream or table, to compute the number of pageviews per user region. - * - * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) - * and write some data to it (e.g. 
via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. - */ -public class PageViewUntypedJob { - - public static void main(String[] args) throws Exception { - Properties props = new Properties(); - props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pageview-untyped"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); - props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); - props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class); - props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, JsonTimestampExtractor.class); - - // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data - props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - KStreamBuilder builder = new KStreamBuilder(); - - final Serializer<String> stringSerializer = new StringSerializer(); - final Deserializer<String> stringDeserializer = new StringDeserializer(); - final Serializer<Long> longSerializer = new LongSerializer(); - final Deserializer<Long> longDeserializer = new LongDeserializer(); - final Serializer<JsonNode> jsonSerializer = new JsonSerializer(); - final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer(); - - KStream<String, JsonNode> views = builder.stream(stringDeserializer, jsonDeserializer, "streams-pageview-input"); - - KTable<String, JsonNode> users = builder.table(stringSerializer, jsonSerializer, stringDeserializer, jsonDeserializer, "streams-userprofile-input"); - - KTable<String, String> userRegions = users.mapValues(new ValueMapper<JsonNode, String>() { - @Override - public String apply(JsonNode record) { - return record.get("region").textValue(); - } - }); - - KStream<JsonNode, JsonNode> regionCount = views - .leftJoin(userRegions, new ValueJoiner<JsonNode, String, JsonNode>() { - @Override - public JsonNode apply(JsonNode view, String region) { - ObjectNode jNode = JsonNodeFactory.instance.objectNode(); - - return jNode.put("user", view.get("user").textValue()) - .put("page", view.get("page").textValue()) - .put("region", region == null ? "UNKNOWN" : region); - } - }) - .map(new KeyValueMapper<String, JsonNode, KeyValue<String, JsonNode>>() { - @Override - public KeyValue<String, JsonNode> apply(String user, JsonNode viewRegion) { - return new KeyValue<>(viewRegion.get("region").textValue(), viewRegion); - } - }) - .countByKey(HoppingWindows.of("GeoPageViewsWindow").with(7 * 24 * 60 * 60 * 1000), - stringSerializer, stringDeserializer) - // TODO: we can merge ths toStream().map(...) with a single toStream(...) 
- .toStream() - .map(new KeyValueMapper<Windowed<String>, Long, KeyValue<JsonNode, JsonNode>>() { - @Override - public KeyValue<JsonNode, JsonNode> apply(Windowed<String> key, Long value) { - ObjectNode keyNode = JsonNodeFactory.instance.objectNode(); - keyNode.put("window-start", key.window().start()) - .put("region", key.value()); - - ObjectNode valueNode = JsonNodeFactory.instance.objectNode(); - valueNode.put("count", value); - - return new KeyValue<>((JsonNode) keyNode, (JsonNode) valueNode); - } - }); - - // write to the result topic - regionCount.to("streams-pageviewstats-untyped-output", jsonSerializer, jsonSerializer); - - KafkaStreams streams = new KafkaStreams(builder, props); - streams.start(); - - // usually the streaming job would be ever running, - // in this example we just let it run for some time and stop since the input data is finite. - Thread.sleep(5000L); - - streams.close(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java new file mode 100644 index 0000000..c37c68a --- /dev/null +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeDemo.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.examples.pipe; + +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.StreamsConfig; + +import java.util.Properties; + +/** + * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to + * write data to a sink (output) topic. + * + * In this example, we implement a simple "pipe" program that reads from a source topic "streams-file-input" + * and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output". + * + * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) + * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. 
+ */ +public class PipeDemo { + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data + props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KStreamBuilder builder = new KStreamBuilder(); + + builder.stream("streams-file-input").to("streams-pipe-output"); + + KafkaStreams streams = new KafkaStreams(builder, props); + streams.start(); + + // usually the stream application would be running forever, + // in this example we just let it run for some time and stop since the input data is finite. + Thread.sleep(5000L); + + streams.close(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java deleted file mode 100644 index 8885ca2..0000000 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pipe/PipeJob.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.examples.pipe; - -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.StreamsConfig; - -import java.util.Properties; - -/** - * Demonstrates, using the high-level KStream DSL, how to read data from a source (input) topic and how to - * write data to a sink (output) topic. - * - * In this example, we implement a simple "pipe" program that reads from a source topic "streams-file-input" - * and writes the data as-is (i.e. unmodified) into a sink topic "streams-pipe-output". - * - * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) - * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. 
- */ -public class PipeJob { - - public static void main(String[] args) throws Exception { - Properties props = new Properties(); - props.put(StreamsConfig.JOB_ID_CONFIG, "streams-pipe"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - - // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data - props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - KStreamBuilder builder = new KStreamBuilder(); - - builder.stream("streams-file-input").to("streams-pipe-output"); - - KafkaStreams streams = new KafkaStreams(builder, props); - streams.start(); - - // usually the streaming job would be ever running, - // in this example we just let it run for some time and stop since the input data is finite. - Thread.sleep(5000L); - - streams.close(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java new file mode 100644 index 0000000..03d5142 --- /dev/null +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountDemo.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.examples.wordcount; + +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.kstream.KStreamBuilder; +import org.apache.kafka.streams.kstream.KStream; +import org.apache.kafka.streams.kstream.KTable; +import org.apache.kafka.streams.kstream.KeyValueMapper; +import org.apache.kafka.streams.kstream.ValueMapper; + +import java.util.Arrays; +import java.util.Properties; + +/** + * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program + * that computes a simple word occurrence histogram from an input text. + * + * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages + * represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record + * is an updated count of a single word. + * + * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) + * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. + */ +public class WordCountDemo { + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data + props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + KStreamBuilder builder = new KStreamBuilder(); + + final Serializer<String> stringSerializer = new StringSerializer(); + final Deserializer<String> stringDeserializer = new StringDeserializer(); + final Serializer<Long> longSerializer = new LongSerializer(); + + KStream<String, String> source = builder.stream("streams-file-input"); + + KTable<String, Long> counts = source + .flatMapValues(new ValueMapper<String, Iterable<String>>() { + @Override + public Iterable<String> apply(String value) { + return Arrays.asList(value.toLowerCase().split(" ")); + } + }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() { + @Override + public KeyValue<String, String> apply(String key, String value) { + return new KeyValue<String, String>(value, value); + } + }) + .countByKey(stringSerializer, stringDeserializer, "Counts"); + + counts.to("streams-wordcount-output", stringSerializer, longSerializer); + + KafkaStreams streams = new KafkaStreams(builder, props); + streams.start(); + + // usually the stream application would be running forever, + // in this example we just let it run for some time and stop since the input data is finite. 
+ Thread.sleep(5000L); + + streams.close(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java deleted file mode 100644 index 2b51a44..0000000 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountJob.java +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.examples.wordcount; - -import org.apache.kafka.common.serialization.Deserializer; -import org.apache.kafka.common.serialization.LongSerializer; -import org.apache.kafka.common.serialization.Serializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.kstream.KStreamBuilder; -import org.apache.kafka.streams.kstream.KStream; -import org.apache.kafka.streams.kstream.KTable; -import org.apache.kafka.streams.kstream.KeyValueMapper; -import org.apache.kafka.streams.kstream.ValueMapper; - -import java.util.Arrays; -import java.util.Properties; - -/** - * Demonstrates, using the high-level KStream DSL, how to implement the WordCount program - * that computes a simple word occurrence histogram from an input text. - * - * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages - * represent lines of text; and the histogram output is written to topic "streams-wordcount-output" where each record - * is an updated count of a single word. - * - * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) - * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. 
- */ -public class WordCountJob { - - public static void main(String[] args) throws Exception { - Properties props = new Properties(); - props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); - props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - - // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data - props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - KStreamBuilder builder = new KStreamBuilder(); - - final Serializer<String> stringSerializer = new StringSerializer(); - final Deserializer<String> stringDeserializer = new StringDeserializer(); - final Serializer<Long> longSerializer = new LongSerializer(); - - KStream<String, String> source = builder.stream("streams-file-input"); - - KTable<String, Long> counts = source - .flatMapValues(new ValueMapper<String, Iterable<String>>() { - @Override - public Iterable<String> apply(String value) { - return Arrays.asList(value.toLowerCase().split(" ")); - } - }).map(new KeyValueMapper<String, String, KeyValue<String, String>>() { - @Override - public KeyValue<String, String> apply(String key, String value) { - return new KeyValue<String, String>(value, value); - } - }) - .countByKey(stringSerializer, stringDeserializer, "Counts"); - - counts.to("streams-wordcount-output", stringSerializer, longSerializer); - - KafkaStreams streams = new KafkaStreams(builder, props); - streams.start(); - - // usually the streaming job would be ever running, - // in this example we just let it run for some time and stop since the input data is finite. - Thread.sleep(5000L); - - streams.close(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java new file mode 100644 index 0000000..b651b3a --- /dev/null +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorDemo.java @@ -0,0 +1,137 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.kafka.streams.examples.wordcount; + +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.StreamsConfig; +import org.apache.kafka.streams.KafkaStreams; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.ProcessorContext; +import org.apache.kafka.streams.processor.ProcessorSupplier; +import org.apache.kafka.streams.processor.TopologyBuilder; +import org.apache.kafka.streams.state.KeyValueIterator; +import org.apache.kafka.streams.state.KeyValueStore; +import org.apache.kafka.streams.state.Stores; + +import java.util.Properties; + +/** + * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program + * that computes a simple word occurrence histogram from an input text. + * + * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages + * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record + * is an updated count of a single word. + * + * Before running this example you must create the source topic (e.g. via bin/kafka-topics.sh --create ...) + * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. + */ +public class WordCountProcessorDemo { + + private static class MyProcessorSupplier implements ProcessorSupplier<String, String> { + + @Override + public Processor<String, String> get() { + return new Processor<String, String>() { + private ProcessorContext context; + private KeyValueStore<String, Integer> kvStore; + + @Override + @SuppressWarnings("unchecked") + public void init(ProcessorContext context) { + this.context = context; + this.context.schedule(1000); + this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts"); + } + + @Override + public void process(String dummy, String line) { + String[] words = line.toLowerCase().split(" "); + + for (String word : words) { + Integer oldValue = this.kvStore.get(word); + + if (oldValue == null) { + this.kvStore.put(word, 1); + } else { + this.kvStore.put(word, oldValue + 1); + } + } + + context.commit(); + } + + @Override + public void punctuate(long timestamp) { + KeyValueIterator<String, Integer> iter = this.kvStore.all(); + + System.out.println("----------- " + timestamp + " ----------- "); + + while (iter.hasNext()) { + KeyValue<String, Integer> entry = iter.next(); + + System.out.println("[" + entry.key + ", " + entry.value + "]"); + + context.forward(entry.key, entry.value.toString()); + } + + iter.close(); + } + + @Override + public void close() { + this.kvStore.close(); + } + }; + } + } + + public static void main(String[] args) throws Exception { + Properties props = new Properties(); + props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor"); + props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); + props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + + // setting offset reset to earliest 
so that we can re-run the demo code with the same pre-loaded data + props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + + TopologyBuilder builder = new TopologyBuilder(); + + builder.addSource("Source", "streams-file-input"); + + builder.addProcessor("Process", new MyProcessorSupplier(), "Source"); + builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process"); + + builder.addSink("Sink", "streams-wordcount-processor-output", "Process"); + + KafkaStreams streams = new KafkaStreams(builder, props); + streams.start(); + + // usually the stream application would be running forever, + // in this example we just let it run for some time and stop since the input data is finite. + Thread.sleep(5000L); + + streams.close(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java deleted file mode 100644 index cb82656..0000000 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/wordcount/WordCountProcessorJob.java +++ /dev/null @@ -1,137 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.examples.wordcount; - -import org.apache.kafka.common.serialization.StringDeserializer; -import org.apache.kafka.common.serialization.StringSerializer; -import org.apache.kafka.streams.KeyValue; -import org.apache.kafka.streams.StreamsConfig; -import org.apache.kafka.streams.KafkaStreams; -import org.apache.kafka.streams.processor.Processor; -import org.apache.kafka.streams.processor.ProcessorContext; -import org.apache.kafka.streams.processor.ProcessorSupplier; -import org.apache.kafka.streams.processor.TopologyBuilder; -import org.apache.kafka.streams.state.KeyValueIterator; -import org.apache.kafka.streams.state.KeyValueStore; -import org.apache.kafka.streams.state.Stores; - -import java.util.Properties; - -/** - * Demonstrates, using the low-level Processor APIs, how to implement the WordCount program - * that computes a simple word occurrence histogram from an input text. - * - * In this example, the input stream reads from a topic named "streams-file-input", where the values of messages - * represent lines of text; and the histogram output is written to topic "streams-wordcount-processor-output" where each record - * is an updated count of a single word. - * - * Before running this example you must create the source topic (e.g. 
via bin/kafka-topics.sh --create ...) - * and write some data to it (e.g. via bin-kafka-console-producer.sh). Otherwise you won't see any data arriving in the output topic. - */ -public class WordCountProcessorJob { - - private static class MyProcessorSupplier implements ProcessorSupplier<String, String> { - - @Override - public Processor<String, String> get() { - return new Processor<String, String>() { - private ProcessorContext context; - private KeyValueStore<String, Integer> kvStore; - - @Override - @SuppressWarnings("unchecked") - public void init(ProcessorContext context) { - this.context = context; - this.context.schedule(1000); - this.kvStore = (KeyValueStore<String, Integer>) context.getStateStore("Counts"); - } - - @Override - public void process(String dummy, String line) { - String[] words = line.toLowerCase().split(" "); - - for (String word : words) { - Integer oldValue = this.kvStore.get(word); - - if (oldValue == null) { - this.kvStore.put(word, 1); - } else { - this.kvStore.put(word, oldValue + 1); - } - } - - context.commit(); - } - - @Override - public void punctuate(long timestamp) { - KeyValueIterator<String, Integer> iter = this.kvStore.all(); - - System.out.println("----------- " + timestamp + " ----------- "); - - while (iter.hasNext()) { - KeyValue<String, Integer> entry = iter.next(); - - System.out.println("[" + entry.key + ", " + entry.value + "]"); - - context.forward(entry.key, entry.value.toString()); - } - - iter.close(); - } - - @Override - public void close() { - this.kvStore.close(); - } - }; - } - } - - public static void main(String[] args) throws Exception { - Properties props = new Properties(); - props.put(StreamsConfig.JOB_ID_CONFIG, "streams-wordcount-processor"); - props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181"); - props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); - props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); - - // setting offset reset to earliest so that we can re-run the demo code with the same pre-loaded data - props.put(StreamsConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - - TopologyBuilder builder = new TopologyBuilder(); - - builder.addSource("Source", "streams-file-input"); - - builder.addProcessor("Process", new MyProcessorSupplier(), "Source"); - builder.addStateStore(Stores.create("Counts").withStringKeys().withIntegerValues().inMemory().build(), "Process"); - - builder.addSink("Sink", "streams-wordcount-processor-output", "Process"); - - KafkaStreams streams = new KafkaStreams(builder, props); - streams.start(); - - // usually the streaming job would be ever running, - // in this example we just let it run for some time and stop since the input data is finite. 
- Thread.sleep(5000L); - - streams.close(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java index 15d6d8b..20958e4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java +++ b/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java @@ -46,7 +46,7 @@ import java.util.concurrent.atomic.AtomicInteger; * The {@link KafkaStreams} class manages the lifecycle of a Kafka Streams instance. One stream instance can contain one or * more threads specified in the configs for the processing work. * <p> - * A {@link KafkaStreams} instance can co-ordinate with any other instances with the same job ID (whether in this same process, on other processes + * A {@link KafkaStreams} instance can co-ordinate with any other instances with the same application ID (whether in this same process, on other processes * on this machine, or on remote machines) as a single (possibly distributed) stream processing client. These instances will divide up the work * based on the assignment of the input topic partitions so that all partitions are being * consumed. If instances are added or failed, all instances will rebelance the partition assignment among themselves @@ -59,7 +59,7 @@ import java.util.concurrent.atomic.AtomicInteger; * A simple example might look like this: * <pre> * Map<String, Object> props = new HashMap<>(); - * props.put(StreamsConfig.JOB_ID_CONFIG, "my-job"); + * props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application"); * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); * props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); * props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); @@ -113,12 +113,12 @@ public class KafkaStreams { this.processId = UUID.randomUUID(); - // JobId is a required config and hence should always have value - String jobId = config.getString(StreamsConfig.JOB_ID_CONFIG); + // The application ID is a required config and hence should always have value + String applicationId = config.getString(StreamsConfig.APPLICATION_ID_CONFIG); String clientId = config.getString(StreamsConfig.CLIENT_ID_CONFIG); if (clientId.length() <= 0) - clientId = jobId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement(); + clientId = applicationId + "-" + STREAM_CLIENT_ID_SEQUENCE.getAndIncrement(); List<MetricsReporter> reporters = config.getConfiguredInstances(StreamsConfig.METRIC_REPORTER_CLASSES_CONFIG, MetricsReporter.class); @@ -132,7 +132,7 @@ public class KafkaStreams { this.threads = new StreamThread[config.getInt(StreamsConfig.NUM_STREAM_THREADS_CONFIG)]; for (int i = 0; i < this.threads.length; i++) { - this.threads[i] = new StreamThread(builder, config, jobId, clientId, processId, metrics, time); + this.threads[i] = new StreamThread(builder, config, applicationId, clientId, processId, metrics, time); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index c4b8ffe..52fdbd4 100644 --- 
a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -83,13 +83,13 @@ public class StreamsConfig extends AbstractConfig { public static final String PARTITION_GROUPER_CLASS_CONFIG = "partition.grouper"; private static final String PARTITION_GROUPER_CLASS_DOC = "Partition grouper class that implements the <code>PartitionGrouper</code> interface."; - /** <code>job.id</code> */ - public static final String JOB_ID_CONFIG = "job.id"; - public static final String JOB_ID_DOC = "An id string to identify for the stream job. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix."; + /** <code>application.id</code> */ + public static final String APPLICATION_ID_CONFIG = "application.id"; + public static final String APPLICATION_ID_DOC = "An identifier for the stream processing application. Must be unique within the Kafka cluster. It is used as 1) the default client-id prefix, 2) the group-id for membership management, 3) the changelog topic prefix."; /** <code>replication.factor</code> */ public static final String REPLICATION_FACTOR_CONFIG = "replication.factor"; - public static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the job."; + public static final String REPLICATION_FACTOR_DOC = "The replication factor for change log topics and repartition topics created by the stream processing application."; /** <code>key.serializer</code> */ public static final String KEY_SERIALIZER_CLASS_CONFIG = ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG; @@ -124,10 +124,10 @@ public class StreamsConfig extends AbstractConfig { private static final String WALLCLOCK_TIMESTAMP_EXTRACTOR = "org.apache.kafka.streams.processor.internals.WallclockTimestampExtractor"; static { - CONFIG = new ConfigDef().define(JOB_ID_CONFIG, // required with no default value + CONFIG = new ConfigDef().define(APPLICATION_ID_CONFIG, // required with no default value Type.STRING, Importance.HIGH, - StreamsConfig.JOB_ID_DOC) + StreamsConfig.APPLICATION_ID_DOC) .define(BOOTSTRAP_SERVERS_CONFIG, // required with no default value Type.STRING, Importance.HIGH, @@ -297,7 +297,7 @@ public class StreamsConfig extends AbstractConfig { } private void removeStreamsSpecificConfigs(Map<String, Object> props) { - props.remove(StreamsConfig.JOB_ID_CONFIG); + props.remove(StreamsConfig.APPLICATION_ID_CONFIG); props.remove(StreamsConfig.STATE_DIR_CONFIG); props.remove(StreamsConfig.BUFFERED_RECORDS_PER_PARTITION_CONFIG); props.remove(StreamsConfig.NUM_STREAM_THREADS_CONFIG); http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java index ae9844d..0c94084 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/PartitionGrouper.java @@ -28,7 +28,8 @@ import java.util.Set; * * This grouper also acts as the stream task creation function along with partition distribution * such that each generated partition group is assigned with a distinct {@link TaskId}; - * the created task ids will then be assigned to Kafka Streams instances 
that host the stream job. + * the created task ids will then be assigned to Kafka Streams instances that host the stream + * processing application. */ public interface PartitionGrouper { @@ -37,9 +38,10 @@ public interface PartitionGrouper { * expected to be processed together must be in the same group. DefaultPartitionGrouper implements this * interface. See {@link DefaultPartitionGrouper} for more information. * - * @param topicGroups The map from the {@link TopologyBuilder#topicGroups() topic group} id to topics + * @param topicGroups The map from the {@link TopologyBuilder#topicGroups(String)} topic group} id to topics * @param metadata Metadata of the consuming cluster * @return a map of task ids to groups of partitions */ Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata); -} + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java index 79376ba..e9d5252 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java @@ -29,11 +29,11 @@ import java.io.File; public interface ProcessorContext { /** - * Returns the job id + * Returns the application id * - * @return the job id + * @return the application id */ - String jobId(); + String applicationId(); /** * Returns the task id http://git-wip-us.apache.org/repos/asf/kafka/blob/958e10c8/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java index 6e5aec5..ab7122b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java @@ -85,7 +85,7 @@ public class TopologyBuilder { this.name = name; } - public abstract ProcessorNode build(String jobId); + public abstract ProcessorNode build(String applicationId); } private static class ProcessorNodeFactory extends NodeFactory { @@ -105,7 +105,7 @@ public class TopologyBuilder { @SuppressWarnings("unchecked") @Override - public ProcessorNode build(String jobId) { + public ProcessorNode build(String applicationId) { return new ProcessorNode(name, supplier.get(), stateStoreNames); } } @@ -124,7 +124,7 @@ public class TopologyBuilder { @SuppressWarnings("unchecked") @Override - public ProcessorNode build(String jobId) { + public ProcessorNode build(String applicationId) { return new SourceNode(name, keyDeserializer, valDeserializer); } } @@ -147,10 +147,10 @@ public class TopologyBuilder { @SuppressWarnings("unchecked") @Override - public ProcessorNode build(String jobId) { + public ProcessorNode build(String applicationId) { if (internalTopicNames.contains(topic)) { - // prefix the job id to the internal topic name - return new SinkNode(name, jobId + "-" + topic, keySerializer, valSerializer, partitioner); + // prefix the internal topic name with the application id + return new SinkNode(name, applicationId + "-" 
+ topic, keySerializer, valSerializer, partitioner); } else { return new SinkNode(name, topic, keySerializer, valSerializer, partitioner); } @@ -496,7 +496,7 @@ public class TopologyBuilder { * * @return groups of topic names */ - public Map<Integer, TopicsInfo> topicGroups(String jobId) { + public Map<Integer, TopicsInfo> topicGroups(String applicationId) { Map<Integer, TopicsInfo> topicGroups = new HashMap<>(); if (nodeGroups == null) @@ -514,8 +514,8 @@ public class TopologyBuilder { // if some of the topics are internal, add them to the internal topics for (String topic : topics) { if (this.internalTopicNames.contains(topic)) { - // prefix the job id to the internal topic name - String internalTopic = jobId + "-" + topic; + // prefix the internal topic name with the application id + String internalTopic = applicationId + "-" + topic; internalSourceTopics.add(internalTopic); sourceTopics.add(internalTopic); } else { @@ -528,8 +528,8 @@ public class TopologyBuilder { String topic = nodeToSinkTopic.get(node); if (topic != null) { if (internalTopicNames.contains(topic)) { - // prefix the job id to the change log topic name - sinkTopics.add(jobId + "-" + topic); + // prefix the change log topic name with the application id + sinkTopics.add(applicationId + "-" + topic); } else { sinkTopics.add(topic); } @@ -538,8 +538,8 @@ public class TopologyBuilder { // if the node is connected to a state, add to the state topics for (StateStoreFactory stateFactory : stateFactories.values()) { if (stateFactory.isInternal && stateFactory.users.contains(node)) { - // prefix the job id to the change log topic name - stateChangelogTopics.add(jobId + "-" + stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX); + // prefix the change log topic name with the application id + stateChangelogTopics.add(applicationId + "-" + stateFactory.supplier.name() + ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX); } } } @@ -637,7 +637,7 @@ public class TopologyBuilder { * * @see org.apache.kafka.streams.KafkaStreams#KafkaStreams(TopologyBuilder, org.apache.kafka.streams.StreamsConfig) */ - public ProcessorTopology build(String jobId, Integer topicGroupId) { + public ProcessorTopology build(String applicationId, Integer topicGroupId) { Set<String> nodeGroup; if (topicGroupId != null) { nodeGroup = nodeGroups().get(topicGroupId); @@ -645,11 +645,11 @@ public class TopologyBuilder { // when nodeGroup is null, we build the full topology. this is used in some tests. 
nodeGroup = null; } - return build(jobId, nodeGroup); + return build(applicationId, nodeGroup); } @SuppressWarnings("unchecked") - private ProcessorTopology build(String jobId, Set<String> nodeGroup) { + private ProcessorTopology build(String applicationId, Set<String> nodeGroup) { List<ProcessorNode> processorNodes = new ArrayList<>(nodeFactories.size()); Map<String, ProcessorNode> processorMap = new HashMap<>(); Map<String, SourceNode> topicSourceMap = new HashMap<>(); @@ -658,7 +658,7 @@ public class TopologyBuilder { // create processor nodes in a topological order ("nodeFactories" is already topologically sorted) for (NodeFactory factory : nodeFactories.values()) { if (nodeGroup == null || nodeGroup.contains(factory.name)) { - ProcessorNode node = factory.build(jobId); + ProcessorNode node = factory.build(applicationId); processorNodes.add(node); processorMap.put(node.name(), node); @@ -674,8 +674,8 @@ public class TopologyBuilder { } else if (factory instanceof SourceNodeFactory) { for (String topic : ((SourceNodeFactory) factory).topics) { if (internalTopicNames.contains(topic)) { - // prefix the job id to the internal topic name - topicSourceMap.put(jobId + "-" + topic, (SourceNode) node); + // prefix the internal topic name with the application id + topicSourceMap.put(applicationId + "-" + topic, (SourceNode) node); } else { topicSourceMap.put(topic, (SourceNode) node); } @@ -697,11 +697,11 @@ public class TopologyBuilder { * Get the names of topics that are to be consumed by the source nodes created by this builder. * @return the unmodifiable set of topic names used by source nodes, which changes as new sources are added; never null */ - public Set<String> sourceTopics(String jobId) { + public Set<String> sourceTopics(String applicationId) { Set<String> topics = new HashSet<>(); for (String topic : sourceTopicNames) { if (internalTopicNames.contains(topic)) { - topics.add(jobId + "-" + topic); + topics.add(applicationId + "-" + topic); } else { topics.add(topic); }
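
From the user's side, the StreamsConfig and KafkaStreams hunks above boil down to a single configuration change: the required identifier moves from job.id (the removed JOB_ID_CONFIG) to application.id (APPLICATION_ID_CONFIG), and it keeps serving as the default client-id prefix, the consumer group id for membership management, and the changelog topic prefix. A minimal before/after sketch, assuming an otherwise unchanged setup (the class name is illustrative only, not code from this commit):

import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class ConfigMigrationSketch {
    public static void main(String[] args) {
        Properties props = new Properties();

        // Before this commit (the constant no longer exists, shown here only as a literal):
        // props.put("job.id", "streams-wordcount-processor");

        // After this commit: application.id is the required identifier; it is used as the
        // default client-id prefix, the consumer group id, and the changelog topic prefix.
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount-processor");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        System.out.println(props);
    }
}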
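
The PartitionGrouper javadoc above describes the contract (topic-group ids map to topics, and each generated partition group gets a distinct TaskId), but the diff does not show an implementation. A simplified sketch against the signature shown, grouping the same partition index of every topic in a topic group into one task, loosely in the spirit of DefaultPartitionGrouper; this is an illustration only, not code from this commit:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.PartitionGrouper;
import org.apache.kafka.streams.processor.TaskId;

public class OneTaskPerPartitionGrouper implements PartitionGrouper {

    @Override
    public Map<TaskId, Set<TopicPartition>> partitionGroups(Map<Integer, Set<String>> topicGroups, Cluster metadata) {
        Map<TaskId, Set<TopicPartition>> groups = new HashMap<>();
        for (Map.Entry<Integer, Set<String>> entry : topicGroups.entrySet()) {
            int topicGroupId = entry.getKey();
            for (String topic : entry.getValue()) {
                Integer partitions = metadata.partitionCountForTopic(topic);
                if (partitions == null)
                    continue; // metadata for this topic is not available yet
                for (int p = 0; p < partitions; p++) {
                    // partition p of every topic in this topic group lands in task (topicGroupId, p)
                    TaskId taskId = new TaskId(topicGroupId, p);
                    groups.computeIfAbsent(taskId, id -> new HashSet<>()).add(new TopicPartition(topic, p));
                }
            }
        }
        return groups;
    }
}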
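
The TopologyBuilder hunks all apply the same rule: internal topics (repartition topics and state changelog topics) now get the application id, rather than the old job id, prepended to their physical names, while user-supplied topics are left untouched. A small self-contained sketch of that naming convention (hypothetical helper and class names; the changelog suffix is assumed here to be "-changelog"):

public class InternalTopicNamingSketch {

    // Mirrors the "applicationId + '-' + topic" convention used in TopologyBuilder.build() and topicGroups().
    static String decorate(String applicationId, String topic, boolean internal) {
        // user topics keep their names; internal topics are namespaced per application
        return internal ? applicationId + "-" + topic : topic;
    }

    // Assumption: ProcessorStateManager.STATE_CHANGELOG_TOPIC_SUFFIX resolves to "-changelog".
    static String changelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        String appId = "streams-wordcount-processor"; // the application id used in the demo above
        System.out.println(decorate(appId, "streams-file-input", false));  // streams-file-input
        System.out.println(decorate(appId, "repartition-by-word", true));  // streams-wordcount-processor-repartition-by-word
        System.out.println(changelogTopic(appId, "Counts"));               // streams-wordcount-processor-Counts-changelog
    }
}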
