[jira] [Commented] (SPARK-19903) Watermark metadata is lost when using resolved attributes

2018-09-11 Thread Shixiong Zhu (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-19903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16610931#comment-16610931 ]

Shixiong Zhu commented on SPARK-19903:
--

Yes. I removed the target version.

> Watermark metadata is lost when using resolved attributes
> -
>
> Key: SPARK-19903
> URL: https://issues.apache.org/jira/browse/SPARK-19903
> Project: Spark
> Issue Type: Bug
> Components: Structured Streaming
> Affects Versions: 2.1.0
> Environment: Ubuntu Linux
> Reporter: Piotr Nestorow
> Priority: Major
>
> The PySpark example reads a Kafka stream. A watermark is set on the timestamp
> column used for the windowed aggregation, and the query uses the Append output
> mode. The PySpark engine reports the error:
> 'Append output mode not supported when there are streaming aggregations on
> streaming DataFrames/DataSets'
> The Python example:
> ---
> {code}
> import sys
>
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import explode, split, window
>
> if __name__ == "__main__":
>     if len(sys.argv) != 4:
>         print("""
>         Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
>         """, file=sys.stderr)
>         exit(-1)
>
>     bootstrapServers = sys.argv[1]
>     subscribeType = sys.argv[2]
>     topics = sys.argv[3]
>
>     spark = SparkSession\
>         .builder\
>         .appName("StructuredKafkaWordCount")\
>         .getOrCreate()
>
>     # Create DataSet representing the stream of input lines from kafka
>     lines = spark\
>         .readStream\
>         .format("kafka")\
>         .option("kafka.bootstrap.servers", bootstrapServers)\
>         .option(subscribeType, topics)\
>         .load()\
>         .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
>
>     # Split the lines into words, retaining timestamps
>     # split() splits each line into an array, and explode() turns the array into multiple rows
>     words = lines.select(
>         explode(split(lines.value, ' ')).alias('word'),
>         lines.timestamp
>     )
>
>     # Group the data by window and word and compute the count of each group
>     windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
>         window(words.timestamp, "30 seconds", "30 seconds"), words.word
>     ).count()
>
>     # Start running the query that prints the running counts to the console
>     query = windowedCounts\
>         .writeStream\
>         .outputMode('append')\
>         .format('console')\
>         .option("truncate", "false")\
>         .start()
>
>     query.awaitTermination()
> {code}
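
(The group-by above reuses words.timestamp and words.word, attributes resolved against the DataFrame created before withWatermark was applied; the issue title points at exactly this pattern. The following is a minimal, hypothetical sketch, not part of the original report, of the same aggregation expressed against the DataFrame returned by withWatermark, so that the grouping columns are the ones carrying the watermark metadata:)

{code}
from pyspark.sql.functions import window

# Hypothetical rewrite of the aggregation from the example above; it assumes the
# 'words' DataFrame defined there. The assumption is that grouping on columns of
# the watermarked DataFrame, rather than on the pre-watermark attributes, keeps
# the watermark metadata attached to 'timestamp'.
watermarked = words.withWatermark("timestamp", "30 seconds")

windowedCounts = watermarked.groupBy(
    window(watermarked.timestamp, "30 seconds", "30 seconds"),
    watermarked.word
).count()
{code}
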
> The corresponding example in Zeppelin notebook:
> {code}
> %spark.pyspark
> from pyspark.sql.functions import explode, split, window
>
> # Create DataSet representing the stream of input lines from kafka
> lines = spark\
>     .readStream\
>     .format("kafka")\
>     .option("kafka.bootstrap.servers", "localhost:9092")\
>     .option("subscribe", "words")\
>     .load()\
>     .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
>
> # Split the lines into words, retaining timestamps
> # split() splits each line into an array, and explode() turns the array into multiple rows
> words = lines.select(
>     explode(split(lines.value, ' ')).alias('word'),
>     lines.timestamp
> )
>
> # Group the data by window and word and compute the count of each group
> windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
>     window(words.timestamp, "30 seconds", "30 seconds"), words.word
> ).count()
>
> # Start running the query that prints the running counts to the console
> query = windowedCounts\
>     .writeStream\
>     .outputMode('append')\
>     .format('console')\
>     .option("truncate", "false")\
>     .start()
>
> query.awaitTermination()
> {code}
> --
> Note that the Scala version of the same example in Zeppelin notebook works 
> fine:
>
> import java.sql.Timestamp
> import org.apache.spark.sql.streaming.ProcessingTime
> import org.apache.spark.sql.functions._
>
> // Create DataSet representing the stream of input lines from kafka
> val lines = spark
>     .readStream
>     .format("kafka")
>     .option("kafka.bootstrap.servers", "localhost:9092")
>     .option("subscribe", "words")
>     .load()
>
> // Split the lines into words, retaining timestamps
> val words = lines
>     .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
>     .as[(String, Timestamp)]
>     .flatMap(line => line._1.split(" ").map(word => (word, line._2)))
>     .toDF("word", 

[jira] [Commented] (SPARK-19903) Watermark metadata is lost when using resolved attributes

2018-09-11 Thread Wenchen Fan (JIRA)


[ https://issues.apache.org/jira/browse/SPARK-19903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16610664#comment-16610664 ]

Wenchen Fan commented on SPARK-19903:
-

[~zsxwing] is this still a bug?
