Repository: spark Updated Branches: refs/heads/branch-2.1 a94659cee -> 6b2301b89
[SPARK-18410][STREAMING] Add structured kafka example ## What changes were proposed in this pull request? This PR provides structured kafka wordcount examples ## How was this patch tested? Author: uncleGen <husty...@gmail.com> Closes #15849 from uncleGen/SPARK-18410. (cherry picked from commit e6145772eda8d6d3727605e80a7c2f182c801003) Signed-off-by: Sean Owen <so...@cloudera.com> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/6b2301b8 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/6b2301b8 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/6b2301b8 Branch: refs/heads/branch-2.1 Commit: 6b2301b89bf5a89bd2b8a3d85c9c05a490be2ddb Parents: a94659c Author: uncleGen <husty...@gmail.com> Authored: Wed Nov 16 10:19:10 2016 +0000 Committer: Sean Owen <so...@cloudera.com> Committed: Wed Nov 16 10:19:18 2016 +0000 ---------------------------------------------------------------------- .../streaming/JavaStructuredKafkaWordCount.java | 96 ++++++++++++++++++++ .../sql/streaming/structured_kafka_wordcount.py | 90 ++++++++++++++++++ .../streaming/StructuredKafkaWordCount.scala | 85 +++++++++++++++++ 3 files changed, 271 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/6b2301b8/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java ---------------------------------------------------------------------- diff --git a/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java new file mode 100644 index 0000000..0f45cfe --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/sql/streaming/JavaStructuredKafkaWordCount.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor 
license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.examples.sql.streaming; + +import org.apache.spark.api.java.function.FlatMapFunction; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Encoders; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.streaming.StreamingQuery; + +import java.util.Arrays; +import java.util.Iterator; + +/** + * Consumes messages from one or more topics in Kafka and does wordcount. + * Usage: JavaStructuredKafkaWordCount <bootstrap-servers> <subscribe-type> <topics> + * <bootstrap-servers> The Kafka "bootstrap.servers" configuration. A + * comma-separated list of host:port. + * <subscribe-type> There are three kinds of type, i.e. 'assign', 'subscribe', + * 'subscribePattern'. + * |- <assign> Specific TopicPartitions to consume. JSON string + * | {"topicA":[0,1],"topicB":[2,4]}. + * |- <subscribe> The topic list to subscribe to. A comma-separated list of + * | topics. + * |- <subscribePattern> The pattern used to subscribe to topic(s). + * | Java regex string. + * |- Only one of "assign", "subscribe" or "subscribePattern" options can be + * | specified for Kafka source. + * <topics> The value format depends on the chosen 'subscribe-type'. 
+ * + * Example: + * `$ bin/run-example \ + * sql.streaming.JavaStructuredKafkaWordCount host1:port1,host2:port2 \ + * subscribe topic1,topic2` + */ +public final class JavaStructuredKafkaWordCount { + + public static void main(String[] args) throws Exception { + if (args.length < 3) { + System.err.println("Usage: JavaStructuredKafkaWordCount <bootstrap-servers> " + + "<subscribe-type> <topics>"); + System.exit(1); + } + + String bootstrapServers = args[0]; + String subscribeType = args[1]; + String topics = args[2]; + + SparkSession spark = SparkSession + .builder() + .appName("JavaStructuredKafkaWordCount") + .getOrCreate(); + + // Create DataSet representing the stream of input lines from kafka + Dataset<String> lines = spark + .readStream() + .format("kafka") + .option("kafka.bootstrap.servers", bootstrapServers) + .option(subscribeType, topics) + .load() + .selectExpr("CAST(value AS STRING)") + .as(Encoders.STRING()); + + // Generate running word count + Dataset<Row> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() { + @Override + public Iterator<String> call(String x) { + return Arrays.asList(x.split(" ")).iterator(); + } + }, Encoders.STRING()).groupBy("value").count(); + + // Start running the query that prints the running counts to the console + StreamingQuery query = wordCounts.writeStream() + .outputMode("complete") + .format("console") + .start(); + + query.awaitTermination(); + } +} http://git-wip-us.apache.org/repos/asf/spark/blob/6b2301b8/examples/src/main/python/sql/streaming/structured_kafka_wordcount.py ---------------------------------------------------------------------- diff --git a/examples/src/main/python/sql/streaming/structured_kafka_wordcount.py b/examples/src/main/python/sql/streaming/structured_kafka_wordcount.py new file mode 100644 index 0000000..9e8a552 --- /dev/null +++ b/examples/src/main/python/sql/streaming/structured_kafka_wordcount.py @@ -0,0 +1,90 @@ +# +# Licensed to the Apache Software Foundation (ASF) under 
one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" + Consumes messages from one or more topics in Kafka and does wordcount. + Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics> + <bootstrap-servers> The Kafka "bootstrap.servers" configuration. A + comma-separated list of host:port. + <subscribe-type> There are three kinds of type, i.e. 'assign', 'subscribe', + 'subscribePattern'. + |- <assign> Specific TopicPartitions to consume. JSON string + | {"topicA":[0,1],"topicB":[2,4]}. + |- <subscribe> The topic list to subscribe to. A comma-separated list of + | topics. + |- <subscribePattern> The pattern used to subscribe to topic(s). + | Java regex string. + |- Only one of "assign", "subscribe" or "subscribePattern" options can be + | specified for Kafka source. + <topics> The value format depends on the chosen 'subscribe-type'. 
+ + Run the example + `$ bin/spark-submit examples/src/main/python/sql/streaming/structured_kafka_wordcount.py \ + host1:port1,host2:port2 subscribe topic1,topic2` +""" +from __future__ import print_function + +import sys + +from pyspark.sql import SparkSession +from pyspark.sql.functions import explode +from pyspark.sql.functions import split + +if __name__ == "__main__": + if len(sys.argv) != 4: + print(""" + Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics> + """, file=sys.stderr) + exit(-1) + + bootstrapServers = sys.argv[1] + subscribeType = sys.argv[2] + topics = sys.argv[3] + + spark = SparkSession\ + .builder\ + .appName("StructuredKafkaWordCount")\ + .getOrCreate() + + # Create DataSet representing the stream of input lines from kafka + lines = spark\ + .readStream\ + .format("kafka")\ + .option("kafka.bootstrap.servers", bootstrapServers)\ + .option(subscribeType, topics)\ + .load()\ + .selectExpr("CAST(value AS STRING)") + + # Split the lines into words + words = lines.select( + # explode turns each item in an array into a separate row + explode( + split(lines.value, ' ') + ).alias('word') + ) + + # Generate running word count + wordCounts = words.groupBy('word').count() + + # Start running the query that prints the running counts to the console + query = wordCounts\ + .writeStream\ + .outputMode('complete')\ + .format('console')\ + .start() + + query.awaitTermination() http://git-wip-us.apache.org/repos/asf/spark/blob/6b2301b8/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala new file mode 100644 index 0000000..c26f73e --- /dev/null +++ 
b/examples/src/main/scala/org/apache/spark/examples/sql/streaming/StructuredKafkaWordCount.scala @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +// scalastyle:off println +package org.apache.spark.examples.sql.streaming + +import org.apache.spark.sql.SparkSession + +/** + * Consumes messages from one or more topics in Kafka and does wordcount. + * Usage: StructuredKafkaWordCount <bootstrap-servers> <subscribe-type> <topics> + * <bootstrap-servers> The Kafka "bootstrap.servers" configuration. A + * comma-separated list of host:port. + * <subscribe-type> There are three kinds of type, i.e. 'assign', 'subscribe', + * 'subscribePattern'. + * |- <assign> Specific TopicPartitions to consume. JSON string + * | {"topicA":[0,1],"topicB":[2,4]}. + * |- <subscribe> The topic list to subscribe to. A comma-separated list of + * | topics. + * |- <subscribePattern> The pattern used to subscribe to topic(s). + * | Java regex string. + * |- Only one of "assign", "subscribe" or "subscribePattern" options can be + * | specified for Kafka source. + * <topics> The value format depends on the chosen 'subscribe-type'. 
+ *
+ * Example:
+ *    `$ bin/run-example \
+ *      sql.streaming.StructuredKafkaWordCount host1:port1,host2:port2 \
+ *      subscribe topic1,topic2`
+ */
+object StructuredKafkaWordCount {
+  def main(args: Array[String]): Unit = {
+    // All three positional arguments are mandatory; print usage and quit otherwise.
+    if (args.length < 3) {
+      System.err.println("Usage: StructuredKafkaWordCount <bootstrap-servers> " +
+        "<subscribe-type> <topics>")
+      System.exit(1)
+    }
+
+    // The trailing `_*` accepts (and ignores) extra arguments. An exact three-element
+    // pattern would throw scala.MatchError whenever more than three arguments are passed,
+    // because the guard above only rejects fewer than three.
+    val Array(bootstrapServers, subscribeType, topics, _*) = args
+
+    val spark = SparkSession
+      .builder
+      .appName("StructuredKafkaWordCount")
+      .getOrCreate()
+
+    // Brings the implicit encoders needed by `.as[String]` and `flatMap` into scope.
+    import spark.implicits._
+
+    // Create DataSet representing the stream of input lines from kafka.
+    // <subscribe-type> is passed as the option key and <topics> as its value.
+    val lines = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", bootstrapServers)
+      .option(subscribeType, topics)
+      .load()
+      .selectExpr("CAST(value AS STRING)")
+      .as[String]
+
+    // Generate running word count
+    val wordCounts = lines.flatMap(_.split(" ")).groupBy("value").count()
+
+    // Start running the query that prints the running counts to the console
+    val query = wordCounts.writeStream
+      .outputMode("complete")
+      .format("console")
+      .start()
+
+    // Wait for the streaming query to terminate
+    query.awaitTermination()
+  }
+
+}
+// scalastyle:on println
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org