[ 
https://issues.apache.org/jira/browse/SPARK-19903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Armbrust updated SPARK-19903:
-------------------------------------
    Summary: Watermark metadata is lost when using resolved attributes  (was: PySpark Kafka streaming query output append mode not possible)

> Watermark metadata is lost when using resolved attributes
> ---------------------------------------------------------
>
>                 Key: SPARK-19903
>                 URL: https://issues.apache.org/jira/browse/SPARK-19903
>             Project: Spark
>          Issue Type: Bug
>          Components: Structured Streaming
>    Affects Versions: 2.1.0
>         Environment: Ubuntu Linux
>            Reporter: Piotr Nestorow
>
> A PySpark example reads a Kafka stream. A watermark is set when handling the data window, and the defined query uses Append output mode.
> The PySpark engine reports the error:
> 'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets'
> The Python example:
> -------------------------------------------------------------------------------
> {code}
> from __future__ import print_function
> import sys
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import explode, split, window
> if __name__ == "__main__":
>     if len(sys.argv) != 4:
>         print("""
>         Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
>         """, file=sys.stderr)
>         exit(-1)
>     bootstrapServers = sys.argv[1]
>     subscribeType = sys.argv[2]
>     topics = sys.argv[3]
>     spark = SparkSession\
>         .builder\
>         .appName("StructuredKafkaWordCount")\
>         .getOrCreate()
>     # Create DataSet representing the stream of input lines from kafka
>     lines = spark\
>         .readStream\
>         .format("kafka")\
>         .option("kafka.bootstrap.servers", bootstrapServers)\
>         .option(subscribeType, topics)\
>         .load()\
>         .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
>     # Split the lines into words, retaining timestamps
>     # split() splits each line into an array, and explode() turns the array into multiple rows
>     words = lines.select(
>         explode(split(lines.value, ' ')).alias('word'),
>         lines.timestamp
>     )
>     # Group the data by window and word and compute the count of each group
>     windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
>         window(words.timestamp, "30 seconds", "30 seconds"), words.word
>         ).count()
>     # Start running the query that prints the running counts to the console
>     query = windowedCounts\
>         .writeStream\
>         .outputMode('append')\
>         .format('console')\
>         .option("truncate", "false")\
>         .start()
>     query.awaitTermination()
> {code}
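> In the example above, the suspected trigger is the aggregation itself: {{words.timestamp}} and {{words.word}} are resolved attributes of the DataFrame captured before {{withWatermark}}, so, per the updated summary, the watermark metadata never reaches the grouping expressions and the aggregation is treated as unwatermarked, which Append mode rejects. A condensed view of that pattern, reusing the names from the example above (an illustration of the assumed cause, not a confirmed analysis):
> {code}
> # Condensed from the example above; 'words' is the pre-watermark DataFrame
> # and window() comes from pyspark.sql.functions.
> watermarked = words.withWatermark("timestamp", "30 seconds")
>
> # words.timestamp / words.word resolve against 'words', not 'watermarked',
> # so the watermark metadata is presumably dropped from the grouping keys here:
> windowedCounts = watermarked.groupBy(
>     window(words.timestamp, "30 seconds", "30 seconds"), words.word
> ).count()
> {code}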
> The corresponding example in Zeppelin notebook:
> {code}
> %spark.pyspark
> from pyspark.sql.functions import explode, split, window
> # Create DataSet representing the stream of input lines from kafka
> lines = spark\
>     .readStream\
>     .format("kafka")\
>     .option("kafka.bootstrap.servers", "localhost:9092")\
>     .option("subscribe", "words")\
>     .load()\
>     .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
> # Split the lines into words, retaining timestamps
> # split() splits each line into an array, and explode() turns the array into multiple rows
> words = lines.select(
>     explode(split(lines.value, ' ')).alias('word'),
>     lines.timestamp
> )
> # Group the data by window and word and compute the count of each group
> windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
>     window(words.timestamp, "30 seconds", "30 seconds"), words.word
>     ).count()
> # Start running the query that prints the running counts to the console
> query = windowedCounts\
>     .writeStream\
>     .outputMode('append')\
>     .format('console')\
>     .option("truncate", "false")\
>     .start()
> query.awaitTermination()
> {code}
> --------------------------------------------------------------------------------------
> Note that the Scala version of the same example in Zeppelin notebook works fine:
> ----------------------------------------------------------------------------------------
> {code}
> import java.sql.Timestamp
> import org.apache.spark.sql.streaming.ProcessingTime
> import org.apache.spark.sql.functions._
> // Create DataSet representing the stream of input lines from kafka
> val lines = spark
>             .readStream
>             .format("kafka")
>             .option("kafka.bootstrap.servers", "localhost:9092")
>             .option("subscribe", "words")
>             .load()
> // Split the lines into words, retaining timestamps
> val words = lines
>             .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
>             .as[(String, Timestamp)]
>             .flatMap(line => line._1.split(" ").map(word => (word, line._2)))
>             .toDF("word", "timestamp")
> // Group the data by window and word and compute the count of each group
> val windowedCounts = words
>             .withWatermark("timestamp", "30 seconds")
>             .groupBy(window($"timestamp", "30 seconds", "30 seconds"), $"word")
>             .count()
> // Start running the query that prints the windowed word counts to the console
> val query = windowedCounts.writeStream
>             .outputMode("append")
>             .format("console")
>             .trigger(ProcessingTime("35 seconds"))
>             .option("truncate", "false")
>             .start()
> query.awaitTermination()
> {code}
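> The Scala version appears to work because {{$"timestamp"}} and {{$"word"}} are unresolved column references that the analyzer resolves against the watermarked child plan, so the metadata added by {{withWatermark}} is still attached when the aggregation is analyzed. Assuming that is the difference, a possible PySpark workaround is to reference the columns of the watermarked DataFrame (or plain column name strings) instead of attributes of the pre-watermark {{words}}. A sketch of that idea, untested and based on the assumption above:
> {code}
> # Sketch of a possible workaround: resolve the grouping columns against the
> # watermarked DataFrame rather than against the pre-watermark 'words'.
> # Reuses 'words' and the imports from the PySpark example above.
> watermarked = words.withWatermark("timestamp", "30 seconds")
> windowedCounts = watermarked.groupBy(
>     window(watermarked.timestamp, "30 seconds", "30 seconds"),
>     watermarked.word
> ).count()
>
> query = windowedCounts.writeStream \
>     .outputMode('append') \
>     .format('console') \
>     .option("truncate", "false") \
>     .start()
> {code}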



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
