[ https://issues.apache.org/jira/browse/SPARK-47718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wei Liu updated SPARK-47718:
----------------------------
    Labels:   (was: pull-request-available)

> .sql() does not recognize watermark defined upstream
> ----------------------------------------------------
>
>                 Key: SPARK-47718
>                 URL: https://issues.apache.org/jira/browse/SPARK-47718
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 3.5.1
>            Reporter: Chloe He
>            Priority: Major
>
> I have a data pipeline set up in such a way that it reads data from a Kafka
> source, does some transformation on the data using PySpark, then writes the
> output into a sink (Kafka, Redis, etc.).
>
> My entire pipeline is written in SQL, so I wish to use the .sql() method to
> execute SQL on my streaming source directly.
>
> However, I'm running into the issue where my watermark is not being
> recognized by the downstream query via the .sql() method.
>
> ```
> Python 3.11.8 | packaged by conda-forge | (main, Feb 16 2024, 20:49:36) [Clang 16.0.6 ] on darwin
> Type "help", "copyright", "credits" or "license" for more information.
> >>> import pyspark
> >>> print(pyspark.__version__)
> 3.5.1
> >>> from pyspark.sql import SparkSession
> >>>
> >>> session = SparkSession.builder \
> ...     .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1") \
> ...     .getOrCreate()
> >>> from pyspark.sql.functions import col, from_json
> >>> from pyspark.sql.types import StructField, StructType, TimestampType, LongType, DoubleType, IntegerType
> >>> schema = StructType(
> ...     [
> ...         StructField('createTime', TimestampType(), True),
> ...         StructField('orderId', LongType(), True),
> ...         StructField('payAmount', DoubleType(), True),
> ...         StructField('payPlatform', IntegerType(), True),
> ...         StructField('provinceId', IntegerType(), True),
> ...     ])
> >>>
> >>> streaming_df = session.readStream \
> ...     .format("kafka") \
> ...     .option("kafka.bootstrap.servers", "localhost:9092") \
> ...     .option("subscribe", "payment_msg") \
> ...     .option("startingOffsets", "earliest") \
> ...     .load() \
> ...     .select(from_json(col("value").cast("string"), schema).alias("parsed_value")) \
> ...     .select("parsed_value.*") \
> ...     .withWatermark("createTime", "10 seconds")
> >>>
> >>> streaming_df.createOrReplaceTempView("streaming_df")
> >>> session.sql("""
> ... SELECT
> ...     window.start, window.end, provinceId, sum(payAmount) as totalPayAmount
> ... FROM streaming_df
> ... GROUP BY provinceId, window('createTime', '1 hour', '30 minutes')
> ... ORDER BY window.start
> ... """) \
> ...     .writeStream \
> ...     .format("kafka") \
> ...     .option("checkpointLocation", "checkpoint") \
> ...     .option("kafka.bootstrap.servers", "localhost:9092") \
> ...     .option("topic", "sink") \
> ...     .start()
> ```
>
> This throws the exception
> ```
> pyspark.errors.exceptions.captured.AnalysisException: Append output mode not
> supported when there are streaming aggregations on streaming
> DataFrames/DataSets without watermark; line 6 pos 4;
> ```
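For comparison, here is a minimal sketch of the same aggregation built directly on the watermarked streaming_df (no temp view and no .sql()), which should be accepted in append mode because the watermark set with .withWatermark() stays attached to the plan. The console sink, the checkpoint_df_api path, and the dropped ORDER BY (sorting a streaming aggregation is only allowed in complete output mode) are assumptions made to keep the sketch self-contained; they are not part of the original report.

```
from pyspark.sql.functions import col, window, sum as sum_

# Same grouping as the SQL query in the report, but built on the watermarked
# DataFrame directly, so the event-time watermark on createTime is visible
# to the streaming aggregation.
agg_df = (
    streaming_df
    .groupBy(col("provinceId"), window(col("createTime"), "1 hour", "30 minutes"))
    .agg(sum_("payAmount").alias("totalPayAmount"))
    .select("window.start", "window.end", "provinceId", "totalPayAmount")
)

# Console sink and a separate checkpoint directory are placeholders, not part
# of the report; append mode works here because the aggregation sees the watermark.
query = (
    agg_df.writeStream
    .format("console")
    .outputMode("append")
    .option("checkpointLocation", "checkpoint_df_api")
    .start()
)
```

If the temp-view route is required, inspecting the analyzed plan of SELECT * FROM streaming_df (for example with .explain(True)) should show whether the EventTimeWatermark node survives createOrReplaceTempView.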