[jira] [Commented] (SPARK-38078) Aggregation with Watermark in AppendMode is holding data beyond the watermark boundary.

2023-04-01 Thread Vindhya G (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38078?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707618#comment-17707618
 ] 

Vindhya G commented on SPARK-38078:
---

https://issues.apache.org/jira/browse/SPARK-43001
looks like a similar bug.

> Aggregation with Watermark in AppendMode is holding data beyond the 
> watermark boundary.
> 
>
> Key: SPARK-38078
> URL: https://issues.apache.org/jira/browse/SPARK-38078
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 3.2.0
>Reporter: krishna
>Priority: Major
>
> I am struggling with an issue and I am not sure whether my understanding is 
> wrong or this is a bug in Spark.
>  
>  # I am reading a stream from Event Hubs/Kafka (Extract).
>  # I am pivoting and aggregating the above DataFrame (Transformation); this is a 
> watermarked aggregation.
>  # I am writing the aggregation to a console/Delta table sink in APPEND mode 
> with a trigger.
> However, the most recently published message on the event hub is not written to 
> the console/Delta table, even after it falls outside the watermark interval.
>  
> My understanding is that the event should be inserted into the Delta table after 
> event time + watermark delay.
>  
> Moreover, all events held in memory should be flushed to the sink, irrespective 
> of the watermark, before stopping, so that the shutdown is graceful.
>  
> Please advise.
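
For context, a minimal PySpark sketch of the pipeline described above (Kafka/Event Hubs source, watermarked tumbling-window aggregation, append-mode console sink); the broker, topic and column names are placeholders rather than details from the report. In append mode a window's aggregate is emitted only after the watermark, which advances only as newer events arrive, passes the end of that window, so with no further input the last window's result is not emitted.

{code:python}
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, window

spark = SparkSession.builder.appName("watermark-append-sketch").getOrCreate()

# Placeholder source: the bootstrap servers and topic are illustrative only.
events = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "broker:9092")
          .option("subscribe", "events")
          .load()
          .selectExpr("CAST(value AS STRING) AS value",
                      "timestamp AS eventTime"))

# Watermarked tumbling-window aggregation. In append mode a window's row is
# emitted only after the watermark (max event time seen minus 10 minutes)
# moves past the window's end, which requires newer events to arrive.
agg = (events
       .withWatermark("eventTime", "10 minutes")
       .groupBy(window(col("eventTime"), "5 minutes"))
       .agg(count("*").alias("events")))

query = (agg.writeStream
         .outputMode("append")
         .format("console")
         .trigger(processingTime="1 minute")
         .start())
{code}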






[jira] [Commented] (SPARK-43001) Spark last window doesn't flush in append mode

2023-04-01 Thread Vindhya G (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-43001?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707617#comment-17707617
 ] 

Vindhya G commented on SPARK-43001:
---

These two seem to be similar issues:
https://issues.apache.org/jira/browse/SPARK-38078

> Spark last window doesn't flush in append mode
> ---
>
> Key: SPARK-43001
> URL: https://issues.apache.org/jira/browse/SPARK-43001
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 3.3.2
>Reporter: padavan
>Priority: Critical
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The problem is simple: when you use a *tumbling window* with *append mode*, 
> the window is closed +only when the next message arrives+ (watermark logic).
> In the current implementation, if the *incoming* streaming data *stops*, the 
> *last* window will *never close* and the last window's data is *lost*.
>  
> Business situation:
> Everything worked correctly, then new messages stopped arriving; if the next 
> message only comes 5 hours later, the client receives the result after 5 hours 
> instead of after the window's 10-second delay.
> !https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!
> The current implementation needs to be improved: Spark should include an 
> internal mechanism to close windows automatically.
>  
> *What we propose:*
> Add a third parameter, *maxDelayClose*: {{DataFrame.withWatermark(eventTime, 
> delayThreshold, maxDelayClose)}}. The trigger would then execute
> {code:java}
> if (now - window.upper_bound > maxDelayClose) {
>     window.close().flush();
> }
> {code}
> I assume it can be done in a day. It was unexpected for us that our customers 
> could not receive their notifications (the company is in the medical field).
>  
> Simple code reproducing the problem:
> {code:java}
> kafka_stream_df = spark \
>     .readStream \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", KAFKA_BROKER) \
>     .option("subscribe", KAFKA_TOPIC) \
>     .option("includeHeaders", "true") \
>     .load()
> sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS 
> STRING)")
>        .select(from_json(col("value").cast("string"), 
> json_schema).alias("data"))
>        .select("data.*")
>        .withWatermark("dt", "1 seconds")
>        .groupBy(window("dt", "10 seconds"))
>        .agg(sum("price"))
>       )
>  
> console = sel \
>     .writeStream \
>     .trigger(processingTime='10 seconds') \
>     .format("console") \
>     .outputMode("append")\
>     .start()
> {code}
>  
>  
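
To make the proposal above concrete, a sketch of how the suggested third withWatermark() argument might be used; maxDelayClose does not exist in Spark today, so the commented line is purely hypothetical, and kafka_stream_df and json_schema refer to the reproducer quoted above.

{code:python}
from pyspark.sql.functions import col, from_json, window, sum as sum_

# Current API: withWatermark() takes only the event-time column and the delay.
sel = (kafka_stream_df
       .select(from_json(col("value").cast("string"), json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       # Proposed (hypothetical): a third argument that force-closes and emits
       # a window once it is older than maxDelayClose, even with no new events:
       # .withWatermark("dt", "1 seconds", "30 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum_("price")))
{code}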






[jira] [Commented] (SPARK-42774) Expose VectorTypes API for DataSourceV2 Batch Scans

2023-04-01 Thread xiaochen zhou (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707614#comment-17707614
 ] 

xiaochen zhou commented on SPARK-42774:
---

I would like to give this a try. Can I take this ticket?

> Expose VectorTypes API for DataSourceV2 Batch Scans
> ---
>
> Key: SPARK-42774
> URL: https://issues.apache.org/jira/browse/SPARK-42774
> Project: Spark
>  Issue Type: Improvement
>  Components: SQL
>Affects Versions: 3.3.2
>Reporter: Micah Kornfield
>Priority: Minor
>
> SparkPlan's vectorTypes attribute can be used to [specialize 
> codegen|https://github.com/apache/spark/blob/5556cfc59aa97a3ad4ea0baacebe19859ec0bcb7/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L151]; 
> however, 
> [BatchScanExecBase|https://github.com/apache/spark/blob/6b6bb6fa20f40aeedea2fb87008e9cce76c54e28/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2ScanExecBase.scala]
> does not override it, so DSv2 sources do not get any benefit of concrete 
> class dispatch.
> This proposes adding an override to BatchScanExecBase which delegates to a 
> new default method on 
> [PartitionReaderFactory|https://github.com/apache/spark/blob/f1d42bb68d6d69d9a32f91a390270f9ec33c3207/sql/catalyst/src/main/java/org/apache/spark/sql/connector/read/PartitionReaderFactory.java]
> to expose vectorTypes:
> {code:java}
> default Optional<List<String>> getVectorTypes() {
>   return Optional.empty();
> }
> {code}
>  
>  






[jira] [Commented] (SPARK-38478) Use error classes in org.apache.spark.ui

2023-04-01 Thread Bo Zhang (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707610#comment-17707610
 ] 

Bo Zhang commented on SPARK-38478:
--

Thanks! [~Wencong Liu] please feel free to submit a PR for this.

> Use error classes in org.apache.spark.ui
> 
>
> Key: SPARK-38478
> URL: https://issues.apache.org/jira/browse/SPARK-38478
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.3.0
>Reporter: Bo Zhang
>Priority: Major
>







[jira] [Comment Edited] (SPARK-42860) Add analysed logical mode in org.apache.spark.sql.execution.ExplainMode

2023-04-01 Thread xiaochen zhou (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42860?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707402#comment-17707402
 ] 

xiaochen zhou edited comment on SPARK-42860 at 4/2/23 3:03 AM:
---

[https://github.com/apache/spark/pull/40635|https://github.com/apache/spark/pull/40631]


was (Author: zxcoccer):
[https://github.com/apache/spark/pull/40631]

> Add analysed logical mode in org.apache.spark.sql.execution.ExplainMode
> ---
>
> Key: SPARK-42860
> URL: https://issues.apache.org/jira/browse/SPARK-42860
> Project: Spark
>  Issue Type: New Feature
>  Components: SQL
>Affects Versions: 3.2.0
>Reporter: arindam patra
>Priority: Blocker
>
> We have a service that submits Spark SQL jobs to a Spark cluster.
> We want to validate the SQL query before submitting the job. We are 
> currently using df.explain(extended=true), which generates the parsed, 
> analysed and optimised logical plans and the physical plan.
> But generating the optimised logical plan sometimes takes more time; for 
> example, if you have applied a filter on a partitioned column, Spark will 
> list all directories and pick the required ones.
> For our query-validation purpose this doesn't make sense, and it would be 
> great if there were an explain mode that prints only the parsed and analysed 
> logical plans.
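
For reference, a small PySpark sketch of what the ticket asks for next to what exists today; the "analyzed" mode name is hypothetical, since the current explain modes are simple, extended, codegen, cost and formatted.

{code:python}
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(100).filter("id > 50")

# Today: prints the parsed and analysed logical plans, the optimised logical
# plan (whose generation may trigger work such as partition listing) and the
# physical plan.
df.explain(extended=True)

# Requested (hypothetical, not available today): stop after analysis, e.g.
# df.explain(mode="analyzed")
{code}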






[jira] (SPARK-42840) Assign a name to the error class _LEGACY_ERROR_TEMP_2004

2023-04-01 Thread Leibniz Hu (Jira)


[ https://issues.apache.org/jira/browse/SPARK-42840 ]


Leibniz Hu deleted comment on SPARK-42840:


was (Author: JIRAUSER299406):
[~maxgekk]  https://github.com/apache/spark/pull/40634

> Assign a name to the error class _LEGACY_ERROR_TEMP_2004
> 
>
> Key: SPARK-42840
> URL: https://issues.apache.org/jira/browse/SPARK-42840
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Max Gekk
>Priority: Minor
>  Labels: starter
>
> Choose a proper name for the error class *_LEGACY_ERROR_TEMP_2004* defined in 
> {*}core/src/main/resources/error/error-classes.json{*}. The name should be 
> short but complete (look at the examples in error-classes.json).
> Add a test which triggers the error from user code if such a test doesn't 
> exist yet. Check the exception fields by using {*}checkError(){*}. That 
> function checks only the valuable error fields and avoids a dependency on the 
> error text message, so tech editors can modify the error format in 
> error-classes.json without worrying about Spark's internal tests. Migrate 
> other tests that might trigger the error onto checkError().
> If you cannot reproduce the error from user space (using a SQL query), replace 
> the error with an internal error; see {*}SparkException.internalError(){*}.
> Improve the error message format in error-classes.json if the current one is 
> not clear, and propose to users how to avoid and fix such errors.
> Please look at the PRs below as examples:
>  * [https://github.com/apache/spark/pull/38685]
>  * [https://github.com/apache/spark/pull/38656]
>  * [https://github.com/apache/spark/pull/38490]






[jira] [Commented] (SPARK-42840) Assign a name to the error class _LEGACY_ERROR_TEMP_2004

2023-04-01 Thread Leibniz Hu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707596#comment-17707596
 ] 

Leibniz Hu commented on SPARK-42840:


[~maxgekk]  https://github.com/apache/spark/pull/40634

> Assign a name to the error class _LEGACY_ERROR_TEMP_2004
> 
>
> Key: SPARK-42840
> URL: https://issues.apache.org/jira/browse/SPARK-42840
> Project: Spark
>  Issue Type: Sub-task
>  Components: SQL
>Affects Versions: 3.5.0
>Reporter: Max Gekk
>Priority: Minor
>  Labels: starter
>
> Choose a proper name for the error class *_LEGACY_ERROR_TEMP_2004* defined in 
> {*}core/src/main/resources/error/error-classes.json{*}. The name should be 
> short but complete (look at the examples in error-classes.json).
> Add a test which triggers the error from user code if such a test doesn't 
> exist yet. Check the exception fields by using {*}checkError(){*}. That 
> function checks only the valuable error fields and avoids a dependency on the 
> error text message, so tech editors can modify the error format in 
> error-classes.json without worrying about Spark's internal tests. Migrate 
> other tests that might trigger the error onto checkError().
> If you cannot reproduce the error from user space (using a SQL query), replace 
> the error with an internal error; see {*}SparkException.internalError(){*}.
> Improve the error message format in error-classes.json if the current one is 
> not clear, and propose to users how to avoid and fix such errors.
> Please look at the PRs below as examples:
>  * [https://github.com/apache/spark/pull/38685]
>  * [https://github.com/apache/spark/pull/38656]
>  * [https://github.com/apache/spark/pull/38490]






[jira] [Commented] (SPARK-41628) Support async query execution

2023-04-01 Thread Martin Grund (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707580#comment-17707580
 ] 

Martin Grund commented on SPARK-41628:
--

I think this needs a bit more discussion. Generally, I think it would be 
possible to model the asynchronous execution using the existing model. A couple 
of things would be needed up front:

1. `ExecutePlanRequest` needs an execution mode to indicate whether the client 
wants a blocking or a non-blocking request; this should probably be an enum.
2. `ExecutePlanResponse` needs to carry the query ID so that the client can 
later resume the query to block on it and fetch results.
3. We need a way to check the status, but this could probably be modeled as a 
`Command` (e.g. a QueryStatusCommand).

Once this API is specced out, the next step is to identify how to perform the 
query execution in the background so that the results can be fetched when 
available.

My suggestion would be to prepare a small doc on what exactly you're doing so 
that we can have a discussion. Feel free to do the design for this in a README 
in a pull request if this is preferred.
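
A rough client-side sketch of the submit-and-poll flow outlined above; none of these method or field names exist in Spark Connect today, they only illustrate how the execution mode, the query ID and a status command would interact.

{code:python}
import time

def run_plan_async(client, plan, poll_interval_s=2.0):
    """Illustrative only: a hypothetical non-blocking execution flow."""
    # 1. Submit with a (hypothetical) ASYNC execution mode; the response
    #    carries a query ID instead of streaming result batches.
    query_id = client.execute_plan(plan, mode="ASYNC")

    # 2. Poll a status command (e.g. a QueryStatusCommand) until the
    #    background execution has finished.
    while client.query_status(query_id) == "RUNNING":
        time.sleep(poll_interval_s)

    # 3. Use the query ID to resume the request and fetch the results.
    return client.fetch_results(query_id)
{code}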

> Support async query execution
> -
>
> Key: SPARK-41628
> URL: https://issues.apache.org/jira/browse/SPARK-41628
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect
>Affects Versions: 3.4.0
>Reporter: Martin Grund
>Priority: Major
>
> Today query execution is completely synchronous; add an asynchronous API 
> that allows clients to submit a query and poll for the result.






[jira] [Updated] (SPARK-43001) Spark last window doesn't flush in append mode

2023-04-01 Thread padavan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

padavan updated SPARK-43001:

Description: 
The problem is simple: when you use a *tumbling window* with *append mode*, 
the window is closed +only when the next message arrives+ (watermark logic).

In the current implementation, if the *incoming* streaming data *stops*, the 
*last* window will *never close* and the last window's data is *lost*.

 

Business situation:

Everything worked correctly, then new messages stopped arriving; if the next 
message only comes 5 hours later, the client receives the result after 5 hours 
instead of after the window's 10-second delay.

!https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!

The current implementation needs to be improved: Spark should include an 
internal mechanism to close windows automatically.

 

*What we propose:*

Add a third parameter, *maxDelayClose*: {{DataFrame.withWatermark(eventTime, 
delayThreshold, maxDelayClose)}}. The trigger would then execute
{code:java}
if (now - window.upper_bound > maxDelayClose) {
    window.close().flush();
}
{code}
I assume it can be done in a day. It was unexpected for us that our customers 
could not receive their notifications (the company is in the medical field).

Simple code reproducing the problem:
{code:java}
kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()

sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS 
STRING)")
       .select(from_json(col("value").cast("string"), 
json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )
 
console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append")\
    .start()
{code}
 

 

  was:
The problem is simple: when you use a *tumbling window* with *append mode*, 
the window is closed +only when the next message arrives+ (watermark logic).

In the current implementation, if the *incoming* streaming data *stops*, the 
*last* window will *never close* and the last window's data is *lost*.

 

Business situation:

Everything worked correctly, then new messages stopped arriving; if the next 
message only comes 5 hours later, the client receives the result after 5 hours 
instead of after the window's 10-second delay.

!https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!

The current implementation needs to be improved: Spark should include an 
internal mechanism to close windows automatically.

 

*What we propose:*

Add a third parameter, *maxDelayClose*: {{DataFrame.withWatermark(eventTime, 
delayThreshold, maxDelayClose)}}. The trigger would then execute
{code:java}
if (now - window.upper_bound > maxDelayClose) {
    window.close().flush();
}
{code}
I assume it can be done in a day. It was unexpected for us that our customers 
could not receive their notifications (the company is in the medical field).

!image-2023-04-01-10-57-28-866.png|width=109,height=91!

Simple code reproducing the problem:
{code:java}
kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()

sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS 
STRING)")
       .select(from_json(col("value").cast("string"), 
json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )
 
console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append")\
    .start()
{code}
 

 


> Spark last window doesn't flush in append mode
> ---
>
> Key: SPARK-43001
> URL: https://issues.apache.org/jira/browse/SPARK-43001
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 3.3.2
>Reporter: padavan
>Priority: Critical
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The problem is simple: when you use a *tumbling window* with *append mode*, 
> the window is closed +only when the next message arrives+ (watermark logic).
> In the current implementation, if the *incoming* streaming data *stops*, the 
> *last* window will *never close* and the last window's data is *lost*.

[jira] [Updated] (SPARK-43001) Spark last window doesn't flush in append mode

2023-04-01 Thread padavan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

padavan updated SPARK-43001:

Attachment: (was: image-2023-04-01-10-57-28-866.png)

> Spark last window doesn't flush in append mode
> ---
>
> Key: SPARK-43001
> URL: https://issues.apache.org/jira/browse/SPARK-43001
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 3.3.2
>Reporter: padavan
>Priority: Critical
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The problem is simple: when you use a *tumbling window* with *append mode*, 
> the window is closed +only when the next message arrives+ (watermark logic).
> In the current implementation, if the *incoming* streaming data *stops*, the 
> *last* window will *never close* and the last window's data is *lost*.
>  
> Business situation:
> Everything worked correctly, then new messages stopped arriving; if the next 
> message only comes 5 hours later, the client receives the result after 5 hours 
> instead of after the window's 10-second delay.
> !https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!
> The current implementation needs to be improved: Spark should include an 
> internal mechanism to close windows automatically.
>  
> *What we propose:*
> Add a third parameter, *maxDelayClose*: {{DataFrame.withWatermark(eventTime, 
> delayThreshold, maxDelayClose)}}. The trigger would then execute
> {code:java}
> if (now - window.upper_bound > maxDelayClose) {
>     window.close().flush();
> }
> {code}
> I assume it can be done in a day. It was unexpected for us that our customers 
> could not receive their notifications (the company is in the medical field).
> !image-2023-04-01-10-57-28-866.png|width=109,height=91!
>  
> Simple code reproducing the problem:
> {code:java}
> kafka_stream_df = spark \
>     .readStream \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", KAFKA_BROKER) \
>     .option("subscribe", KAFKA_TOPIC) \
>     .option("includeHeaders", "true") \
>     .load()
> sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS 
> STRING)")
>        .select(from_json(col("value").cast("string"), 
> json_schema).alias("data"))
>        .select("data.*")
>        .withWatermark("dt", "1 seconds")
>        .groupBy(window("dt", "10 seconds"))
>        .agg(sum("price"))
>       )
>  
> console = sel \
>     .writeStream \
>     .trigger(processingTime='10 seconds') \
>     .format("console") \
>     .outputMode("append")\
>     .start()
> {code}
>  
>  






[jira] [Commented] (SPARK-42519) Add more WriteTo tests after Scala Client session config is supported

2023-04-01 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-42519?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707519#comment-17707519
 ] 

ASF GitHub Bot commented on SPARK-42519:


User 'Hisoka-X' has created a pull request for this issue:
https://github.com/apache/spark/pull/40564

> Add more WriteTo tests after Scala Client session config is supported
> -
>
> Key: SPARK-42519
> URL: https://issues.apache.org/jira/browse/SPARK-42519
> Project: Spark
>  Issue Type: Improvement
>  Components: Connect
>Affects Versions: 3.4.0
>Reporter: Zhen Li
>Priority: Major
>
> Add more test cases following the examples in the 
> SparkConnectProtoSuite "WriteTo" tests.






[jira] [Commented] (SPARK-38478) Use error classes in org.apache.spark.ui

2023-04-01 Thread Wencong Liu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-38478?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707516#comment-17707516
 ] 

Wencong Liu commented on SPARK-38478:
-

I'd like to take this ticket. Would you like to assign it to me, [~bozhang]?

> Use error classes in org.apache.spark.ui
> 
>
> Key: SPARK-38478
> URL: https://issues.apache.org/jira/browse/SPARK-38478
> Project: Spark
>  Issue Type: Sub-task
>  Components: Spark Core
>Affects Versions: 3.3.0
>Reporter: Bo Zhang
>Priority: Major
>







[jira] [Updated] (SPARK-43001) Spark last window doesn't flush in append mode

2023-04-01 Thread padavan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

padavan updated SPARK-43001:

Description: 
The problem is simple: when you use a *tumbling window* with *append mode*, 
the window is closed +only when the next message arrives+ (watermark logic).

In the current implementation, if the *incoming* streaming data *stops*, the 
*last* window will *never close* and the last window's data is *lost*.

 

Business situation:

Everything worked correctly, then new messages stopped arriving; if the next 
message only comes 5 hours later, the client receives the result after 5 hours 
instead of after the window's 10-second delay.

!https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!

The current implementation needs to be improved: Spark should include an 
internal mechanism to close windows automatically.

 

*What we propose:*

Add a third parameter, *maxDelayClose*: {{DataFrame.withWatermark(eventTime, 
delayThreshold, maxDelayClose)}}. The trigger would then execute
{code:java}
if (now - window.upper_bound > maxDelayClose) {
    window.close().flush();
}
{code}
I assume it can be done in a day. It was unexpected for us that our customers 
could not receive their notifications (the company is in the medical field).

!image-2023-04-01-10-57-28-866.png|width=109,height=91!

Simple code reproducing the problem:
{code:java}
kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()

sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS 
STRING)")
       .select(from_json(col("value").cast("string"), 
json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )
 
console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append")\
    .start()
{code}
 

 

  was:
The problem is simple: when you use a *tumbling window* with *append mode*, 
the window is closed +only when the next message arrives+ (watermark logic).

In the current implementation, if the *incoming* streaming data *stops*, the 
*last* window will *never close* and the last window's data is *lost*.

 

Business situation:

Everything worked correctly, then new messages stopped arriving; if the next 
message only comes 5 hours later, the client receives the result after 5 hours 
instead of after the window's 10-second delay.

!https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!

The current implementation needs to be improved: Spark should include an 
internal mechanism to close windows automatically.

 

*What we propose:*

Add a third parameter, *maxDelayClose*: {{DataFrame.withWatermark(eventTime, 
delayThreshold, maxDelayClose)}}. The trigger would then execute
{code:java}
if (now - window.upper_bound > maxDelayClose) {
    window.close().flush();
}
{code}
I assume it can be done in a day. It was unexpected for us that our customers 
could not receive their notifications (the company is in the medical field).

!image-2023-04-01-10-57-28-866.png|width=109,height=91!

Simple code reproducing the problem:
{code:java}
kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()

sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS 
STRING)")
       .select(from_json(col("value").cast("string"), 
json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )
 
console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append")\
    .start()
{code}
 

 


> Spark last window doesn't flush in append mode
> ---
>
> Key: SPARK-43001
> URL: https://issues.apache.org/jira/browse/SPARK-43001
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 3.3.2
>Reporter: padavan
>Priority: Critical
> Attachments: image-2023-04-01-10-57-28-866.png
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The problem is simple: when you use a *tumbling window* with *append mode*, 
> the window is closed +only when the next message arrives+ (watermark logic).
> In the current implementation, if the *incoming* streaming data *stops*, the 
> *last* window will *never close* and the last window's data is *lost*.

[jira] [Updated] (SPARK-43001) Spark last window doesn't flush in append mode

2023-04-01 Thread padavan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

padavan updated SPARK-43001:

Description: 
The problem is simple: when you use a *tumbling window* with *append mode*, 
the window is closed +only when the next message arrives+ (watermark logic).

In the current implementation, if the *incoming* streaming data *stops*, the 
*last* window will *never close* and the last window's data is *lost*.

 

Business situation:

Everything worked correctly, then new messages stopped arriving; if the next 
message only comes 5 hours later, the client receives the result after 5 hours 
instead of after the window's 10-second delay.

!https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!

The current implementation needs to be improved: Spark should include an 
internal mechanism to close windows automatically.

 

*What we propose:*

Add a third parameter, *maxDelayClose*: {{DataFrame.withWatermark(eventTime, 
delayThreshold, maxDelayClose)}}. The trigger would then execute
{code:java}
if (now - window.upper_bound > maxDelayClose) {
    window.close().flush();
}
{code}
I assume it can be done in a day. It was unexpected for us that our customers 
could not receive their notifications (the company is in the medical field).

!image-2023-04-01-10-57-28-866.png|width=109,height=91!

Simple code reproducing the problem:
{code:java}
kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()

sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS 
STRING)")
       .select(from_json(col("value").cast("string"), 
json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )
 
console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append")\
    .start()
{code}
 

 

  was:
The problem is simple: when you use a *tumbling window* with *append mode*, 
the window is closed +only when the next message arrives+ (watermark logic).

In the current implementation, if the *incoming* streaming data *stops*, the 
*last* window will *never close* and the last window's data is *lost*.

 

Business situation:

Everything worked correctly, then new messages stopped arriving; if the next 
message only comes 5 hours later, the client receives the result after 5 hours 
instead of after the window's 10-second delay.

!https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!

The current implementation needs to be improved: Spark should include an 
internal mechanism to close windows automatically.

 

*What we propose:*

Add a third parameter, *maxDelayClose*: {{DataFrame.withWatermark(eventTime, 
delayThreshold, maxDelayClose)}}. The trigger would then execute
{code:java}
if (now - window.upper_bound > maxDelayClose) {
    window.close().flush();
}
{code}
I assume it can be done in a day. It was unexpected for us that our customers 
could not receive their notifications (the company is in the medical field).

!image-2023-04-01-10-57-28-866.png|width=109,height=91!

Simple code reproducing the problem:
{code:java}
kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()

sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS 
STRING)")
       .select(from_json(col("value").cast("string"), 
json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )
 
console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append")\
    .start()
{code}
 

 


> Spark last window doesn't flush in append mode
> ---
>
> Key: SPARK-43001
> URL: https://issues.apache.org/jira/browse/SPARK-43001
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 3.3.2
>Reporter: padavan
>Priority: Critical
> Attachments: image-2023-04-01-10-57-28-866.png
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The problem is simple: when you use a *tumbling window* with *append mode*, 
> the window is closed +only when the next message arrives+ (watermark logic).
> In the current implementation, if the *incoming* streaming data *stops*, the 
> *last* window will *never close* and the last window's data is *lost*.

[jira] [Updated] (SPARK-43001) Spark last window doesn't flush in append mode

2023-04-01 Thread padavan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

padavan updated SPARK-43001:

Description: 
The problem is simple: when you use a *tumbling window* with *append mode*, 
the window is closed +only when the next message arrives+ (watermark logic).

In the current implementation, if the *incoming* streaming data *stops*, the 
*last* window will *never close* and the last window's data is *lost*.

 

Business situation:

Everything worked correctly, then new messages stopped arriving; if the next 
message only comes 5 hours later, the client receives the result after 5 hours 
instead of after the window's 10-second delay.

!https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!

The current implementation needs to be improved: Spark should include an 
internal mechanism to close windows automatically.

 

*What we propose:*

Add a third parameter, *maxDelayClose*: {{DataFrame.withWatermark(eventTime, 
delayThreshold, maxDelayClose)}}. The trigger would then execute
{code:java}
if (now - window.upper_bound > maxDelayClose) {
    window.close().flush();
}
{code}
I assume it can be done in a day. It was unexpected for us that our customers 
could not receive their notifications (the company is in the medical field).

!image-2023-04-01-10-57-28-866.png|width=109,height=91!

Simple code reproducing the problem:
{code:java}
kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()
sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS 
STRING)")
       .select(from_json(col("value").cast("string"), 
json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )
 
console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append")\
    .start()
{code}
 

 

  was:
The problem is simple: when you use a *tumbling window* with *append mode*, 
the window is closed +only when the next message arrives+ (watermark logic).

In the current implementation, if the *incoming* streaming data *stops*, the 
*last* window will *never close* and the last window's data is *lost*.

 

Business situation:

Everything worked correctly, then new messages stopped arriving; if the next 
message only comes 5 hours later, the client receives the result after 5 hours 
instead of after the window's 10-second delay.

!https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!

The current implementation needs to be improved: Spark should include an 
internal mechanism to close windows automatically.

 

*What we propose:*

Add a third parameter, *maxDelayClose*: {{DataFrame.withWatermark(eventTime, 
delayThreshold, maxDelayClose)}}. The trigger would then execute
{code:java}
if (now - window.upper_bound > maxDelayClose) {
    window.close().flush();
}
{code}
I assume it can be done in a day. It was unexpected for us that our customers 
could not receive their notifications (the company is in the medical field).

!image-2023-04-01-10-57-28-866.png|width=109,height=91!

Simple code reproducing the problem:

 
{code:java}
kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()
sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS 
STRING)")
       .select(from_json(col("value").cast("string"), 
json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )
 
console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append")\
    .start()
{code}
 

 


> Spark last window doesn't flush in append mode
> ---
>
> Key: SPARK-43001
> URL: https://issues.apache.org/jira/browse/SPARK-43001
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 3.3.2
>Reporter: padavan
>Priority: Critical
> Attachments: image-2023-04-01-10-57-28-866.png
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The problem is simple: when you use a *tumbling window* with *append mode*, 
> the window is closed +only when the next message arrives+ (watermark logic).
> In the current implementation, if the *incoming* streaming data *stops*, the 
> *last* window will *never close* and the last window's data is *lost*.

[jira] [Updated] (SPARK-43001) Spark last window doesn't flush in append mode

2023-04-01 Thread padavan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

padavan updated SPARK-43001:

Description: 
The problem is simple: when you use a *tumbling window* with *append mode*, 
the window is closed +only when the next message arrives+ (watermark logic).

In the current implementation, if the *incoming* streaming data *stops*, the 
*last* window will *never close* and the last window's data is *lost*.

 

Business situation:

Everything worked correctly, then new messages stopped arriving; if the next 
message only comes 5 hours later, the client receives the result after 5 hours 
instead of after the window's 10-second delay.

!https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!

The current implementation needs to be improved: Spark should include an 
internal mechanism to close windows automatically.

 

*What we propose:*

Add a third parameter, *maxDelayClose*: {{DataFrame.withWatermark(eventTime, 
delayThreshold, maxDelayClose)}}. The trigger would then execute
{code:java}
if (now - window.upper_bound > maxDelayClose) {
    window.close().flush();
}
{code}
I assume it can be done in a day. It was unexpected for us that our customers 
could not receive their notifications (the company is in the medical field).

!image-2023-04-01-10-57-28-866.png|width=109,height=91!

Simple code reproducing the problem:
{code:java}
kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()

sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS 
STRING)")
       .select(from_json(col("value").cast("string"), 
json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )
 
console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append")\
    .start()
{code}
 

 

  was:
The problem is simple: when you use a *tumbling window* with *append mode*, 
the window is closed +only when the next message arrives+ (watermark logic).

In the current implementation, if the *incoming* streaming data *stops*, the 
*last* window will *never close* and the last window's data is *lost*.

 

Business situation:

Everything worked correctly, then new messages stopped arriving; if the next 
message only comes 5 hours later, the client receives the result after 5 hours 
instead of after the window's 10-second delay.

!https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!

The current implementation needs to be improved: Spark should include an 
internal mechanism to close windows automatically.

 

*What we propose:*

Add a third parameter, *maxDelayClose*: {{DataFrame.withWatermark(eventTime, 
delayThreshold, maxDelayClose)}}. The trigger would then execute
{code:java}
if (now - window.upper_bound > maxDelayClose) {
    window.close().flush();
}
{code}
I assume it can be done in a day. It was unexpected for us that our customers 
could not receive their notifications (the company is in the medical field).

!image-2023-04-01-10-57-28-866.png|width=109,height=91!

Simple code reproducing the problem:
{code:java}
kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()
sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS 
STRING)")
       .select(from_json(col("value").cast("string"), 
json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )
 
console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append")\
    .start()
{code}
 

 


> Spark last window doesn't flush in append mode
> ---
>
> Key: SPARK-43001
> URL: https://issues.apache.org/jira/browse/SPARK-43001
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 3.3.2
>Reporter: padavan
>Priority: Critical
> Attachments: image-2023-04-01-10-57-28-866.png
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The problem is simple: when you use a *tumbling window* with *append mode*, 
> the window is closed +only when the next message arrives+ (watermark logic).
> In the current implementation, if the *incoming* streaming data *stops*, the 
> *last* window will *never close* and the last window's data is *lost*.

[jira] [Updated] (SPARK-43001) Spark last window doesn't flush in append mode

2023-04-01 Thread padavan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

padavan updated SPARK-43001:

Description: 
The problem is simple: when you use a *tumbling window* with *append mode*, 
the window is closed +only when the next message arrives+ (watermark logic).

In the current implementation, if the *incoming* streaming data *stops*, the 
*last* window will *never close* and the last window's data is *lost*.

 

Business situation:

Everything worked correctly, then new messages stopped arriving; if the next 
message only comes 5 hours later, the client receives the result after 5 hours 
instead of after the window's 10-second delay.

!https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!

The current implementation needs to be improved: Spark should include an 
internal mechanism to close windows automatically.

 

*What we propose:*

Add a third parameter, *maxDelayClose*: {{DataFrame.withWatermark(eventTime, 
delayThreshold, maxDelayClose)}}. The trigger would then execute
{code:java}
if (now - window.upper_bound > maxDelayClose) {
    window.close().flush();
}
{code}
I assume it can be done in a day. It was unexpected for us that our customers 
could not receive their notifications (the company is in the medical field).

!image-2023-04-01-10-57-28-866.png|width=109,height=91!

Simple code reproducing the problem:

 
{code:java}
kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()
sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS 
STRING)")
       .select(from_json(col("value").cast("string"), 
json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )
 
console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append")\
    .start()
{code}
 

 

  was:
The problem is simple: when you use a *tumbling window* with *append mode*, 
the window is closed +only when the next message arrives+ (watermark logic).

In the current implementation, if the *incoming* streaming data *stops*, the 
*last* window will *never close* and the last window's data is *lost*.

 

Business situation:

Everything worked correctly, then new messages stopped arriving; if the next 
message only comes 5 hours later, the client receives the result after 5 hours 
instead of after the window's 10-second delay.

!https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!

The current implementation needs to be improved: Spark should include an 
internal mechanism to close windows automatically.

 

*What we propose:*

Add a third parameter, *maxDelayClose*: {{DataFrame.withWatermark(eventTime, 
delayThreshold, maxDelayClose)}}. The trigger would then execute
{code:java}
if (now - window.upper_bound > maxDelayClose) {
    window.close().flush();
}
{code}
I assume it can be done in a day. It was unexpected for us that our customers 
could not receive their notifications (the company is in the medical field).

!image-2023-04-01-10-38-43-326.png|width=159,height=101!

Simple code reproducing the problem:

 
{code:java}
kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()
sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS 
STRING)")
       .select(from_json(col("value").cast("string"), 
json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )
 
console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append")\
    .start()
{code}
 

 


> Spark last window doesn't flush in append mode
> ---
>
> Key: SPARK-43001
> URL: https://issues.apache.org/jira/browse/SPARK-43001
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 3.3.2
>Reporter: padavan
>Priority: Critical
> Attachments: image-2023-04-01-10-57-28-866.png
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The problem is simple: when you use a *tumbling window* with *append mode*, 
> the window is closed +only when the next message arrives+ (watermark logic).
> In the current implementation, if the *incoming* streaming data *stops*, the 
> *last* window will *never close* and the last window's data is *lost*.

[jira] [Updated] (SPARK-43001) Spark last window doesn't flush in append mode

2023-04-01 Thread padavan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

padavan updated SPARK-43001:

Attachment: image-2023-04-01-10-57-28-866.png

> Spark last window doesn't flush in append mode
> ---
>
> Key: SPARK-43001
> URL: https://issues.apache.org/jira/browse/SPARK-43001
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 3.3.2
>Reporter: padavan
>Priority: Critical
> Attachments: image-2023-04-01-10-57-28-866.png
>
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The problem is simple: when you use a *tumbling window* with *append mode*, 
> the window is closed +only when the next message arrives+ (watermark logic).
> In the current implementation, if the *incoming* streaming data *stops*, the 
> *last* window will *never close* and the last window's data is *lost*.
>  
> Business situation:
> Everything worked correctly, then new messages stopped arriving; if the next 
> message only comes 5 hours later, the client receives the result after 5 hours 
> instead of after the window's 10-second delay.
> !https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!
> The current implementation needs to be improved: Spark should include an 
> internal mechanism to close windows automatically.
>  
> *What we propose:*
> Add a third parameter, *maxDelayClose*: {{DataFrame.withWatermark(eventTime, 
> delayThreshold, maxDelayClose)}}. The trigger would then execute
> {code:java}
> if (now - window.upper_bound > maxDelayClose) {
>     window.close().flush();
> }
> {code}
> I assume it can be done in a day. It was unexpected for us that our customers 
> could not receive their notifications (the company is in the medical field).
> !image-2023-04-01-10-38-43-326.png|width=159,height=101!
>  
> Simple code reproducing the problem:
>  
> {code:java}
> kafka_stream_df = spark \
>     .readStream \
>     .format("kafka") \
>     .option("kafka.bootstrap.servers", KAFKA_BROKER) \
>     .option("subscribe", KAFKA_TOPIC) \
>     .option("includeHeaders", "true") \
>     .load()
> sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS 
> STRING)")
>        .select(from_json(col("value").cast("string"), 
> json_schema).alias("data"))
>        .select("data.*")
>        .withWatermark("dt", "1 seconds")
>        .groupBy(window("dt", "10 seconds"))
>        .agg(sum("price"))
>       )
>  
> console = sel \
>     .writeStream \
>     .trigger(processingTime='10 seconds') \
>     .format("console") \
>     .outputMode("append")\
>     .start()
> {code}
>  
>  






[jira] [Updated] (SPARK-43001) Spark last window doesn't flush in append mode

2023-04-01 Thread padavan (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-43001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

padavan updated SPARK-43001:

Description: 
The problem is simple: when you use a *tumbling window* with *append mode*, 
the window is closed +only when the next message arrives+ (watermark logic).

In the current implementation, if the *incoming* streaming data *stops*, the 
*last* window will *never close* and the last window's data is *lost*.

 

Business situation:

Everything worked correctly, then new messages stopped arriving; if the next 
message only comes 5 hours later, the client receives the result after 5 hours 
instead of after the window's 10-second delay.

!https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!

The current implementation needs to be improved: Spark should include an 
internal mechanism to close windows automatically.

 

*What we propose:*

Add a third parameter, *maxDelayClose*: {{DataFrame.withWatermark(eventTime, 
delayThreshold, maxDelayClose)}}. The trigger would then execute
{code:java}
if (now - window.upper_bound > maxDelayClose) {
    window.close().flush();
}
{code}
I assume it can be done in a day. It was unexpected for us that our customers 
could not receive their notifications (the company is in the medical field).

!image-2023-04-01-10-38-43-326.png|width=159,height=101!

Simple code reproducing the problem:

 
{code:java}
kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()
sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS 
STRING)")
       .select(from_json(col("value").cast("string"), 
json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )
 
console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append")\
    .start()
{code}
 

 

  was:
The problem is simple: when you use a *tumbling window* with *append mode*, 
the window is closed +only when the next message arrives+ (watermark logic).

In the current implementation, if the *incoming* streaming data *stops*, the 
*last* window will *never close* and the last window's data is *lost*.

 

Business situation:

Everything worked correctly, then new messages stopped arriving; if the next 
message only comes 5 hours later, the client receives the result after 5 hours 
instead of after the window's 10-second delay.

!https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png|width=707,height=294!

The current implementation needs to be improved: Spark should include an 
internal mechanism to close windows automatically.

 

*What we propose:*

Add a third parameter, *maxDelayClose*: {{DataFrame.withWatermark(eventTime, 
delayThreshold, maxDelayClose)}}. The trigger would then execute
{code:java}
if (now - window.upper_bound > maxDelayClose) {
    window.close().flush();
}
{code}
I assume it can be done in a day. It was unexpected for us that our customers 
could not receive their notifications (the company is in the medical field).

!image-2023-04-01-10-38-43-326.png|width=159,height=101!

Simple code reproducing the problem:

 
{code:java}
kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()
sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS 
STRING)")
       .select(from_json(col("value").cast("string"), 
json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )
 
console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append")\
    .start()
{code}
 

 


> Spark last window doesn't flush in append mode
> ---
>
> Key: SPARK-43001
> URL: https://issues.apache.org/jira/browse/SPARK-43001
> Project: Spark
>  Issue Type: Bug
>  Components: PySpark, Spark Core
>Affects Versions: 3.3.2
>Reporter: padavan
>Priority: Critical
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> The problem is simple: when you use a *tumbling window* with *append mode*, 
> the window is closed +only when the next message arrives+ (watermark logic).
> In the current implementation, if the *incoming* streaming data *stops*, the 
> *last* window will *never close* and the last window's data is *lost*.

[jira] [Created] (SPARK-43001) Spark last window dont flush in append mode

2023-04-01 Thread padavan (Jira)
padavan created SPARK-43001:
---

 Summary: Spark last window dont flush in append mode
 Key: SPARK-43001
 URL: https://issues.apache.org/jira/browse/SPARK-43001
 Project: Spark
  Issue Type: Bug
  Components: PySpark, Spark Core
Affects Versions: 3.3.2
Reporter: padavan


The problem is simple: when you use a *TUMBLING* *window* with {*}append mode{*}, the 
window is closed +only when the next message arrives+ ({_}watermark logic{_}).

In the current implementation, if you *stop* the *incoming* streaming data, the 
*last* window will *NEVER close* and we LOSE the last window's data.

 

Business situation:

Everything worked correctly until new messages stopped arriving; when the next message 
finally came 5 hours later, the client received the previous window's result after 
5 hours instead of the expected ~10-second window delay.

!https://user-images.githubusercontent.com/61819835/226478055-dc4a123c-4397-4eb0-b6ed-1e185b6fab76.png!

The current implementation needs to be improved: Spark should include an internal 
mechanism to close windows automatically.

 

*What we propose:*

Add a third parameter: {{DataFrame.withWatermark}}({_}eventTime{_}, {_}delayThreshold{_}, 
{_}*maxDelayClose*{_}). The trigger would then execute:

 
{code:java}
// Pseudocode: force-close and flush a window once it is older than maxDelayClose.
if (now - window.upper_bound > maxDelayClose) {
    window.close().flush();
}
{code}
 

I assume this could be implemented in about a day. We did not expect that our customers 
would stop receiving their notifications (the company operates in the medical field).

!image-2023-04-01-10-38-43-326.png|width=159,height=101!

 

 

Simple code reproducing the problem:

 
{code:python}
# Assumes spark (SparkSession), json_schema, KAFKA_BROKER and KAFKA_TOPIC are defined elsewhere.
from pyspark.sql.functions import col, from_json, window, sum  # pyspark sum, shadows the built-in

# Read the raw Kafka stream.
kafka_stream_df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BROKER) \
    .option("subscribe", KAFKA_TOPIC) \
    .option("includeHeaders", "true") \
    .load()

# Parse the JSON payload, apply a 1-second watermark on the event-time column dt,
# and aggregate price into 10-second tumbling windows.
sel = (kafka_stream_df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
       .select(from_json(col("value").cast("string"), json_schema).alias("data"))
       .select("data.*")
       .withWatermark("dt", "1 seconds")
       .groupBy(window("dt", "10 seconds"))
       .agg(sum("price"))
      )

# Write the windowed aggregates to the console in append mode with a 10-second trigger.
console = sel \
    .writeStream \
    .trigger(processingTime='10 seconds') \
    .format("console") \
    .outputMode("append") \
    .start()
{code}
 

 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-41628) Support async query execution

2023-04-01 Thread Jia Fan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-41628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17707513#comment-17707513
 ] 

Jia Fan commented on SPARK-41628:
-

Hi, [~grundprinzip-db]. I have a question: should I use the gRPC futureStub to 
implement the asynchronous API? That would only require adding some Connect client 
APIs. Or should I add a new gRPC method named `AsyncExecutePlanRequest` that returns a 
requestId, so the user can then call another gRPC method named `WaitResponse` to get 
the `ExecutePlanResponse`?
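
For reference, a toy sketch of what the second option (submit, then wait/poll) could look like from the client's point of view. The method names `AsyncExecutePlanRequest` and `WaitResponse` come from the question above and are hypothetical, not part of the existing Spark Connect protocol; the in-memory "service" below merely stands in for the real gRPC server.

{code:python}
import uuid
from concurrent.futures import ThreadPoolExecutor

class ToyConnectService:
    """Stand-in for the proposed server side: submit returns a request id, the result is awaited later."""

    def __init__(self):
        self._executor = ThreadPoolExecutor(max_workers=4)
        self._pending = {}

    def AsyncExecutePlanRequest(self, plan):           # hypothetical RPC: submit, return an id
        request_id = str(uuid.uuid4())
        self._pending[request_id] = self._executor.submit(self._run, plan)
        return request_id

    def WaitResponse(self, request_id, timeout=None):  # hypothetical RPC: block/poll for the result
        return self._pending.pop(request_id).result(timeout=timeout)

    def _run(self, plan):
        return f"ExecutePlanResponse(results for: {plan})"

service = ToyConnectService()
rid = service.AsyncExecutePlanRequest("SELECT 1")
print(service.WaitResponse(rid))
{code}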

> Support async query execution
> -
>
> Key: SPARK-41628
> URL: https://issues.apache.org/jira/browse/SPARK-41628
> Project: Spark
>  Issue Type: Sub-task
>  Components: Connect
>Affects Versions: 3.4.0
>Reporter: Martin Grund
>Priority: Major
>
> Today the query execution is completely synchronous; add an additional 
> asynchronous API that allows clients to submit a query and poll for the result.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-43000) Do not cast to double type if one side is AnsiIntervalType in BinaryArithmetic

2023-04-01 Thread Yuming Wang (Jira)
Yuming Wang created SPARK-43000:
---

 Summary: Do not cast to double type if one side is 
AnsiIntervalType in BinaryArithmetic
 Key: SPARK-43000
 URL: https://issues.apache.org/jira/browse/SPARK-43000
 Project: Spark
  Issue Type: Improvement
  Components: SQL
Affects Versions: 3.5.0
Reporter: Yuming Wang






--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org