Repository: kafka-site Updated Branches: refs/heads/asf-site 2e200cfce -> 6f8013869
MINOR: update streams hello world Project: http://git-wip-us.apache.org/repos/asf/kafka-site/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka-site/commit/6f801386 Tree: http://git-wip-us.apache.org/repos/asf/kafka-site/tree/6f801386 Diff: http://git-wip-us.apache.org/repos/asf/kafka-site/diff/6f801386 Branch: refs/heads/asf-site Commit: 6f80138695c78355195b683cae06f8a2f116a307 Parents: 2e200cf Author: Guozhang Wang <[email protected]> Authored: Wed Nov 1 16:03:52 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed Nov 1 16:03:52 2017 -0700 ---------------------------------------------------------------------- 10/streams/index.html | 85 +++++++++++++++++++++++++--------------------- 1 file changed, 46 insertions(+), 39 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka-site/blob/6f801386/10/streams/index.html ---------------------------------------------------------------------- diff --git a/10/streams/index.html b/10/streams/index.html index ece9fa7..bf86adc 100644 --- a/10/streams/index.html +++ b/10/streams/index.html @@ -152,35 +152,37 @@ <pre class="brush: java;"> import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; + import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; - import org.apache.kafka.streams.kstream.KStream; - import org.apache.kafka.streams.kstream.KStreamBuilder; - import org.apache.kafka.streams.kstream.KTable; - + import org.apache.kafka.streams.Topology; + import org.apache.kafka.streams.kstream.Materialized; + import org.apache.kafka.streams.kstream.Produced; + import org.apache.kafka.streams.state.KeyValueStore; + import java.util.Arrays; import java.util.Properties; - + public class WordCountApplication { - + public static void main(final String[] args) throws Exception { Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application"); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - - KStreamBuilder builder = new KStreamBuilder(); + + StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> textLines = builder.stream("TextLinesTopic"); KTable<String, Long> wordCounts = textLines .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+"))) .groupBy((key, word) -> word) - .count("Counts"); - wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic"); - - KafkaStreams streams = new KafkaStreams(builder, config); + .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")); + wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long())); + + KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.start(); } - + } </pre> </div> @@ -189,26 +191,28 @@ <pre class="brush: java;"> import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.KafkaStreams; + import org.apache.kafka.streams.StreamsBuilder; import org.apache.kafka.streams.StreamsConfig; - import org.apache.kafka.streams.kstream.KStream; - import org.apache.kafka.streams.kstream.KStreamBuilder; - import org.apache.kafka.streams.kstream.KTable; + import org.apache.kafka.streams.Topology; import org.apache.kafka.streams.kstream.KeyValueMapper; + import org.apache.kafka.streams.kstream.Materialized; + import org.apache.kafka.streams.kstream.Produced; import org.apache.kafka.streams.kstream.ValueMapper; - + import org.apache.kafka.streams.state.KeyValueStore; + import java.util.Arrays; import java.util.Properties; - + public class WordCountApplication { - + public static void main(final String[] args) throws Exception { Properties config = new Properties(); config.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application"); config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092"); config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass()); - - KStreamBuilder builder = new KStreamBuilder(); + + StreamsBuilder builder = new StreamsBuilder(); KStream<String, String> textLines = builder.stream("TextLinesTopic"); KTable<String, Long> wordCounts = textLines .flatMapValues(new ValueMapper<String, Iterable<String>>() { @@ -223,13 +227,15 @@ return word; } }) - .count("Counts"); - wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic"); - - KafkaStreams streams = new KafkaStreams(builder, config); + .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store")); + + + wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long())); + + KafkaStreams streams = new KafkaStreams(builder.build(), config); streams.start(); } - + } </pre> </div> @@ -239,15 +245,16 @@ import java.lang.Long import java.util.Properties import java.util.concurrent.TimeUnit - + import org.apache.kafka.common.serialization._ import org.apache.kafka.streams._ - import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, KTable} - + import org.apache.kafka.streams.kstream.{KeyValueMapper, Materialized, Produced, ValueMapper} + import org.apache.kafka.streams.state.KeyValueStore; + import scala.collection.JavaConverters.asJavaIterableConverter - + object WordCountApplication { - + def main(args: Array[String]) { val config: Properties = { val p = new Properties() @@ -257,23 +264,23 @@ p.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass) p } - - val builder: KStreamBuilder = new KStreamBuilder() + + val builder: StreamsBuilder = new StreamsBuilder() val textLines: KStream[String, String] = builder.stream("TextLinesTopic") val wordCounts: KTable[String, Long] = textLines .flatMapValues(textLine => textLine.toLowerCase.split("\\W+").toIterable.asJava) .groupBy((_, word) => word) - .count("Counts") - wordCounts.to(Serdes.String(), Serdes.Long(), "WordsWithCountsTopic") - - val streams: KafkaStreams = new KafkaStreams(builder, config) + .count(Materialized.as("counts-store").asInstanceOf[Materialized[String, Long, KeyValueStore[Bytes, Array[Byte]]]]) + wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long())) + + val streams: KafkaStreams = new KafkaStreams(builder.build(), config) streams.start() - + Runtime.getRuntime.addShutdownHook(new Thread(() => { streams.close(10, TimeUnit.SECONDS) })) } - + } </pre> </div>
