[jira] [Commented] (SPARK-24156) Enable no-data micro batches for more eager streaming state clean up

2022-01-28 Thread krishna (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17483838#comment-17483838
 ] 

krishna commented on SPARK-24156:
-

Hi [~kcsrms] [~tdas] ,

  I am having the same issue. Has this issue been resolved? Is there a specific 
version I need to use?

 
  I am struggling with this issue, and I am not sure whether my understanding 
is wrong or this is a bug in Spark.
 
 #  I read a stream from Event Hubs (extract).
 #  I pivot and aggregate that DataFrame (transformation). This is a 
WATERMARKED aggregation.
 #  I write the aggregation to a Delta table in APPEND mode with a trigger.

However, the most recently published Event Hubs message is not written to 
Delta, even after it has fallen outside the watermark delay.
 
 My understanding is that the data should be inserted into the Delta table once 
event time + watermark delay has passed.
 
 

> Enable no-data micro batches for more eager streaming state clean up 
> -
>
> Key: SPARK-24156
> URL: https://issues.apache.org/jira/browse/SPARK-24156
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 2.3.0
>Reporter: Tathagata Das
>Assignee: Tathagata Das
>Priority: Major
> Fix For: 2.4.0
>
>
> Currently, MicroBatchExecution in Structured Streaming runs batches only when 
> there is new data to process. This is sensible in most cases as we don't want 
> to unnecessarily use resources when there is nothing new to process. However, 
> in some cases of stateful streaming queries, this delays state clean up as 
> well as clean-up based output. For example, consider a streaming aggregation 
> query with watermark-based state cleanup. The watermark is updated after 
> every batch with new data completes. The updated value is used in the next 
> batch to clean up state, and output finalized aggregates in append mode. 
> However, if there is no data, then the next batch does not occur, and 
> cleanup/output gets delayed unnecessarily. This is true for all stateful 
> streaming operators - aggregation, deduplication, joins, mapGroupsWithState.
> This issue tracks the work to enable no-data batches in MicroBatchExecution. 
> The major challenge is that all the tests of relevant stateful operations add 
> dummy data to force another batch for testing the state cleanup. So a lot of 
> the tests are going to be changed. So my plan is to enable no-data batches 
> for different stateful operators one at a time.
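
The batch-by-batch behaviour described in the issue can be sketched with a small, self-contained Scala model. This is only an illustration, not Spark code: `WatermarkSketch`, `Window`, `windowOf`, and `run` are hypothetical names, times are plain non-negative minute counts, windows are tumbling, and a window is treated as final once its end is less than or equal to the watermark.

```scala
// Simplified model of append-mode output with an event-time watermark.
// Hypothetical names throughout; this is not Spark's implementation.
object WatermarkSketch {
  final case class Window(start: Long, end: Long)

  // Tumbling-window assignment: the window that contains eventTime (assumed >= 0).
  def windowOf(eventTime: Long, size: Long): Window = {
    val start = eventTime - (eventTime % size)
    Window(start, start + size)
  }

  // Runs the data batches in order and returns, for every executed batch
  // (including any extra no-data batch), the set of windows it emitted.
  def run(batches: Seq[Seq[Long]], size: Long, delay: Long,
          noDataBatches: Boolean): Vector[Set[Window]] = {
    var watermark = 0L            // watermark used by the batch being executed
    var open = Set.empty[Window]  // windows still held in state
    val emitted = Vector.newBuilder[Set[Window]]

    // Executes one batch; returns true if the watermark advanced afterwards.
    def execute(events: Seq[Long]): Boolean = {
      // Add windows for incoming events; windows already behind the watermark are dropped.
      open ++= events.map(windowOf(_, size)).filter(_.end > watermark)
      // Emit and evict the windows that the current watermark has passed.
      val (done, pending) = open.partition(_.end <= watermark)
      open = pending
      emitted += done
      // The watermark for the NEXT batch is computed after this batch completes.
      val next =
        if (events.isEmpty) watermark
        else math.max(watermark, events.max - delay)
      val advanced = next > watermark
      watermark = next
      advanced
    }

    batches.foreach { events =>
      val advanced = execute(events)
      // With no-data batches enabled, an empty batch runs as soon as the
      // watermark has moved, so finalized windows are emitted without new input.
      if (noDataBatches && advanced) execute(Seq.empty)
    }
    emitted.result()
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq(1L, 4L), Seq(12L, 17L))
    println(run(batches, size = 10L, delay = 5L, noDataBatches = false))
    println(run(batches, size = 10L, delay = 5L, noDataBatches = true))
  }
}
```

Under these assumptions, with 10-minute windows and a 5-minute delay, the two data batches alone emit nothing when `noDataBatches` is false, even though the watermark has already reached 12 after the second batch. With `noDataBatches` set to true, an extra empty batch runs and emits the window [0, 10). The window [10, 20) stays in state in both cases, because the watermark only advances when events with later timestamps arrive; that matches the "last window" behaviour reported in the comments on this issue.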



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-24156) Enable no-data micro batches for more eager streaming state clean up

2021-10-12 Thread Kanishka Chauhan (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17427584#comment-17427584
 ] 

Kanishka Chauhan commented on SPARK-24156:
--

Hi [~tdas],

We observed on Spark 2.4.0 and Spark 3.0.3 that the last data (or window) is 
evicted/flushed to the sink only once it falls below the watermark timestamp, 
which the Spark documentation states clearly, especially for the "append" 
output mode.



We are facing an issue similar to the one mentioned by [~taransaini43]: the last 
group of data is not flushed to the sink.

Is there a way to force Spark to flush the last group of data after some 
pre-configured amount of time, in case no new data arrives for a long while?

I understand that streaming inherently means an unbounded, continuous stream of 
data with no end. But Spark users have always wanted to see all of the data 
they pushed to the source.

 




[jira] [Commented] (SPARK-24156) Enable no-data micro batches for more eager streaming state clean up

2021-08-27 Thread Taran Saini (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17405606#comment-17405606
 ] 

Taran Saini commented on SPARK-24156:
-

[~kabhwan] We continuously read from a Kafka broker with Spark Structured 
Streaming, perform some aggregations, and write the results to a file sink (S3) 
in APPEND mode.

Watermarking is used here to accommodate late events (15 minutes; we also tried 
smaller values):
{code:java}
.withWatermark("localTimeStamp", config.getString("spark.watermarkInterval"))
{code}

The groupBy clause defines the batch (window) interval and the sliding interval 
(15 minutes in our case):
{code:java}
.groupBy(window($"localTimeStamp", batchInterval, 
config.getString("spark.slideInterval")),..,..)
{code}

After the aggregation(s), here's the snippet that writes the stream results to the file sink:
{code:java}
.repartition(1)
.writeStream
.partitionBy("date", "hour", "windowMinute")
.format("parquet")
.option("checkpointLocation", config.getString("spark.checkpointLocation"))
.trigger(Trigger.ProcessingTime(config.getString("spark.triggerInterval")))
.outputMode("append")
.option("path", s"${s3SinkLocation}/parquet/")
.start()
{code}


Here are the issues we observe:
1. The stream doesn't write output to the sink unless there is new data. If no 
events arrive in the current window, the previous one doesn't get flushed out.
2. Even with a continuous inflow of events, partitioned output directories are 
not consistently created for every window (i.e. every 15 minutes); it works 
many times but not always. We tried setting 
`.option("parquet.block.size", 1024)`, thinking it might flush events every 
window if the size is greater than 1024 bytes, but that does not produce the 
desired results either.

To summarise, `watermarking + append mode + file sink` is not working as 
expected (that is, as the Spark documentation says it should). We are using 
Spark 3.0.x.
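
For the configuration described above, the earliest point at which a window can be written in append mode follows from simple arithmetic. The sketch below is an illustration under stated assumptions, not Spark code: it assumes a tumbling window (window and slide both 15 minutes), windows aligned to the epoch, and that a window becomes final once the watermark (maximum event time seen minus the delay) reaches the window end. `EmissionBound` and its methods are hypothetical names, and the timestamp is made up.

```scala
import java.time.{Duration, Instant}

// Hypothetical helper; models the arithmetic only, not Spark internals.
object EmissionBound {
  // End of the epoch-aligned tumbling window that contains eventTime.
  def windowEnd(eventTime: Instant, size: Duration): Instant = {
    val sizeMs = size.toMillis
    val startMs = Math.floorDiv(eventTime.toEpochMilli, sizeMs) * sizeMs
    Instant.ofEpochMilli(startMs + sizeMs)
  }

  // Smallest "maximum event time seen" for which the watermark
  // (max event time - delay) reaches the end of the event's window.
  def requiredEventTime(eventTime: Instant, size: Duration, delay: Duration): Instant =
    windowEnd(eventTime, size).plus(delay)

  def main(args: Array[String]): Unit = {
    val event = Instant.parse("2021-08-27T10:07:00Z") // made-up timestamp
    val fifteen = Duration.ofMinutes(15)
    println(windowEnd(event, fifteen))
    println(requiredEventTime(event, fifteen, fifteen))
  }
}
```

With a 15-minute window and a 15-minute watermark delay, an event stamped 10:07 lands in the window ending at 10:15, and that window cannot be finalized until some event stamped 10:30 or later has been processed. If traffic pauses before then, the output for that window is delayed accordingly, which may account for part of the non-uniform delays described here.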




[jira] [Commented] (SPARK-24156) Enable no-data micro batches for more eager streaming state clean up

2021-08-26 Thread Jungtaek Lim (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17405603#comment-17405603
 ] 

Jungtaek Lim commented on SPARK-24156:
--

[~taransaini43]
Could you please elaborate on the problem you are facing and the behavior you 
expect? We need the context to understand the problem.
Also, please make sure you are using a recent version of Apache Spark, at least 
3.0.x.




[jira] [Commented] (SPARK-24156) Enable no-data micro batches for more eager streaming state clean up

2021-08-26 Thread Taran Saini (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17405599#comment-17405599
 ] 

Taran Saini commented on SPARK-24156:
-

[~thebluephantom] did you find any resolution for this? We are facing the same 
problem and see non-uniform delays in writing to the S3/file sink when using 
both watermarking and append mode (lots of people appear to be raising the same 
issue). This is a major bug that should be re-tested and at least mentioned in 
the documentation. 
[~tdas] [~cloud_fan]




[jira] [Commented] (SPARK-24156) Enable no-data micro batches for more eager streaming state clean up

2020-07-19 Thread Gerard Alexander (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-24156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17160831#comment-17160831
 ] 

Gerard Alexander commented on SPARK-24156:
--

Please see 
[https://stackoverflow.com/questions/62915922/spark-structured-streaming-wont-pull-the-final-batch-from-kafka].
Has the issue possibly crept back in?




[jira] [Commented] (SPARK-24156) Enable no-data micro batches for more eager streaming state clean up

2018-09-10 Thread Wenchen Fan (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16609241#comment-16609241
 ] 

Wenchen Fan commented on SPARK-24156:
-

I'm resolving it, since all subtasks are resolved.




[jira] [Commented] (SPARK-24156) Enable no-data micro batches for more eager streaming state clean up

2018-08-13 Thread Xiao Li (JIRA)


[ 
https://issues.apache.org/jira/browse/SPARK-24156?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16578937#comment-16578937
 ] 

Xiao Li commented on SPARK-24156:
-

[~tdas] Can we mark it done?
