[jira] [Created] (SPARK-48511) [Arbitrary State Support] Remove TimeMode None

2024-06-03 Thread Bhuwan Sahni (Jira)
Bhuwan Sahni created SPARK-48511:


 Summary: [Arbitrary State Support] Remove TimeMode None
 Key: SPARK-48511
 URL: https://issues.apache.org/jira/browse/SPARK-48511
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Bhuwan Sahni


A structured streaming query runs in either processing-time mode or event-time mode, depending on whether an eventTime column has been specified, so the presence of TimeMode.None is confusing. This Jira targets removing TimeMode None from the experimental API.

Note that if an eventTimeColumn is specified for the output dataset of the transformWithState operator, TimeMode defaults to EventTime.
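
For context, a rough sketch of how the time mode is chosen at the call site (hedged: this assumes the experimental Scala API shape at the time; `CountProcessor` and the input dataset are hypothetical):

```scala
import org.apache.spark.sql.streaming.{OutputMode, TimeMode}

// Assumes an active SparkSession with implicits imported, an input Dataset `input`,
// and a StatefulProcessor implementation `CountProcessor`.

// Processing-time mode: timers/TTL follow the wall clock.
val byProcessingTime = input
  .groupByKey(_.key)
  .transformWithState(new CountProcessor(), TimeMode.ProcessingTime(), OutputMode.Update())

// Event-time mode: timers/TTL follow the watermark. With this change, callers that
// previously passed TimeMode.None() pick one of these two modes (and when an
// eventTimeColumn is specified for the output, EventTime is the default).
val byEventTime = input
  .withWatermark("eventTime", "10 seconds")
  .groupByKey(_.key)
  .transformWithState(new CountProcessor(), TimeMode.EventTime(), OutputMode.Append())
```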



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-48199) [Arbitrary State Support] Allow users to set EventTime timestamp properly in TransformWithState for watermark evaluation

2024-05-08 Thread Bhuwan Sahni (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48199?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bhuwan Sahni updated SPARK-48199:
-
Epic Link: SPARK-46815

> [Arbitrary State Support] Allow users to set EventTime timestamp properly in 
> TransformWithState for watermark evaluation
> 
>
> Key: SPARK-48199
> URL: https://issues.apache.org/jira/browse/SPARK-48199
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Bhuwan Sahni
>Priority: Major
>
> The TransformWithState operator allows users to provide an eventTimeColumn. This
> column is used for watermark calculation in any operation performed after
> transformWithState.
> Spark currently validates that the rows emitted from the transformWithState
> operator adhere to the watermark boundary (specifically,
> watermarkForLateEvents). However, users currently have no mechanism to obtain
> watermarkForLateEvents.
> We need to either 1) introduce a method that gives users the watermark value
> before it advances (the late-events watermark), or 2) provide a mechanism for
> users to set the event-time timestamp properly without the watermark value.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48199) [Arbitrary State Support] Allow users to set EventTime timestamp properly in TransformWithState for watermark evaluation

2024-05-08 Thread Bhuwan Sahni (Jira)
Bhuwan Sahni created SPARK-48199:


 Summary: [Arbitrary State Support] Allow users to set EventTime 
timestamp properly in TransformWithState for watermark evaluation
 Key: SPARK-48199
 URL: https://issues.apache.org/jira/browse/SPARK-48199
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Bhuwan Sahni


The TransformWithState operator allows users to provide an eventTimeColumn. This column is used for watermark calculation in any operation performed after transformWithState.

Spark currently validates that the rows emitted from the transformWithState operator adhere to the watermark boundary (specifically, watermarkForLateEvents). However, users currently have no mechanism to obtain watermarkForLateEvents.

We need to either 1) introduce a method that gives users the watermark value before it advances (the late-events watermark), or 2) provide a mechanism for users to set the event-time timestamp properly without the watermark value.
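
For illustration, a rough sketch of where the gap shows up (hedged: this assumes the experimental TimerValues API; the accessor name below is an assumption and exposes only the eviction watermark, not watermarkForLateEvents):

```scala
import java.sql.Timestamp
import org.apache.spark.sql.streaming.TimerValues

// Assumed to run inside StatefulProcessor.handleInputRows(). The processor wants
// to stamp the rows it emits with an event time that the downstream watermark
// validation (against watermarkForLateEvents) will accept.
def emitRow(key: String, timerValues: TimerValues): (String, Timestamp) = {
  // Only the current (eviction) watermark is exposed today -- accessor name
  // assumed from the experimental API. There is no accessor for the late-events
  // watermark, so the processor cannot reliably pick a timestamp that is
  // guaranteed to pass the check.
  val watermarkMs = timerValues.getCurrentWatermarkInMs()
  (key, new Timestamp(watermarkMs + 1))
}
```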



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-47960) Support Chaining Stateful Operators in TransformWithState

2024-04-23 Thread Bhuwan Sahni (Jira)
Bhuwan Sahni created SPARK-47960:


 Summary: Support Chaining Stateful Operators in TransformWithState
 Key: SPARK-47960
 URL: https://issues.apache.org/jira/browse/SPARK-47960
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Bhuwan Sahni
 Fix For: 4.0.0


This issue tracks adding support for chaining stateful operators after the Arbitrary State API, transformWithState. In order to support chaining, we need to allow the user to specify a new eventTimeColumn in the output from the StatefulProcessor. Any watermark evaluation expressions downstream of transformWithState would then use the user-specified eventTimeColumn.
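
A minimal sketch of the intended chaining (hedged: this assumes the experimental overload of transformWithState that takes an event-time column name; `SessionProcessor`, the input dataset and the column names are hypothetical):

```scala
import org.apache.spark.sql.functions.window
import org.apache.spark.sql.streaming.OutputMode

// Assumes an active SparkSession `spark` with spark.implicits._ imported, an input
// Dataset[Event] `events` with an `eventTime` column, and a StatefulProcessor
// implementation `SessionProcessor` whose output rows contain an `endTime` column.
val sessionized = events
  .withWatermark("eventTime", "10 minutes")
  .groupByKey(_.userId)
  .transformWithState(new SessionProcessor(), "endTime", OutputMode.Append())

// Because `endTime` is declared as the event-time column of the output, a further
// stateful operator can be chained after transformWithState, and its watermark is
// evaluated against `endTime`.
val sessionCounts = sessionized
  .groupBy(window($"endTime", "1 hour"))
  .count()
```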



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Updated] (SPARK-47840) Remove foldable propagation across Streaming Aggregate/Join nodes

2024-04-12 Thread Bhuwan Sahni (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bhuwan Sahni updated SPARK-47840:
-
Description: 
Streaming queries with a Union of two data streams followed by an Aggregate (groupBy) can produce incorrect results if the grouping key is a constant literal for the micro-batch duration.

The query produces incorrect results because the query optimizer recognizes the literal value in the grouping key as foldable and replaces the grouping key expression with the actual literal value. This optimization is correct for batch queries. However, streaming queries also read information from the StateStore, and the output contains both the results from the StateStore (computed in previous micro-batches) and data from the input sources (computed in this micro-batch). The HashAggregate node after the StateStore always reads the grouping key as the optimized literal (because the grouping key expression has been folded into a literal by the optimizer). This ends up replacing keys in the StateStore with the literal value, resulting in incorrect output.

See an example logical and physical plan below for a query performing a union on two data streams, followed by a groupBy. Note that the name#4 expression has been optimized to ds1. The streaming Aggregate adds a StateStoreSave node as a child of HashAggregate; however, any grouping key read from the StateStore will still be read as ds1 due to the optimization.

 

*Optimized Logical Plan*
{quote}
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation ===

=== Old Plan ===

WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, Complete, 0
+- Aggregate [name#4], [name#4, count(1) AS count#31L]
   +- Project [ds1 AS name#4]
      +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource

=== New Plan ===

WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, Complete, 0
+- Aggregate [ds1], [ds1 AS name#4, count(1) AS count#31L]
   +- Project [ds1 AS name#4]
      +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
{quote}
*Corresponding Physical Plan*
{quote}
WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@2b4c6242], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3143/1859075634@35709d26
+- HashAggregate(keys=[ds1#39], functions=[finalmerge_count(merge count#38L) AS count(1)#30L], output=[name#4, count#31L])
   +- StateStoreSave [ds1#39], state info [ checkpoint = file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, numPartitions = 5], Complete, 0, 0, 2
      +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) AS count#38L], output=[ds1#39, count#38L])
         +- StateStoreRestore [ds1#39], state info [ checkpoint = file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, numPartitions = 5], 2
            +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) AS count#38L], output=[ds1#39, count#38L])
               +- HashAggregate(keys=[ds1 AS ds1#39], functions=[partial_count(1) AS count#38L], output=[ds1#39, count#38L])
                  +- Project
                     +- MicroBatchScan[value#1] MemoryStreamDataSource
{quote}
 

  was:
Streaming queries with Union of 2 data streams followed by an Aggregate 
(groupBy) can produce incorrect results if the grouping key is a constant 
literal for micro-batch duration.

The query produces incorrect results because the query optimizer recognizes the 
literal value in the grouping key as foldable and replaces the grouping key 
expression with the actual literal value. This optimization is correct for 
batch queries. However Streaming queries also read information from StateStore, 
and the output contains both the results from StateStore (computed in previous 
microbatches) and data from input sources (computed in this microbatch). The 
HashAggregate node after StateStore always reads grouping key value as the 
optimized literal (as the grouping key expression is optimized into a literal 
by the optimizer). This ends up replacing keys in StateStore with the literal 
value resulting in incorrect output. 

See an example logical and physical plan below for a query performing a union 
on 2 data streams, followed by a groupBy. Note that the name#4 

[jira] [Updated] (SPARK-47840) Remove foldable propagation across Streaming Aggregate/Join nodes

2024-04-12 Thread Bhuwan Sahni (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-47840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bhuwan Sahni updated SPARK-47840:
-
Description: 
Streaming queries with a Union of two data streams followed by an Aggregate (groupBy) can produce incorrect results if the grouping key is a constant literal for the micro-batch duration.

The query produces incorrect results because the query optimizer recognizes the literal value in the grouping key as foldable and replaces the grouping key expression with the actual literal value. This optimization is correct for batch queries. However, streaming queries also read information from the StateStore, and the output contains both the results from the StateStore (computed in previous micro-batches) and data from the input sources (computed in this micro-batch). The HashAggregate node after the StateStore always reads the grouping key as the optimized literal (because the grouping key expression has been folded into a literal by the optimizer). This ends up replacing keys in the StateStore with the literal value, resulting in incorrect output.

See an example logical and physical plan below for a query performing a union on two data streams, followed by a groupBy. Note that the name#4 expression has been optimized to ds1. The streaming Aggregate adds a StateStoreSave node as a child of HashAggregate; however, any grouping key read from the StateStore will still be read as ds1 due to the optimization.

 

### Optimized Logical Plan
{quote}
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation ===

=== Old Plan ===

WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, Complete, 0
+- Aggregate [name#4], [name#4, count(1) AS count#31L]
   +- Project [ds1 AS name#4]
      +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource

=== New Plan ===

WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, Complete, 0
+- Aggregate [ds1], [ds1 AS name#4, count(1) AS count#31L]
   +- Project [ds1 AS name#4]
      +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource
{quote}

### Corresponding Physical Plan
{quote}
WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@2b4c6242], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3143/1859075634@35709d26
+- HashAggregate(keys=[ds1#39], functions=[finalmerge_count(merge count#38L) AS count(1)#30L], output=[name#4, count#31L])
   +- StateStoreSave [ds1#39], state info [ checkpoint = file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, numPartitions = 5], Complete, 0, 0, 2
      +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) AS count#38L], output=[ds1#39, count#38L])
         +- StateStoreRestore [ds1#39], state info [ checkpoint = file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, numPartitions = 5], 2
            +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) AS count#38L], output=[ds1#39, count#38L])
               +- HashAggregate(keys=[ds1 AS ds1#39], functions=[partial_count(1) AS count#38L], output=[ds1#39, count#38L])
                  +- Project
                     +- MicroBatchScan[value#1] MemoryStreamDataSource
{quote}
 

  was:
Streaming queries with Union of 2 data streams followed by an Aggregate 
(groupBy) can produce incorrect results if the grouping key is a constant 
literal for micro-batch duration.

The query produces incorrect results because the query optimizer recognizes the 
literal value in the grouping key as foldable and replaces the grouping key 
expression with the actual literal value. This optimization is correct for 
batch queries. However Streaming queries also read information from StateStore, 
and the output contains both the results from StateStore (computed in previous 
microbatches) and data from input sources (computed in this microbatch). The 
HashAggregate node after StateStore always reads grouping key value as the 
optimized literal (as the grouping key expression is optimized into a literal 
by the optimizer). This ends up replacing keys in StateStore with the literal 
value resulting in incorrect output. 

See an example logical and physical plan below for a query performing a union 
on 2 data streams, followed by a groupBy. Note that the 

[jira] [Created] (SPARK-47840) Remove foldable propagation across Streaming Aggregate/Join nodes

2024-04-12 Thread Bhuwan Sahni (Jira)
Bhuwan Sahni created SPARK-47840:


 Summary: Remove foldable propagation across Streaming 
Aggregate/Join nodes
 Key: SPARK-47840
 URL: https://issues.apache.org/jira/browse/SPARK-47840
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 3.5.1, 4.0.0
Reporter: Bhuwan Sahni


Streaming queries with a Union of two data streams followed by an Aggregate (groupBy) can produce incorrect results if the grouping key is a constant literal for the micro-batch duration.

The query produces incorrect results because the query optimizer recognizes the literal value in the grouping key as foldable and replaces the grouping key expression with the actual literal value. This optimization is correct for batch queries. However, streaming queries also read information from the StateStore, and the output contains both the results from the StateStore (computed in previous micro-batches) and data from the input sources (computed in this micro-batch). The HashAggregate node after the StateStore always reads the grouping key as the optimized literal (because the grouping key expression has been folded into a literal by the optimizer). This ends up replacing keys in the StateStore with the literal value, resulting in incorrect output.

See an example logical and physical plan below for a query performing a union on two data streams, followed by a groupBy. Note that the name#4 expression has been optimized to ds1. The streaming Aggregate adds a StateStoreSave node as a child of HashAggregate; however, any grouping key read from the StateStore will still be read as ds1 due to the optimization.
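
For reference, a minimal repro sketch of this kind of query (hypothetical; assumes an active SparkSession `spark` with `spark.implicits._` imported):

```scala
import org.apache.spark.sql.SQLContext
import org.apache.spark.sql.execution.streaming.MemoryStream
import org.apache.spark.sql.functions._

implicit val sqlContext: SQLContext = spark.sqlContext

// Two input streams, each tagging its rows with a constant literal column `name`.
val ds1 = MemoryStream[Int]
val ds2 = MemoryStream[Int]
val stream1 = ds1.toDF().select(lit("ds1").as("name"), $"value")
val stream2 = ds2.toDF().select(lit("ds2").as("name"), $"value")

// Grouping by the literal column: FoldablePropagation folds `name` into the
// literal "ds1", which corrupts the keys restored from the state store in
// later micro-batches.
val query = stream1.union(stream2)
  .groupBy("name")
  .count()
  .writeStream
  .format("memory")
  .queryName("union_agg")
  .outputMode("complete")
  .start()
```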

### Optimized Logical Plan

```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation 
===

=== Old Plan ===

WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, 
Complete, 0
+- Aggregate [name#4], [name#4, count(1) AS count#31L]
   +- Project [ds1 AS name#4]
      +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource


=== New Plan ===

WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, 
Complete, 0
+- Aggregate [ds1], [ds1 AS name#4, count(1) AS count#31L]
   +- Project [ds1 AS name#4]
      +- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource



```


### Corresponding Physical Plan

```
WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: 
org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite@2b4c6242],
 
org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3143/1859075634@35709d26
+- HashAggregate(keys=[ds1#39], functions=[finalmerge_count(merge count#38L) AS 
count(1)#30L], output=[name#4, count#31L])
   +- StateStoreSave [ds1#39], state info [ checkpoint = 
file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, runId 
= 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, numPartitions = 5], 
Complete, 0, 0, 2
      +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) 
AS count#38L], output=[ds1#39, count#38L])
         +- StateStoreRestore [ds1#39], state info [ checkpoint = 
file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, runId 
= 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, numPartitions = 5], 2
            +- HashAggregate(keys=[ds1#39], functions=[merge_count(merge 
count#38L) AS count#38L], output=[ds1#39, count#38L])
               +- HashAggregate(keys=[ds1 AS ds1#39], 
functions=[partial_count(1) AS count#38L], output=[ds1#39, count#38L])
                  +- Project
                     +- MicroBatchScan[value#1] MemoryStreamDataSource

```



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-47568) Fix race condition between maintenance thread and task thread for RocksDB snapshot

2024-03-26 Thread Bhuwan Sahni (Jira)
Bhuwan Sahni created SPARK-47568:


 Summary: Fix race condition between maintenance thread and task thread for RocksDB snapshot
 Key: SPARK-47568
 URL: https://issues.apache.org/jira/browse/SPARK-47568
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 3.5.1, 3.5.0, 4.0.0, 3.5.2
Reporter: Bhuwan Sahni


There are currently race conditions between the maintenance thread and the task thread which can result in corrupted checkpoint state.
 # The maintenance thread currently relies on the class variable {{lastSnapshot}} to find the latest checkpoint and uploads it to DFS. This checkpoint can be modified at commit time by the task thread if a new snapshot is created.
 # The task thread does not reset {{lastSnapshot}} at load time, which can result in newer snapshots (if an old version is loaded) being considered valid and uploaded to DFS. This results in VersionIdMismatch errors.

This issue proposes to fix these problems by guarding modification of the {{lastSnapshot}} variable and setting it properly at load time.
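
A simplified sketch of the proposed guarding (hypothetical: this is not the actual RocksDB state store code, and the class and method names are made up for illustration):

```scala
// Hypothetical illustration of the fix: a single lock guards the latest snapshot,
// the task thread clears it when (re)loading a version, and the maintenance
// thread takes exclusive ownership of it before uploading to DFS.
class SnapshotTracker[Snapshot] {
  private val lock = new Object
  private var lastSnapshot: Option[Snapshot] = None

  // Task thread: called at commit time when a new snapshot is created.
  def recordCommit(snapshot: Snapshot): Unit = lock.synchronized {
    lastSnapshot = Some(snapshot)
  }

  // Task thread: called when loading a version, so a snapshot from a previously
  // loaded (possibly newer) version can never be mistaken for the current one.
  def resetOnLoad(): Unit = lock.synchronized {
    lastSnapshot = None
  }

  // Maintenance thread: takes the snapshot for upload; the task thread can no
  // longer modify it mid-upload because ownership has been transferred.
  def takeForUpload(): Option[Snapshot] = lock.synchronized {
    val snapshot = lastSnapshot
    lastSnapshot = None
    snapshot
  }
}
```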



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-47558) [Arbitrary State Support] State TTL support - ValueState

2024-03-26 Thread Bhuwan Sahni (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-47558?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17830977#comment-17830977
 ] 

Bhuwan Sahni commented on SPARK-47558:
--

https://github.com/apache/spark/pull/45674

> [Arbitrary State Support] State TTL support - ValueState
> 
>
> Key: SPARK-47558
> URL: https://issues.apache.org/jira/browse/SPARK-47558
> Project: Spark
>  Issue Type: Task
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Bhuwan Sahni
>Priority: Major
>
> Add support for expiring state values based on TTL for ValueState in the
> transformWithState operator.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-47558) [Arbitrary State Support] State TTL support - ValueState

2024-03-25 Thread Bhuwan Sahni (Jira)
Bhuwan Sahni created SPARK-47558:


 Summary: [Arbitrary State Support] State TTL support - ValueState
 Key: SPARK-47558
 URL: https://issues.apache.org/jira/browse/SPARK-47558
 Project: Spark
  Issue Type: Task
  Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Bhuwan Sahni


Add support for expiring state values based on TTL for ValueState in the transformWithState operator.
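
A minimal sketch of the intended usage from inside a StatefulProcessor (hedged: this assumes the experimental handle API at the time; the exact getValueState signature and TTLConfig shape may differ):

```scala
import java.time.Duration
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming.{TTLConfig, ValueState}

// Assumed to run inside StatefulProcessor.init(), where getHandle is available.
// Values written to this state are considered expired (and eventually cleaned up)
// one hour after they were last updated.
val ttlConfig = TTLConfig(Duration.ofHours(1))
val lastSeen: ValueState[Long] =
  getHandle.getValueState[Long]("lastSeen", Encoders.scalaLong, ttlConfig)

// In handleInputRows(): refresh the value, which also resets its TTL.
lastSeen.update(System.currentTimeMillis())
```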



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-47036) RocksDB versionID Mismatch in SST files with Compaction

2024-02-13 Thread Bhuwan Sahni (Jira)
Bhuwan Sahni created SPARK-47036:


 Summary: RocksDB versionID Mismatch in SST files with Compaction
 Key: SPARK-47036
 URL: https://issues.apache.org/jira/browse/SPARK-47036
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 3.5.0, 4.0.0, 3.5.1, 3.5.2
Reporter: Bhuwan Sahni


RocksDB compaction can result in version ID mismatch errors if the same version is committed twice from the same executor. (Multiple commits can happen due to Spark stage/task retries.)

A particular scenario where this can happen is described below:
 # Version V1 is loaded on executor A. The RocksDB state store has the 195.sst, 196.sst, 197.sst and 198.sst files.
 # State changes are made, which result in the creation of a new table file, 200.sst.
 # The state store is committed as version V2. The SST file 200.sst (as 000200-8c80161a-bc23-4e3b-b175-cffe38e427c7.sst) is uploaded to DFS, and the previous 4 files are reused. A new metadata file is created to track the exact SST files with unique IDs, and uploaded with the RocksDB Manifest as part of V2.zip.
 # RocksDB compaction is triggered at the same time. The compaction creates a new L1 file (201.sst) and deletes the existing 5 SST files.
 # The Spark stage is retried.
 # Version V1 is reloaded on the same executor. The local files are inspected, and 201.sst is deleted. The 4 SST files of version V1 are downloaded again to the local file system.
 # Any local files which are deleted (as part of the version load) are also removed from the local → DFS file upload tracking. **However, the files already deleted as a result of compaction are not removed from tracking. This is the bug which resulted in the failure.**
 # The state store is committed as version V2 again. However, the local mapping of SST files to DFS file paths still has 200.sst in its tracking, so the SST file is not re-uploaded. A new metadata file is created to track the exact SST files with unique IDs, and uploaded with the new RocksDB Manifest as part of V2.zip. (The V2.zip file is overwritten here atomically.)
 # A new executor tries to load version V2. However, the 200.sst file uploaded during the first commit of V2 is now incompatible with the Manifest file uploaded during the second commit, resulting in the version ID mismatch failure.

 

We need to ensure that any files deleted from the local filesystem after compaction are no longer tracked in the uploadedDFSFiles mapping if the same version is loaded again.
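
A simplified sketch of the intended bookkeeping (hypothetical: this is not the actual RocksDB file manager code, and the names are made up for illustration):

```scala
import scala.collection.mutable

// On (re)loading a version, drop every tracked local SST file that no longer
// exists on disk -- whether it was deleted by the loader itself or earlier by
// RocksDB compaction -- so it will be re-uploaded on the next commit instead of
// being silently skipped.
def reconcileUploadedFiles(
    localSstFiles: Set[String],                    // files currently on local disk
    uploadedDfsFiles: mutable.Map[String, String]  // local name -> DFS file (with UUID)
): Unit = {
  val stale = uploadedDfsFiles.keySet.filterNot(localSstFiles.contains).toSeq
  stale.foreach(uploadedDfsFiles.remove)
}
```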



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-46928) Support ListState in Arbitrary State API v2

2024-01-31 Thread Bhuwan Sahni (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-46928?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17812820#comment-17812820
 ] 

Bhuwan Sahni commented on SPARK-46928:
--

PR submitted - https://github.com/apache/spark/pull/44961/files

> Support ListState in Arbitrary State API v2
> ---
>
> Key: SPARK-46928
> URL: https://issues.apache.org/jira/browse/SPARK-46928
> Project: Spark
>  Issue Type: New Feature
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Bhuwan Sahni
>Priority: Major
>
> As part of Arbitrary State API v2
> (https://docs.google.com/document/d/1QtC5qd4WQEia9kl1Qv74WE0TiXYy3x6zeTykygwPWig),
> we need to support ListState. This task covers adding support for ListState in
> Scala.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-46928) Support ListState in Arbitrary State API v2

2024-01-30 Thread Bhuwan Sahni (Jira)
Bhuwan Sahni created SPARK-46928:


 Summary: Support ListState in Arbitrary State API v2
 Key: SPARK-46928
 URL: https://issues.apache.org/jira/browse/SPARK-46928
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Bhuwan Sahni


As part of Arbitrary State API v2 (https://docs.google.com/document/d/1QtC5qd4WQEia9kl1Qv74WE0TiXYy3x6zeTykygwPWig), we need to support ListState. This task covers adding support for ListState in Scala.
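
A minimal sketch of ListState usage from inside a StatefulProcessor (hedged: the handle and ListState method names follow the experimental API as proposed and may differ in the final version):

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.streaming.ListState

// Assumed to run inside StatefulProcessor.init(), where getHandle is available.
val events: ListState[String] = getHandle.getListState[String]("events", Encoders.STRING)

// In handleInputRows(): append values for the current key and read the list back.
events.appendValue("click")
events.appendList(Array("view", "purchase"))
val allEvents: Iterator[String] = events.get()
```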



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-46796) RocksDB versionID Mismatch in SST files

2024-01-22 Thread Bhuwan Sahni (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-46796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809657#comment-17809657
 ] 

Bhuwan Sahni commented on SPARK-46796:
--

PR created - [https://github.com/apache/spark/pull/44837]

> RocksDB versionID Mismatch in SST files
> ---
>
> Key: SPARK-46796
> URL: https://issues.apache.org/jira/browse/SPARK-46796
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 3.4.2, 3.4.1, 3.5.0, 4.0.0, 3.5.1, 3.5.2
>Reporter: Bhuwan Sahni
>Priority: Major
>  Labels: pull-request-available
>
> We need to ensure that the correct SST files are used on the executor during
> RocksDB load, as per the mapping in metadata.zip. With the current
> implementation, it is possible that the executor uses an SST file (with a
> different UUID) from an older version which is not the exact file mapped in
> metadata.zip. This can cause version ID mismatch errors while loading RocksDB,
> leading to streaming query failures.
> A few scenarios in which such a situation can occur are:
> **Scenario 1 - Distributed file system does not support overwrite
> functionality**
>  # A task T1 on executor A commits a RocksDB snapshot for version X.
>  # Another task T2 on executor A loads version X-1, and tries to commit X.
> During commit, SST files are copied but the metadata file is not overwritten.
>  # Task T3 is scheduled on A; this task reuses the previously loaded X (loaded
> in (2) above) and commits X+1.
>  # Task T4 is scheduled on A again for state store version X. The executor
> deletes the SST files corresponding to commit X+1, downloads the metadata for
> version X (which was committed in task T1), and loads RocksDB. This fails
> because the metadata from (1) is not compatible with the SST files from (2).
>  
> **Scenario 2 - Multiple older state versions have different DFS files for a
> particular SST file.**
> In the current logic, we look at all the versions older than X to find whether
> a local SST file can be reused. The reuse logic only ensures that the local SST
> file was present in any of the previous versions. However, it is possible that
> two different older versions had different SST files (`0001-uuid1.sst` and
> `0001-uuid2.sst`) uploaded to DFS. These SST files will have the same local
> name (with the UUID truncated) and size, but are not compatible due to
> different RocksDB version IDs. We need to ensure that the correct SST file (as
> per UUID) is picked, as specified in metadata.zip.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-46796) RocksDB versionID Mismatch in SST files

2024-01-22 Thread Bhuwan Sahni (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-46796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17809634#comment-17809634
 ] 

Bhuwan Sahni commented on SPARK-46796:
--

Working on a PR for the fix.

> RocksDB versionID Mismatch in SST files
> ---
>
> Key: SPARK-46796
> URL: https://issues.apache.org/jira/browse/SPARK-46796
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 3.4.2, 3.4.1, 3.5.0, 4.0.0, 3.5.1, 3.5.2
>Reporter: Bhuwan Sahni
>Priority: Major
>
> We need to ensure that the correct SST files are used on the executor during
> RocksDB load, as per the mapping in metadata.zip. With the current
> implementation, it is possible that the executor uses an SST file (with a
> different UUID) from an older version which is not the exact file mapped in
> metadata.zip. This can cause version ID mismatch errors while loading RocksDB,
> leading to streaming query failures.
> A few scenarios in which such a situation can occur are:
> **Scenario 1 - Distributed file system does not support overwrite
> functionality**
>  # A task T1 on executor A commits a RocksDB snapshot for version X.
>  # Another task T2 on executor A loads version X-1, and tries to commit X.
> During commit, SST files are copied but the metadata file is not overwritten.
>  # Task T3 is scheduled on A; this task reuses the previously loaded X (loaded
> in (2) above) and commits X+1.
>  # Task T4 is scheduled on A again for state store version X. The executor
> deletes the SST files corresponding to commit X+1, downloads the metadata for
> version X (which was committed in task T1), and loads RocksDB. This fails
> because the metadata from (1) is not compatible with the SST files from (2).
>  
> **Scenario 2 - Multiple older state versions have different DFS files for a
> particular SST file.**
> In the current logic, we look at all the versions older than X to find whether
> a local SST file can be reused. The reuse logic only ensures that the local SST
> file was present in any of the previous versions. However, it is possible that
> two different older versions had different SST files (`0001-uuid1.sst` and
> `0001-uuid2.sst`) uploaded to DFS. These SST files will have the same local
> name (with the UUID truncated) and size, but are not compatible due to
> different RocksDB version IDs. We need to ensure that the correct SST file (as
> per UUID) is picked, as specified in metadata.zip.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-46796) RocksDB versionID Mismatch in SST files

2024-01-22 Thread Bhuwan Sahni (Jira)
Bhuwan Sahni created SPARK-46796:


 Summary: RocksDB versionID Mismatch in SST files
 Key: SPARK-46796
 URL: https://issues.apache.org/jira/browse/SPARK-46796
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 3.5.0, 3.4.1, 3.4.2, 4.0.0, 3.5.1, 3.5.2
Reporter: Bhuwan Sahni


We need to ensure that the correct SST files are used on the executor during RocksDB load, as per the mapping in metadata.zip. With the current implementation, it is possible that the executor uses an SST file (with a different UUID) from an older version which is not the exact file mapped in metadata.zip. This can cause version ID mismatch errors while loading RocksDB, leading to streaming query failures.

A few scenarios in which such a situation can occur are:

**Scenario 1 - Distributed file system does not support overwrite functionality**
 # A task T1 on executor A commits a RocksDB snapshot for version X.
 # Another task T2 on executor A loads version X-1, and tries to commit X. During commit, SST files are copied but the metadata file is not overwritten.
 # Task T3 is scheduled on A; this task reuses the previously loaded X (loaded in (2) above) and commits X+1.
 # Task T4 is scheduled on A again for state store version X. The executor deletes the SST files corresponding to commit X+1, downloads the metadata for version X (which was committed in task T1), and loads RocksDB. This fails because the metadata from (1) is not compatible with the SST files from (2).

**Scenario 2 - Multiple older state versions have different DFS files for a particular SST file.**

In the current logic, we look at all the versions older than X to find whether a local SST file can be reused. The reuse logic only ensures that the local SST file was present in any of the previous versions. However, it is possible that two different older versions had different SST files (`0001-uuid1.sst` and `0001-uuid2.sst`) uploaded to DFS. These SST files will have the same local name (with the UUID truncated) and size, but are not compatible due to different RocksDB version IDs. We need to ensure that the correct SST file (as per UUID) is picked, as specified in metadata.zip.
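
A simplified sketch of the stricter reuse check (hypothetical: this is not the actual state store code, and the names are made up for illustration):

```scala
// A local SST file may be reused only if the DFS file it was downloaded from or
// uploaded as (including the UUID) is exactly the file that version X's
// metadata.zip maps that local name to.
def canReuseLocalSstFile(
    localFileName: String,                  // e.g. "0001.sst"
    trackedDfsFileName: Option[String],     // e.g. Some("0001-uuid1.sst")
    metadataMapping: Map[String, String]    // local name -> DFS file name from metadata.zip
): Boolean = {
  metadataMapping.get(localFileName).exists(expected => trackedDfsFileName.contains(expected))
}
```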



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-45655) current_date() not supported in Streaming Query Observed metrics

2023-10-24 Thread Bhuwan Sahni (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-45655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779264#comment-17779264
 ] 

Bhuwan Sahni commented on SPARK-45655:
--

PR link https://github.com/apache/spark/pull/43517

> current_date() not supported in Streaming Query Observed metrics
> 
>
> Key: SPARK-45655
> URL: https://issues.apache.org/jira/browse/SPARK-45655
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 3.4.1, 3.5.0
>Reporter: Bhuwan Sahni
>Priority: Major
>  Labels: pull-request-available
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Streaming queries do not support current_date() inside CollectMetrics. The
> primary reason is that current_date() (which resolves to CurrentBatchTimestamp)
> is marked as non-deterministic. However, {{current_date}} and
> {{current_timestamp}} are both deterministic today, and
> {{current_batch_timestamp}} should be treated the same way.
>  
> As an example, the query below fails due to the observe() call on the DataFrame.
>  
> {quote}val inputData = MemoryStream[Timestamp]
> inputData.toDF()
>       .filter("value < current_date()")
>       .observe("metrics", count(expr("value >= 
> current_date()")).alias("dropped"))
>       .writeStream
>       .queryName("ts_metrics_test")
>       .format("memory")
>       .outputMode("append")
>       .start()
> {quote}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-45655) current_date() not supported in Streaming Query Observed metrics

2023-10-24 Thread Bhuwan Sahni (Jira)
Bhuwan Sahni created SPARK-45655:


 Summary: current_date() not supported in Streaming Query Observed 
metrics
 Key: SPARK-45655
 URL: https://issues.apache.org/jira/browse/SPARK-45655
 Project: Spark
  Issue Type: Bug
  Components: Structured Streaming
Affects Versions: 3.5.0, 3.4.1
Reporter: Bhuwan Sahni


Streaming queries do not support current_date() inside CollectMetrics. The primary reason is that current_date() (which resolves to CurrentBatchTimestamp) is marked as non-deterministic. However, {{current_date}} and {{current_timestamp}} are both deterministic today, and {{current_batch_timestamp}} should be treated the same way.

As an example, the query below fails due to the observe() call on the DataFrame.

 
{quote}val inputData = MemoryStream[Timestamp]

inputData.toDF()
      .filter("value < current_date()")
      .observe("metrics", count(expr("value >= 
current_date()")).alias("dropped"))
      .writeStream
      .queryName("ts_metrics_test")
      .format("memory")
      .outputMode("append")
      .start()
{quote}
 



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Commented] (SPARK-45655) current_date() not supported in Streaming Query Observed metrics

2023-10-24 Thread Bhuwan Sahni (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-45655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17779239#comment-17779239
 ] 

Bhuwan Sahni commented on SPARK-45655:
--

I am working on a fix for this issue, and will submit a PR soon.

> current_date() not supported in Streaming Query Observed metrics
> 
>
> Key: SPARK-45655
> URL: https://issues.apache.org/jira/browse/SPARK-45655
> Project: Spark
>  Issue Type: Bug
>  Components: Structured Streaming
>Affects Versions: 3.4.1, 3.5.0
>Reporter: Bhuwan Sahni
>Priority: Major
>   Original Estimate: 48h
>  Remaining Estimate: 48h
>
> Streaming queries do not support current_date() inside CollectMetrics. The
> primary reason is that current_date() (which resolves to CurrentBatchTimestamp)
> is marked as non-deterministic. However, {{current_date}} and
> {{current_timestamp}} are both deterministic today, and
> {{current_batch_timestamp}} should be treated the same way.
>  
> As an example, the query below fails due to the observe() call on the DataFrame.
>  
> {quote}val inputData = MemoryStream[Timestamp]
> inputData.toDF()
>       .filter("value < current_date()")
>       .observe("metrics", count(expr("value >= 
> current_date()")).alias("dropped"))
>       .writeStream
>       .queryName("ts_metrics_test")
>       .format("memory")
>       .outputMode("append")
>       .start()
> {quote}
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org