[jira] [Created] (SPARK-48589) Add option snapshotStartBatchId and snapshotPartitionId to state data source

2024-06-11 Thread Yuchen Liu (Jira)
Yuchen Liu created SPARK-48589:
--

 Summary: Add option snapshotStartBatchId and snapshotPartitionId 
to state data source
 Key: SPARK-48589
 URL: https://issues.apache.org/jira/browse/SPARK-48589
 Project: Spark
  Issue Type: New Feature
  Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Yuchen Liu


Define two new options, _snapshotStartBatchId_ and _snapshotPartitionId_, for 
the existing state reader. Both options must be provided together.
 # If there is no snapshot file for that batch (note the off-by-one offset 
between the state store version and the batch ID), throw an exception.
 # Otherwise, the reader should rebuild the state from that snapshot by applying 
delta files only, ignoring all later snapshot files.
 # If the batchId option is also specified, it is the ending batch ID, and 
reconstruction should stop at that batch.
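The rules above can be sketched as follows. This is a minimal in-memory model, not Spark's implementation: the dict-based snapshot and delta layout is hypothetical, and the sketch assumes that batch N commits state store version N + 1, which is one reading of the off-by-one note.

```python
from typing import Dict, Set, Tuple

State = Dict[str, int]
# Hypothetical delta layout: (upserted rows, removed keys) committed in one version.
Delta = Tuple[Dict[str, int], Set[str]]


def reconstruct_state(
    options: Dict[str, str],
    snapshots: Dict[int, Dict[int, State]],  # partition id -> version -> full state
    deltas: Dict[int, Dict[int, Delta]],     # partition id -> version -> changes
    latest_batch_id: int,
) -> State:
    # Rule: the two options must be given together.
    if ("snapshotStartBatchId" in options) != ("snapshotPartitionId" in options):
        raise ValueError("snapshotStartBatchId and snapshotPartitionId must be given together")
    if "snapshotStartBatchId" not in options:
        raise ValueError("this sketch only models the snapshot-start path")

    partition = int(options["snapshotPartitionId"])
    start_batch = int(options["snapshotStartBatchId"])
    # Rule: if batchId is also given, it is the ending batch; otherwise read to the latest batch.
    end_batch = int(options.get("batchId", latest_batch_id))

    # Assumption: batch N commits state store version N + 1.
    start_version, end_version = start_batch + 1, end_batch + 1
    part_snapshots = snapshots.get(partition, {})
    # Rule: no snapshot at the requested batch means an error.
    if start_version not in part_snapshots:
        raise FileNotFoundError(f"no snapshot for batch {start_batch} (version {start_version})")
    if end_version < start_version:
        raise ValueError("batchId must not precede snapshotStartBatchId")

    state = dict(part_snapshots[start_version])
    # Rule: replay delta files only; later snapshot files are deliberately never read.
    for version in range(start_version + 1, end_version + 1):
        delta = deltas.get(partition, {}).get(version)
        if delta is None:
            raise FileNotFoundError(f"missing delta file for version {version}")
        upserts, removals = delta
        state.update(upserts)
        for key in removals:
            state.pop(key, None)
    return state
```

For example, with snapshots at versions 3 and 5 and snapshotStartBatchId = 2, the sketch starts from the version-3 snapshot and replays deltas 4 and 5 without ever reading the version-5 snapshot, so a bad later snapshot cannot affect the result.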



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

-
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org



[jira] [Created] (SPARK-48588) Fine-grained State Data Source

2024-06-11 Thread Yuchen Liu (Jira)
Yuchen Liu created SPARK-48588:
--

 Summary: Fine-grained State Data Source
 Key: SPARK-48588
 URL: https://issues.apache.org/jira/browse/SPARK-48588
 Project: Spark
  Issue Type: Epic
  Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Yuchen Liu


The current state reader API replays the state store rows from the latest 
snapshot and any newer delta files. This mechanism falls short in two cases: 
the snapshot files may have been constructed incorrectly, or users may want to 
see how the state changes across batches. We need to improve the state reader 
so that it can handle a variety of fine-grained requirements, for example 
reconstructing state from an arbitrary snapshot, or supporting a CDC (change 
data capture) mode for tracking state evolution.
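To make the CDC idea concrete, here is a minimal sketch of a change feed derived from full per-batch states. It is an illustration only: the input layout (one dict of key to value per batch) and the (batch_id, change_type, key, value) record shape are hypothetical and do not describe an existing Spark API.

```python
from typing import Dict, List, Optional, Tuple

State = Dict[str, int]
# Hypothetical change record: (batch_id, change_type, key, new_value).
Change = Tuple[int, str, str, Optional[int]]


def state_change_feed(states_by_batch: Dict[int, State]) -> List[Change]:
    """Diff consecutive batch states; the first batch is compared with an empty state."""
    changes: List[Change] = []
    previous: State = {}
    for batch_id in sorted(states_by_batch):
        current = states_by_batch[batch_id]
        # Keys that appear for the first time in this batch.
        for key in sorted(current.keys() - previous.keys()):
            changes.append((batch_id, "insert", key, current[key]))
        # Keys present in both batches whose value changed.
        for key in sorted(current.keys() & previous.keys()):
            if current[key] != previous[key]:
                changes.append((batch_id, "update", key, current[key]))
        # Keys removed from the state in this batch (e.g. evicted by a watermark).
        for key in sorted(previous.keys() - current.keys()):
            changes.append((batch_id, "delete", key, None))
        previous = current
    return changes
```

A diff over full states is the simplest way to express the idea; a real implementation could instead emit records directly from delta files, which already contain per-version changes.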






[jira] [Created] (SPARK-48542) Give snapshotStartBatchId and snapshotPartitionId to the state data source

2024-06-05 Thread Yuchen Liu (Jira)
Yuchen Liu created SPARK-48542:
--

 Summary: Give snapshotStartBatchId and snapshotPartitionId to the 
state data source
 Key: SPARK-48542
 URL: https://issues.apache.org/jira/browse/SPARK-48542
 Project: Spark
  Issue Type: New Feature
  Components: SQL, Structured Streaming
Affects Versions: 4.0.0
 Environment: This should work for both the HDFS-backed state store and the 
RocksDB state store.
Reporter: Yuchen Liu


Right now, to read a given version of the state data, the state source finds 
the closest snapshot file before that version and reconstructs the state by 
applying the subsequent delta files. In some debugging scenarios, users need 
more granular control over how the state is reconstructed, for example starting 
from a specific snapshot instead of the closest one. One use case is checking 
whether a snapshot was corrupted after it was committed.

This task introduces two options, {{snapshotStartBatchId}} and 
{{snapshotPartitionId}}, to the state data source. By specifying them, users can 
control the batch ID of the snapshot that reconstruction starts from and the 
partition ID of the state to read.
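The corruption-check use case amounts to rebuilding a version from an earlier snapshot plus delta files and comparing the result with the snapshot file for that version. A minimal in-memory sketch, assuming each version's snapshot and delta are available as Python dicts (a hypothetical representation, not Spark's on-disk format) and that the earliest snapshot is trusted:

```python
from typing import Dict, List, Set, Tuple

State = Dict[str, int]
# Hypothetical delta layout: (upserted rows, removed keys) committed in one version.
Delta = Tuple[Dict[str, int], Set[str]]


def replay(snapshot: State, deltas: List[Delta]) -> State:
    """Apply deltas in order on top of a copy of the snapshot."""
    state = dict(snapshot)
    for upserts, removals in deltas:
        state.update(upserts)
        for key in removals:
            state.pop(key, None)
    return state


def find_corrupted_snapshots(snapshots: Dict[int, State], deltas: Dict[int, Delta]) -> List[int]:
    """Return snapshot versions that disagree with a replay from the earliest snapshot."""
    versions = sorted(snapshots)
    base = versions[0]  # assumption: the earliest snapshot is known to be good
    suspects = []
    for version in versions[1:]:
        # Raises KeyError if a delta file in the range is missing.
        rebuilt = replay(snapshots[base], [deltas[v] for v in range(base + 1, version + 1)])
        if rebuilt != snapshots[version]:
            suspects.append(version)
    return suspects
```

Starting every replay from the same base snapshot is what the new options enable: without them, the reader would pick the closest snapshot and never exercise the delta path for versions that have their own snapshot.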






[jira] [Updated] (SPARK-48447) Check state store provider class before invoking the constructor

2024-05-28 Thread Yuchen Liu (Jira)


 [ 
https://issues.apache.org/jira/browse/SPARK-48447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuchen Liu updated SPARK-48447:
---
Priority: Major  (was: Minor)

> Check state store provider class before invoking the constructor
> 
>
> Key: SPARK-48447
> URL: https://issues.apache.org/jira/browse/SPARK-48447
> Project: Spark
>  Issue Type: Improvement
>  Components: Structured Streaming
>Affects Versions: 4.0.0
>Reporter: Yuchen Liu
>Priority: Major
>   Original Estimate: 24h
>  Remaining Estimate: 24h
>
> We should ensure that only classes 
> [extending|https://github.com/databricks/runtime/blob/1440e77ab54c40981066c22ec759bdafc0683e76/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L73]
>  {{StateStoreProvider}} can be constructed, to prevent customers from 
> instantiating arbitrary classes.






[jira] [Created] (SPARK-48447) Check state store provider class before invoking the constructor

2024-05-28 Thread Yuchen Liu (Jira)
Yuchen Liu created SPARK-48447:
--

 Summary: Check state store provider class before invoking the 
constructor
 Key: SPARK-48447
 URL: https://issues.apache.org/jira/browse/SPARK-48447
 Project: Spark
  Issue Type: Improvement
  Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Yuchen Liu


We should ensure that only classes 
[extending|https://github.com/databricks/runtime/blob/1440e77ab54c40981066c22ec759bdafc0683e76/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L73]
 {{StateStoreProvider}} can be constructed, to prevent customers from 
instantiating arbitrary classes.
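A minimal Python sketch of the proposed check (the real code is Scala; StateStoreProvider and DemoProvider below are hypothetical stand-ins): resolve the class by name, verify that it is a subclass of the provider base class, and only then call its constructor, so no arbitrary class ever runs its initializer.

```python
import importlib


class StateStoreProvider:
    """Hypothetical stand-in for Spark's Scala StateStoreProvider trait."""


class DemoProvider(StateStoreProvider):
    """Hypothetical provider used only for illustration."""


def load_class(class_name: str) -> type:
    """Resolve a fully qualified name such as 'package.module.Class' without instantiating it."""
    module_name, _, attr = class_name.rpartition(".")
    obj = getattr(importlib.import_module(module_name), attr)
    if not isinstance(obj, type):
        raise TypeError(f"{class_name} is not a class")
    return obj


def create_provider(cls: type) -> StateStoreProvider:
    # Check the class BEFORE invoking its constructor.
    if not isinstance(cls, type) or not issubclass(cls, StateStoreProvider):
        raise TypeError(f"{cls!r} does not extend StateStoreProvider")
    return cls()


def create_provider_by_name(class_name: str) -> StateStoreProvider:
    return create_provider(load_class(class_name))
```

The ordering is the point of the change: validating after construction would be too late, because the constructor of an arbitrary class may already have had side effects.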






[jira] [Created] (SPARK-48446) Update SS Doc of dropDuplicatesWithinWatermark to use the right syntax

2024-05-28 Thread Yuchen Liu (Jira)
Yuchen Liu created SPARK-48446:
--

 Summary: Update SS Doc of dropDuplicatesWithinWatermark to use the 
right syntax
 Key: SPARK-48446
 URL: https://issues.apache.org/jira/browse/SPARK-48446
 Project: Spark
  Issue Type: Documentation
  Components: Structured Streaming
Affects Versions: 4.0.0
Reporter: Yuchen Liu


For dropDuplicates, the example on 
[https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#:~:text=)%20%5C%0A%20%20.-,dropDuplicates,-(%22guid%22]
 is out of date compared with 
[https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.dropDuplicates.html].
 The argument should be a list.

The same discrepancy exists for dropDuplicatesWithinWatermark.






[jira] [Commented] (SPARK-48411) Add E2E test for DropDuplicateWithinWatermark

2024-05-24 Thread Yuchen Liu (Jira)


[ 
https://issues.apache.org/jira/browse/SPARK-48411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849356#comment-17849356
 ] 

Yuchen Liu commented on SPARK-48411:


I will work on this.

> Add E2E test for DropDuplicateWithinWatermark
> -
>
> Key: SPARK-48411
> URL: https://issues.apache.org/jira/browse/SPARK-48411
> Project: Spark
>  Issue Type: New Feature
>  Components: Connect, SS
>Affects Versions: 4.0.0
>Reporter: Wei Liu
>Priority: Major
>
> Currently we do not have an E2E test for DropDuplicateWithinWatermark; we 
> should add one. We can simply take one of the tests written in Scala here 
> (using the testStream API) and replicate it in Python:
> [https://github.com/apache/spark/commit/0e9e34c1bd9bd16ad5efca77ce2763eb950f3103]
>  
> The change should happen in 
> [https://github.com/apache/spark/blob/eee179135ed21dbdd8b342d053c9eda849e2de77/python/pyspark/sql/tests/streaming/test_streaming.py#L29]
>  
> so that it is tested in both Connect and non-Connect modes.
>  
> Test with:
> ```
> python/run-tests --testnames pyspark.sql.tests.streaming.test_streaming
> python/run-tests --testnames pyspark.sql.tests.connect.streaming.test_parity_streaming
> ```


