[ https://issues.apache.org/jira/browse/SPARK-19903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Michael Armbrust updated SPARK-19903: ------------------------------------- Summary: Watermark metadata is lost when using resolved attributes (was: PySpark Kafka streaming query ouput append mode not possible) > Watermark metadata is lost when using resolved attributes > --------------------------------------------------------- > > Key: SPARK-19903 > URL: https://issues.apache.org/jira/browse/SPARK-19903 > Project: Spark > Issue Type: Bug > Components: Structured Streaming > Affects Versions: 2.1.0 > Environment: Ubuntu Linux > Reporter: Piotr Nestorow > > PySpark example reads a Kafka stream. There is watermarking set when handling > the data window. The defined query uses output Append mode. > The PySpark engine reports the error: > 'Append output mode not supported when there are streaming aggregations on > streaming DataFrames/DataSets' > The Python example: > ------------------------------------------------------------------------------- > {code} > import sys > from pyspark.sql import SparkSession > from pyspark.sql.functions import explode, split, window > if __name__ == "__main__": > if len(sys.argv) != 4: > print(""" > Usage: structured_kafka_wordcount.py <bootstrap-servers> > <subscribe-type> <topics> > """, file=sys.stderr) > exit(-1) > bootstrapServers = sys.argv[1] > subscribeType = sys.argv[2] > topics = sys.argv[3] > spark = SparkSession\ > .builder\ > .appName("StructuredKafkaWordCount")\ > .getOrCreate() > # Create DataSet representing the stream of input lines from kafka > lines = spark\ > .readStream\ > .format("kafka")\ > .option("kafka.bootstrap.servers", bootstrapServers)\ > .option(subscribeType, topics)\ > .load()\ > .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)") > # Split the lines into words, retaining timestamps > # split() splits each line into an array, and explode() turns the array > into multiple rows > words = lines.select( > explode(split(lines.value, ' ')).alias('word'), > lines.timestamp > ) > # Group the data by window and word and compute the count of each group > windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy( > window(words.timestamp, "30 seconds", "30 seconds"), words.word > ).count() > # Start running the query that prints the running counts to the console > query = windowedCounts\ > .writeStream\ > .outputMode('append')\ > .format('console')\ > .option("truncate", "false")\ > .start() > query.awaitTermination() > {code} > The corresponding example in Zeppelin notebook: > {code} > %spark.pyspark > from pyspark.sql.functions import explode, split, window > # Create DataSet representing the stream of input lines from kafka > lines = spark\ > .readStream\ > .format("kafka")\ > .option("kafka.bootstrap.servers", "localhost:9092")\ > .option("subscribe", "words")\ > .load()\ > .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)") > # Split the lines into words, retaining timestamps > # split() splits each line into an array, and explode() turns the array into > multiple rows > words = lines.select( > explode(split(lines.value, ' ')).alias('word'), > lines.timestamp > ) > # Group the data by window and word and compute the count of each group > windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy( > window(words.timestamp, "30 seconds", "30 seconds"), words.word > ).count() > # Start running the query that prints the running counts to the console > query = windowedCounts\ > .writeStream\ > .outputMode('append')\ > .format('console')\ > .option("truncate", "false")\ > .start() > query.awaitTermination() > -------------------------------------------------------------------------------------- > Note that the Scala version of the same example in Zeppelin notebook works > fine: > ---------------------------------------------------------------------------------------- > import java.sql.Timestamp > import org.apache.spark.sql.streaming.ProcessingTime > import org.apache.spark.sql.functions._ > // Create DataSet representing the stream of input lines from kafka > val lines = spark > .readStream > .format("kafka") > .option("kafka.bootstrap.servers", "localhost:9092") > .option("subscribe", "words") > .load() > // Split the lines into words, retaining timestamps > val words = lines > .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS > TIMESTAMP)") > .as[(String, Timestamp)] > .flatMap(line => line._1.split(" ").map(word => (word, line._2))) > .toDF("word", "timestamp") > // Group the data by window and word and compute the count of each group > val windowedCounts = words > .withWatermark("timestamp", "30 seconds") > .groupBy(window($"timestamp", "30 seconds", "30 seconds"), > $"word") > .count() > // Start running the query that prints the windowed word counts to the console > val query = windowedCounts.writeStream > .outputMode("append") > .format("console") > .trigger(ProcessingTime("35 seconds")) > .option("truncate", "false") > .start() > query.awaitTermination() > {code} -- This message was sent by Atlassian JIRA (v6.3.15#6346) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org