[ https://issues.apache.org/jira/browse/SPARK-19903?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Michael Armbrust updated SPARK-19903:
-------------------------------------
    Description: 
The PySpark example reads a Kafka stream, sets a watermark on the event-time column used for windowing, and runs the query in Append output mode.

The PySpark engine reports the error:
'Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets'

The Python example:
{code}
import sys

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split, window

if __name__ == "__main__":
    if len(sys.argv) != 4:
        print("""
        Usage: structured_kafka_wordcount.py <bootstrap-servers> <subscribe-type> <topics>
        """, file=sys.stderr)
        exit(-1)

    bootstrapServers = sys.argv[1]
    subscribeType = sys.argv[2]
    topics = sys.argv[3]

    spark = SparkSession\
        .builder\
        .appName("StructuredKafkaWordCount")\
        .getOrCreate()

    # Create DataSet representing the stream of input lines from kafka
    lines = spark\
        .readStream\
        .format("kafka")\
        .option("kafka.bootstrap.servers", bootstrapServers)\
        .option(subscribeType, topics)\
        .load()\
        .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")

    # Split the lines into words, retaining timestamps
    # split() splits each line into an array, and explode() turns the array into multiple rows
    words = lines.select(
        explode(split(lines.value, ' ')).alias('word'),
        lines.timestamp
    )

    # Group the data by window and word and compute the count of each group
    windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
        window(words.timestamp, "30 seconds", "30 seconds"), words.word
        ).count()

    # Start running the query that prints the running counts to the console
    query = windowedCounts\
        .writeStream\
        .outputMode('append')\
        .format('console')\
        .option("truncate", "false")\
        .start()

    query.awaitTermination()
{code}
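For comparison, one variant that is sometimes suggested when a watermark does not appear to be recognized is to keep every column reference on the DataFrame returned by withWatermark, referring to columns by name rather than through the pre-watermark 'words' object. The sketch below only illustrates that variant; it is an assumption, not a confirmed workaround for this report:
{code}
from pyspark.sql.functions import window

# Hypothetical variant (assumption only, not a verified fix): group on the
# watermarked DataFrame and refer to its columns by name.
watermarked = words.withWatermark("timestamp", "30 seconds")
windowedCounts = watermarked.groupBy(
    window("timestamp", "30 seconds", "30 seconds"),
    "word"
).count()
{code}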

The corresponding example in a Zeppelin notebook:
{code}
%spark.pyspark

from pyspark.sql.functions import explode, split, window

# Create DataSet representing the stream of input lines from kafka
lines = spark\
    .readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "localhost:9092")\
    .option("subscribe", "words")\
    .load()\
    .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")

# Split the lines into words, retaining timestamps
# split() splits each line into an array, and explode() turns the array into multiple rows
words = lines.select(
    explode(split(lines.value, ' ')).alias('word'),
    lines.timestamp
)

# Group the data by window and word and compute the count of each group
windowedCounts = words.withWatermark("timestamp", "30 seconds").groupBy(
    window(words.timestamp, "30 seconds", "30 seconds"), words.word
    ).count()

# Start running the query that prints the running counts to the console
query = windowedCounts\
    .writeStream\
    .outputMode('append')\
    .format('console')\
    .option("truncate", "false")\
    .start()

query.awaitTermination()
{code}

Note that the Scala version of the same example in a Zeppelin notebook works fine:
{code}
import java.sql.Timestamp
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.sql.functions._

// Create DataSet representing the stream of input lines from kafka
val lines = spark
            .readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "words")
            .load()

// Split the lines into words, retaining timestamps
val words = lines
            .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)")
            .as[(String, Timestamp)]
            .flatMap(line => line._1.split(" ").map(word => (word, line._2)))
            .toDF("word", "timestamp")

// Group the data by window and word and compute the count of each group
val windowedCounts = words
            .withWatermark("timestamp", "30 seconds")
            .groupBy(window($"timestamp", "30 seconds", "30 seconds"), $"word")
            .count()

// Start running the query that prints the windowed word counts to the console
val query = windowedCounts.writeStream
            .outputMode("append")
            .format("console")
            .trigger(ProcessingTime("35 seconds"))
            .option("truncate", "false")
            .start()

query.awaitTermination()
{code}
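One further difference between the two examples is the trigger: the Scala query sets a 35-second processing-time trigger, while the PySpark queries rely on the default trigger. For completeness, a minimal PySpark sketch of the same writer settings, assuming the trigger(processingTime=...) keyword is available in the PySpark version in use:
{code}
# Sketch only: mirrors the Scala writer settings, including the trigger.
query = windowedCounts\
    .writeStream\
    .outputMode('append')\
    .format('console')\
    .trigger(processingTime='35 seconds')\
    .option("truncate", "false")\
    .start()
{code}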



> PySpark Kafka streaming query output append mode not possible
> -------------------------------------------------------------
>
>                 Key: SPARK-19903
>                 URL: https://issues.apache.org/jira/browse/SPARK-19903
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, Structured Streaming
>    Affects Versions: 2.1.0
>         Environment: Ubuntu Linux
>            Reporter: Piotr Nestorow
>



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
