Re: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-09 Thread Mich Talebzadeh
Interesting. So below should be the corrected code, incorporating the suggestion in [SPARK-47718] .sql() does not recognize watermark defined upstream - ASF JIRA (apache.org): # Define schema for parsing Kafka messages schema = StructType([

Re: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-09 Thread 刘唯
Sorry, this is not a bug but essentially a user error. Spark throws a really confusing error, and it confused me as well. Please see the reply in the ticket for how to fix things: https://issues.apache.org/jira/browse/SPARK-47718 On Sat, Apr 6, 2024 at 11:41, 刘唯 wrote: > This indeed looks like a bug. I will

Re: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-06 Thread 刘唯
This indeed looks like a bug. I will take some time to look into it. On Wed, Apr 3, 2024 at 01:55, Mich Talebzadeh wrote: > > Hm. You are getting the error below: > > AnalysisException: Append output mode not supported when there are > streaming aggregations on streaming DataFrames/DataSets without watermark; > > The

Re: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-02 Thread Mich Talebzadeh
Hm. You are getting the error below: AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark; The problem seems to be that you are using the append output mode when writing the streaming query results to Kafka. This mode
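To see why Spark rejects this combination, here is a toy model in plain Python (not Spark code, and deliberately simplified) of append mode for a tumbling-window count. Append mode may emit a window only once its result can no longer change, and only a watermark (max event time seen minus the delay given to `withWatermark`) tells the engine when that is. With no watermark (`delay=None` below), no window is ever finalized, so nothing could ever be appended. The function name `simulate_append_mode`, the helper `at` and the sample timestamps are hypothetical.

```python
from collections import defaultdict
from datetime import datetime, timedelta

EPOCH = datetime(1970, 1, 1)


def simulate_append_mode(batches, window, delay=None):
    """Toy model (not Spark code) of append mode for a tumbling-window count.

    batches: micro-batches, each a list of event-time datetimes.
    window:  tumbling window length, e.g. timedelta(minutes=10).
    delay:   watermark delay as passed to withWatermark(); None means no watermark.
    Returns (batch_index, window_start, count) rows in the order they are emitted.
    """
    state = defaultdict(int)  # open windows: window_start -> running count
    max_event_time = None
    watermark = None
    emitted = []
    for i, events in enumerate(batches):
        for ts in events:
            start = EPOCH + ((ts - EPOCH) // window) * window
            if watermark is not None and start + window <= watermark:
                continue  # simplification: drop rows whose window is already final
            state[start] += 1
            if max_event_time is None or ts > max_event_time:
                max_event_time = ts
        if delay is None or max_event_time is None:
            continue  # no watermark: no window can ever be declared final
        watermark = max_event_time - delay
        # Emit each window exactly once, after the watermark passes its end.
        # (Spark does this in the following, possibly empty, micro-batch.)
        for start in sorted(s for s in state if s + window <= watermark):
            emitted.append((i, start, state.pop(start)))
    return emitted


def at(minute):
    """Hypothetical helper: an event time on 2024-04-02 at 12:<minute>."""
    return datetime(2024, 4, 2, 12, minute)


batches = [[at(1), at(4)], [at(7), at(12)], [at(23)]]
ten_minutes = timedelta(minutes=10)
print(simulate_append_mode(batches, ten_minutes, delay=timedelta(minutes=5)))
print(simulate_append_mode(batches, ten_minutes))  # no watermark: []
```

With a 5-minute delay, the 12:00-12:10 window (count 3) is emitted only in the third batch, once the watermark (12:23 minus 5 minutes, i.e. 12:18) has passed 12:10; without a watermark the result stays empty no matter how much data arrives.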

RE: Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-02 Thread Chloe He
Hi Mich, Thank you so much for your response. I really appreciate your help! You mentioned "defining the watermark using the withWatermark function on the streaming_df before creating the temporary view". I believe this is exactly what I'm doing, but it's not working for me. Here is the exact code

Re: [Spark SQL] How can I use .sql() in conjunction with watermarks?

2024-04-02 Thread Mich Talebzadeh
OK, let us put it to the test. Here is my original code: def fetch_data(self): self.sc.setLogLevel("ERROR") schema = StructType() \ .add("rowkey", StringType()) \ .add("timestamp", TimestampType()) \ .add("temperature", IntegerType())