Repository: spark Updated Branches: refs/heads/master 754929b15 -> a2016b4bc
[SPARK-8444] [STREAMING] Adding Python streaming example for queueStream A Python example similar to the existing one for Scala. Author: Bryan Cutler <bjcut...@us.ibm.com> Closes #6884 from BryanCutler/streaming-queueStream-example-8444 and squashes the following commits: 435ba7e [Bryan Cutler] [SPARK-8444] Fixed style checks, increased sleep time to show empty queue 257abb0 [Bryan Cutler] [SPARK-8444] Stop context gracefully, Removed unused import, Added description comment 376ef6e [Bryan Cutler] [SPARK-8444] Fixed bug causing DStream.pprint to append empty parentheses to output instead of blank line 1ff5f8b [Bryan Cutler] [SPARK-8444] Adding Python streaming example for queue_stream Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a2016b4b Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a2016b4b Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a2016b4b Branch: refs/heads/master Commit: a2016b4bc4ef13339f168c3f4e135fa422046137 Parents: 754929b Author: Bryan Cutler <bjcut...@us.ibm.com> Authored: Fri Jun 19 00:07:53 2015 -0700 Committer: Davies Liu <dav...@databricks.com> Committed: Fri Jun 19 00:07:53 2015 -0700 ---------------------------------------------------------------------- .../src/main/python/streaming/queue_stream.py | 50 ++++++++++++++++++++ python/pyspark/streaming/dstream.py | 2 +- 2 files changed, 51 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a2016b4b/examples/src/main/python/streaming/queue_stream.py ---------------------------------------------------------------------- diff --git a/examples/src/main/python/streaming/queue_stream.py b/examples/src/main/python/streaming/queue_stream.py new file mode 100644 index 0000000..dcd6a0f --- /dev/null +++ b/examples/src/main/python/streaming/queue_stream.py @@ -0,0 +1,50 @@ +# +# Licensed to the Apache Software 
Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +""" + Create a queue of RDDs that will be mapped/reduced one at a time in + 1 second intervals. + + To run this example use + `$ bin/spark-submit examples/src/main/python/streaming/queue_stream.py` +""" +import sys +import time + +from pyspark import SparkContext +from pyspark.streaming import StreamingContext + +if __name__ == "__main__": + + sc = SparkContext(appName="PythonStreamingQueueStream") + ssc = StreamingContext(sc, 1) + + # Create the queue through which RDDs can be pushed to + # a QueueInputDStream + rddQueue = [] + for i in xrange(5): + rddQueue += [ssc.sparkContext.parallelize([j for j in xrange(1, 1001)], 10)] + + # Create the QueueInputDStream and use it to do some processing + inputStream = ssc.queueStream(rddQueue) + mappedStream = inputStream.map(lambda x: (x % 10, 1)) + reducedStream = mappedStream.reduceByKey(lambda a, b: a + b) + reducedStream.pprint() + + ssc.start() + time.sleep(6) + ssc.stop(stopSparkContext=True, stopGraceFully=True) http://git-wip-us.apache.org/repos/asf/spark/blob/a2016b4b/python/pyspark/streaming/dstream.py ---------------------------------------------------------------------- diff --git a/python/pyspark/streaming/dstream.py b/python/pyspark/streaming/dstream.py index 
ff09798..8dcb964 100644 --- a/python/pyspark/streaming/dstream.py +++ b/python/pyspark/streaming/dstream.py @@ -176,7 +176,7 @@ class DStream(object): print(record) if len(taken) > num: print("...") - print() + print("") self.foreachRDD(takeAndPrint) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org