[ 
https://issues.apache.org/jira/browse/SPARK-47718?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17835175#comment-17835175
 ] 

Jungtaek Lim commented on SPARK-47718:
--------------------------------------

I've lowered down to major - this is neither a regression nor correctness issue.

> .sql() does not recognize watermark defined upstream
> ----------------------------------------------------
>
>                 Key: SPARK-47718
>                 URL: https://issues.apache.org/jira/browse/SPARK-47718
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.5.1
>            Reporter: Chloe He
>            Priority: Major
>              Labels: pull-request-available
>
> I have a data pipeline set up in such a way that it reads data from a Kafka 
> source, does some transformation on the data using pyspark, then writes the 
> output into a sink (Kafka, Redis, etc).
>  
> My entire pipeline in written in SQL, so I wish to use the .sql() method to 
> execute SQL on my streaming source directly.
>  
> However, I'm running into the issue where my watermark is not being 
> recognized by the downstream query via the .sql() method.
>  
> ```
> Python 3.11.8 | packaged by conda-forge | (main, Feb 16 2024, 20:49:36) 
> [Clang 16.0.6 ] on darwin
> Type "help", "copyright", "credits" or "license" for more information.
> >>> import pyspark
> >>> print(pyspark.__version__)
> 3.5.1
> >>> from pyspark.sql import SparkSession
> >>>
> >>> session = SparkSession.builder \
> ...     .config("spark.jars.packages", 
> "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1")\
> ...     .getOrCreate()
> >>> from pyspark.sql.functions import col, from_json
> >>> from pyspark.sql.types import StructField, StructType, TimestampType, 
> >>> LongType, DoubleType, IntegerType
> >>> schema = StructType(
> ...     [
> ...         StructField('createTime', TimestampType(), True),
> ...         StructField('orderId', LongType(), True),
> ...         StructField('payAmount', DoubleType(), True),
> ...         StructField('payPlatform', IntegerType(), True),
> ...         StructField('provinceId', IntegerType(), True),
> ...     ])
> >>>
> >>> streaming_df = session.readStream\
> ...     .format("kafka")\
> ...     .option("kafka.bootstrap.servers", "localhost:9092")\
> ...     .option("subscribe", "payment_msg")\
> ...     .option("startingOffsets","earliest")\
> ...     .load()\
> ...     .select(from_json(col("value").cast("string"), 
> schema).alias("parsed_value"))\
> ...     .select("parsed_value.*")\
> ...     .withWatermark("createTime", "10 seconds")
> >>>
> >>> streaming_df.createOrReplaceTempView("streaming_df")
> >>> session.sql("""
> ... SELECT
> ...     window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
> ...     FROM streaming_df
> ...     GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
> ...     ORDER BY window.start
> ... """)\
> ...   .writeStream\
> ...   .format("kafka") \
> ...   .option("checkpointLocation", "checkpoint") \
> ...   .option("kafka.bootstrap.servers", "localhost:9092") \
> ...   .option("topic", "sink") \
> ...   .start()
> ```
>  
> This throws exception
> ```
> pyspark.errors.exceptions.captured.AnalysisException: Append output mode not 
> supported when there are streaming aggregations on streaming 
> DataFrames/DataSets without watermark; line 6 pos 4;
> ```
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to