Piotr Nestorow created SPARK-19903:
--------------------------------------

Summary: PySpark Kafka streaming query output append mode not possible
Key: SPARK-19903
URL: https://issues.apache.org/jira/browse/SPARK-19903
Project: Spark
Issue Type: Bug
Components: PySpark, Structured Streaming
Affects Versions: 2.1.0
Environment: Ubuntu Linux
Reporter: Piotr Nestorow
The PySpark example below reads a Kafka stream, sets a watermark on the event-time column before the windowed aggregation, and runs the query in Append output mode. The PySpark engine nevertheless reports the error:

'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets'

The Python example:
-------------------------------------------------------------------------------
from __future__ import print_function

import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, window

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("""
        Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
        """, file=sys.stderr)
        sys.exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]

    spark = SparkSession\
        .builder\
        .appName("StructuredKafkaWordCount")\
        .getOrCreate()

    # Create DataSet representing the stream of input lines from kafka
    lines = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option(subscribeType, topics)\
        .load()\
        .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")

    # Split the lines into words, retaining timestamps
    # split() splits each line into an array, and explode() turns the array into multiple rows
    words = lines.select(
        explode(split(lines.value, ' ')).alias('word'),
        lines.timestamp
    )

    # Group the data by window and word and compute the count of each group
    windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
        window(words.timestamp, "30 seconds", "30 seconds"),
        words.word
    ).count()

    # Start running the query that prints the running counts to the console
    query = windowedCounts\
        .writeStream\
        .outputMode('append')\
        .format('console')\
        .option("truncate", "false")\
        .start()

    query.awaitTermination()
---------------------------------------------------------------------

The corresponding example in a Zeppelin notebook:
---------------------------------------------------------------
%spark.pyspark
from pyspark.sql.functions import explode, split, window

# Create DataSet representing the stream of input lines from kafka
lines = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "words")\
    .load()\
    .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")

# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
    explode(split(lines.value, ' ')).alias('word'),
    lines.timestamp
)

# Group the data by window and word and compute the count of each group
windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
    window(words.timestamp, "30 seconds", "30 seconds"),
    words.word
).count()

# Start running the query that prints the running counts to the console
query = windowedCounts\
    .writeStream\
    .outputMode('append')\
    .format('console')\
    .option("truncate", "false")\
    .start()

query.awaitTermination()
--------------------------------------------------------------------------------------

Note that the Scala version of the same example in a Zeppelin notebook works fine:
----------------------------------------------------------------------------------------
import java.sql.Timestamp
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.functions._

// Create DataSet representing the stream of input lines from kafka
val lines = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "words")
  .load()

// Split the lines into words, retaining timestamps
val words = lines
  .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
  .as[(String, Timestamp)]
  .flatMap(line => line._1.split(" ").map(word => (word, line._2)))
  .toDF("word", "timestamp")

// Group the data by window and word and compute the count of each group
val windowedCounts = words
  .withWatermark("timestamp", "30 seconds")
  .groupBy(window($"timestamp", "30 seconds", "30 seconds"), $"word")
  .count()

// Start running the query that prints the windowed word counts to the console
val query = windowedCounts.writeStream
  .outputMode("append")
  .format("console")
  .trigger(ProcessingTime("35 seconds"))
  .option("truncate", "false")
  .start()

query.awaitTermination()
-----------------------------------------------------------------------------------------
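For reference, the behaviour the report expects from a watermarked windowed count in Append mode can be modelled in a few lines of plain Python. This is a simplified sketch, not Spark's implementation: `window_start` and `run_append_mode` are hypothetical helpers, event times are integer seconds, and the watermark is taken to be the maximum event time seen so far minus the delay, mirroring withWatermark("timestamp", "30 seconds") and 30-second tumbling windows. Each window's count is emitted exactly once, after the watermark has passed the window's end, which is why Append mode is meaningful for this kind of aggregation.

```python
from collections import defaultdict

# Hypothetical parameters mirroring the report's query:
# window(..., "30 seconds", "30 seconds") and withWatermark("timestamp", "30 seconds").
WINDOW = 30  # tumbling window length, seconds
DELAY = 30   # watermark delay threshold, seconds


def window_start(ts, size=WINDOW):
    """Start of the tumbling window [start, start + size) that contains ts."""
    return ts - ts % size


def run_append_mode(batches, size=WINDOW, delay=DELAY):
    """Simplified model of a watermarked, windowed word count in Append mode.

    batches: micro-batches, each a list of (event_time_seconds, word) pairs.
    Returns one list per batch with the rows emitted by that batch,
    each row being (window_start, window_end, word, count).
    """
    state = defaultdict(int)        # (window_start, word) -> running count
    watermark = float("-inf")       # no watermark before any data is seen
    max_event_time = float("-inf")
    output = []
    for batch in batches:
        for ts, word in batch:
            start = window_start(ts, size)
            if start + size <= watermark:
                continue  # window already finalized and emitted: drop late record
            state[(start, word)] += 1
            max_event_time = max(max_event_time, ts)
        # Watermark = max event time seen so far minus the delay; it never moves back.
        watermark = max(watermark, max_event_time - delay)
        # Append mode: emit a window once the watermark has passed its end,
        # then discard its state so the emitted row is never updated.
        ready = sorted(key for key in state if key[0] + size <= watermark)
        output.append([(s, s + size, w, state.pop((s, w))) for s, w in ready])
    return output


if __name__ == "__main__":
    sample = [
        [(1, "a"), (5, "b"), (12, "a")],  # watermark -> -18 s
        [(40, "a")],                      # watermark -> 10 s
        [(65, "b")],                      # watermark -> 35 s: window [0, 30) is final
        [(20, "a"), (70, "a")],           # 20 s falls in the finalized window: dropped
    ]
    for i, rows in enumerate(run_append_mode(sample), start=1):
        print("batch", i, rows)
```

With the sample batches, nothing is emitted for the first two batches; the [0, 30) window is emitted in the third batch, once the watermark reaches 35 seconds, and the record at 20 seconds in the last batch is dropped because its window was already finalized. In a real query Spark tracks the watermark per trigger, so the exact batch in which a window appears can differ from this model.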