[ https://issues.apache.org/jira/browse/SPARK-19903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Armbrust updated SPARK-19903: ------------------------------------- Description: PySpark example reads a Kafka stream. There is watermarking set when handling the data window. The defined query uses output Append mode. The PySpark engine reports the error: 'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets' The Python example: ------------------------------------------------------------------------------- {code} import sys from pyspark.sql import SparkSession from pyspark.sql.functions import explode, split, window if __name__ == "__main__": if len(sys.argv) != 4: print(""" Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics> """, file=sys.stderr) exit(-1) bootstrapServers = sys.argv[1] subscribeType = sys.argv[2] topics = sys.argv[3] spark = SparkSession\ .builder\ .appName("StructuredKafkaWordCount")\ .getOrCreate() # Create DataSet representing the stream of input lines from kafka lines = spark\ .readStream\ .format("kafka")\ .option("kafka.bootstrap.servers", bootstrapServers)\ .option(subscribeType, topics)\ .load()\ .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)") # Split the lines into words, retaining timestamps # split() splits each line into an array, and explode() turns the array into multiple rows words = lines.select( explode(split(lines.value, ' ')).alias('word'), lines.timestamp ) # Group the data by window and word and compute the count of each group windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy( window(words.timestamp, "30 seconds", "30 seconds"), words.word ).count() # Start running the query that prints the running counts to the console query = windowedCounts\ .writeStream\ .outputMode('append')\ .format('console')\ .option("truncate", "false")\ .start() query.awaitTermination() {code} The corresponding example in Zeppelin notebook: {code} %spark.pyspark from pyspark.sql.functions import explode, split, window 
# Create DataSet representing the stream of input lines from kafka lines = spark\ .readStream\ .format("kafka")\ .option("kafka.bootstrap.servers", "localhost:9092")\ .option("subscribe", "words")\ .load()\ .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)") # Split the lines into words, retaining timestamps # split() splits each line into an array, and explode() turns the array into multiple rows words = lines.select( explode(split(lines.value, ' ')).alias('word'), lines.timestamp ) # Group the data by window and word and compute the count of each group windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy( window(words.timestamp, "30 seconds", "30 seconds"), words.word ).count() # Start running the query that prints the running counts to the console query = windowedCounts\ .writeStream\ .outputMode('append')\ .format('console')\ .option("truncate", "false")\ .start() query.awaitTermination() -------------------------------------------------------------------------------------- Note that the Scala version of the same example in Zeppelin notebook works fine: ---------------------------------------------------------------------------------------- import java.sql.Timestamp import org.apache.spark.sql.streaming.ProcessingTime import org.apache.spark.sql.functions._ // Create DataSet representing the stream of input lines from kafka val lines = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("subscribe", "words") .load() // Split the lines into words, retaining timestamps val words = lines .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)") .as[(String, Timestamp)] .flatMap(line => line._1.split(" ").map(word => (word, line._2))) .toDF("word", "timestamp") // Group the data by window and word and compute the count of each group val windowedCounts = words .withWatermark("timestamp", "30 seconds") .groupBy(window($"timestamp", "30 seconds", "30 seconds"), $"word") .count() 
// Start running the query that prints the windowed word counts to the console val query = windowedCounts.writeStream .outputMode("append") .format("console") .trigger(ProcessingTime("35 seconds")) .option("truncate", "false") .start() query.awaitTermination() {code} was: PySpark example reads a Kafka stream. There is watermarking set when handling the data window. The defined query uses output Append mode. The PySpark engine reports the error: 'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets' The Python example: ------------------------------------------------------------------------------- import sys from pyspark.sql import SparkSession from pyspark.sql.functions import explode, split, window if __name__ == "__main__": if len(sys.argv) != 4: print(""" Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics> """, file=sys.stderr) exit(-1) bootstrapServers = sys.argv[1] subscribeType = sys.argv[2] topics = sys.argv[3] spark = SparkSession\ .builder\ .appName("StructuredKafkaWordCount")\ .getOrCreate() # Create DataSet representing the stream of input lines from kafka lines = spark\ .readStream\ .format("kafka")\ .option("kafka.bootstrap.servers", bootstrapServers)\ .option(subscribeType, topics)\ .load()\ .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)") # Split the lines into words, retaining timestamps # split() splits each line into an array, and explode() turns the array into multiple rows words = lines.select( explode(split(lines.value, ' ')).alias('word'), lines.timestamp ) # Group the data by window and word and compute the count of each group windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy( window(words.timestamp, "30 seconds", "30 seconds"), words.word ).count() # Start running the query that prints the running counts to the console query = windowedCounts\ .writeStream\ .outputMode('append')\ .format('console')\ .option("truncate", 
"false")\ .start() query.awaitTermination() --------------------------------------------------------------------- The corresponding example in Zeppelin notebook: --------------------------------------------------------------- %spark.pyspark from pyspark.sql.functions import explode, split, window # Create DataSet representing the stream of input lines from kafka lines = spark\ .readStream\ .format("kafka")\ .option("kafka.bootstrap.servers", "localhost:9092")\ .option("subscribe", "words")\ .load()\ .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)") # Split the lines into words, retaining timestamps # split() splits each line into an array, and explode() turns the array into multiple rows words = lines.select( explode(split(lines.value, ' ')).alias('word'), lines.timestamp ) # Group the data by window and word and compute the count of each group windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy( window(words.timestamp, "30 seconds", "30 seconds"), words.word ).count() # Start running the query that prints the running counts to the console query = windowedCounts\ .writeStream\ .outputMode('append')\ .format('console')\ .option("truncate", "false")\ .start() query.awaitTermination() -------------------------------------------------------------------------------------- Note that the Scala version of the same example in Zeppelin notebook works fine: ---------------------------------------------------------------------------------------- import java.sql.Timestamp import org.apache.spark.sql.streaming.ProcessingTime import org.apache.spark.sql.functions._ // Create DataSet representing the stream of input lines from kafka val lines = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", "localhost:9092") .option("subscribe", "words") .load() // Split the lines into words, retaining timestamps val words = lines .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)") .as[(String, Timestamp)] .flatMap(line 
=> line._1.split(" ").map(word => (word, line._2))) .toDF("word", "timestamp") // Group the data by window and word and compute the count of each group val windowedCounts = words .withWatermark("timestamp", "30 seconds") .groupBy(window($"timestamp", "30 seconds", "30 seconds"), $"word") .count() // Start running the query that prints the windowed word counts to the console val query = windowedCounts.writeStream .outputMode("append") .format("console") .trigger(ProcessingTime("35 seconds")) .option("truncate", "false") .start() query.awaitTermination() ----------------------------------------------------------------------------------------- > PySpark Kafka streaming query output append mode not possible > ------------------------------------------------------------ > > Key: SPARK-19903 > URL: https://issues.apache.org/jira/browse/SPARK-19903 > Project: Spark > Issue Type: Bug > Components: PySpark, Structured Streaming > Affects Versions: 2.1.0 > Environment: Ubuntu Linux > Reporter: Piotr Nestorow > > PySpark example reads a Kafka stream. There is watermarking set when handling > the data window. The defined query uses output Append mode. 
> The PySpark engine reports the error: > 'Append output mode not supported when there are streaming aggregations on > streaming DataFrames/DataSets' > The Python example: > ------------------------------------------------------------------------------- > {code} > import sys > from pyspark.sql import SparkSession > from pyspark.sql.functions import explode, split, window > if __name__ == "__main__": > if len(sys.argv) != 4: > print(""" > Usage: structured_kafka_wordcount.py <bootstrap-servers> > <subscribe-type> <topics> > """, file=sys.stderr) > exit(-1) > bootstrapServers = sys.argv[1] > subscribeType = sys.argv[2] > topics = sys.argv[3] > spark = SparkSession\ > .builder\ > .appName("StructuredKafkaWordCount")\ > .getOrCreate() > # Create DataSet representing the stream of input lines from kafka > lines = spark\ > .readStream\ > .format("kafka")\ > .option("kafka.bootstrap.servers", bootstrapServers)\ > .option(subscribeType, topics)\ > .load()\ > .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)") > # Split the lines into words, retaining timestamps > # split() splits each line into an array, and explode() turns the array > into multiple rows > words = lines.select( > explode(split(lines.value, ' ')).alias('word'), > lines.timestamp > ) > # Group the data by window and word and compute the count of each group > windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy( > window(words.timestamp, "30 seconds", "30 seconds"), words.word > ).count() > # Start running the query that prints the running counts to the console > query = windowedCounts\ > .writeStream\ > .outputMode('append')\ > .format('console')\ > .option("truncate", "false")\ > .start() > query.awaitTermination() > {code} > The corresponding example in Zeppelin notebook: > {code} > %spark.pyspark > from pyspark.sql.functions import explode, split, window > # Create DataSet representing the stream of input lines from kafka > lines = spark\ > .readStream\ > 
.format("kafka")\ > .option("kafka.bootstrap.servers", "localhost:9092")\ > .option("subscribe", "words")\ > .load()\ > .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)") > # Split the lines into words, retaining timestamps > # split() splits each line into an array, and explode() turns the array into > multiple rows > words = lines.select( > explode(split(lines.value, ' ')).alias('word'), > lines.timestamp > ) > # Group the data by window and word and compute the count of each group > windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy( > window(words.timestamp, "30 seconds", "30 seconds"), words.word > ).count() > # Start running the query that prints the running counts to the console > query = windowedCounts\ > .writeStream\ > .outputMode('append')\ > .format('console')\ > .option("truncate", "false")\ > .start() > query.awaitTermination() > -------------------------------------------------------------------------------------- > Note that the Scala version of the same example in Zeppelin notebook works > fine: > ---------------------------------------------------------------------------------------- > import java.sql.Timestamp > import org.apache.spark.sql.streaming.ProcessingTime > import org.apache.spark.sql.functions._ > // Create DataSet representing the stream of input lines from kafka > val lines = spark > .readStream > .format("kafka") > .option("kafka.bootstrap.servers", "localhost:9092") > .option("subscribe", "words") > .load() > // Split the lines into words, retaining timestamps > val words = lines > .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS > TIMESTAMP)") > .as[(String, Timestamp)] > .flatMap(line => line._1.split(" ").map(word => (word, line._2))) > .toDF("word", "timestamp") > // Group the data by window and word and compute the count of each group > val windowedCounts = words > .withWatermark("timestamp", "30 seconds") > .groupBy(window($"timestamp", "30 seconds", "30 seconds"), > $"word") > 
.count() > // Start running the query that prints the windowed word counts to the console > val query = windowedCounts.writeStream > .outputMode("append") > .format("console") > .trigger(ProcessingTime("35 seconds")) > .option("truncate", "false") > .start() > query.awaitTermination() > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org