[jira] [Created] (SPARK-48589) Add option snapshotStartBatchId and snapshotPartitionId to state data source
Yuchen Liu created SPARK-48589: -- Summary: Add option snapshotStartBatchId and snapshotPartitionId to state data source Key: SPARK-48589 URL: https://issues.apache.org/jira/browse/SPARK-48589 Project: Spark Issue Type: New Feature Components: Structured Streaming Affects Versions: 4.0.0 Reporter: Yuchen Liu Define two new options, _snapshotStartBatchId_ and _snapshotPartitionId_, for the existing state reader. Both of them must be provided at the same time. # If there is no snapshot file at that batch (note there is an off-by-one between the store version and the batch Id), throw an exception. # Otherwise, the reader should start from that snapshot and rebuild the state by reading delta files only, ignoring all later snapshot files. # If a batchId option is also specified, that batchId is the ending batch Id, and reconstruction should stop at that batch. -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
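The replay rule described in the three steps above can be sketched in pure Python. This is a hypothetical model, not Spark's implementation: the file layout, function name, and the assumption that store version = batch Id + 1 are all illustrative.

```python
def reconstruct_state(snapshots, deltas, snapshot_start_batch_id, end_batch_id):
    """Rebuild one partition's state map from a chosen snapshot.

    snapshots: {version: state_dict}   -- snapshot files keyed by version
    deltas:    {version: update_dict}  -- delta files keyed by version
    Assumption: store version = batch Id + 1 (the off-by-one noted above).
    """
    start_version = snapshot_start_batch_id + 1
    if start_version not in snapshots:
        # Step 1: no snapshot file at the requested batch -> error out.
        raise ValueError(
            f"No snapshot file for batch {snapshot_start_batch_id} "
            f"(version {start_version})")
    # Step 2: start from that snapshot, then apply delta files only;
    # any later snapshot files are deliberately ignored.
    state = dict(snapshots[start_version])
    end_version = end_batch_id + 1  # step 3: stop at the ending batch Id
    for v in range(start_version + 1, end_version + 1):
        state.update(deltas.get(v, {}))
    return state
```

For example, starting from the snapshot at batch 1 and replaying deltas through batch 3 applies only the delta files for the intervening versions.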
[jira] [Created] (SPARK-48588) Fine-grained State Data Source
Yuchen Liu created SPARK-48588: -- Summary: Fine-grained State Data Source Key: SPARK-48588 URL: https://issues.apache.org/jira/browse/SPARK-48588 Project: Spark Issue Type: Epic Components: Structured Streaming Affects Versions: 4.0.0 Reporter: Yuchen Liu The current state reader API replays the state store rows from the latest snapshot plus any newer delta files. The issue with this mechanism is that the snapshot files can sometimes be wrongly constructed, or users may want to observe how state changes across batches. We need to improve the State Reader so that it can handle a variety of fine-grained requirements, for example reconstructing state from an arbitrary snapshot, or supporting a CDC mode to track state evolution.
[jira] [Created] (SPARK-48542) Give snapshotStartBatchId and snapshotPartitionId to the state data source
Yuchen Liu created SPARK-48542: -- Summary: Give snapshotStartBatchId and snapshotPartitionId to the state data source Key: SPARK-48542 URL: https://issues.apache.org/jira/browse/SPARK-48542 Project: Spark Issue Type: New Feature Components: SQL, Structured Streaming Affects Versions: 4.0.0 Environment: This should work for both HDFS state store and RocksDB state store. Reporter: Yuchen Liu Right now, to read a version of the state data, the state source finds the latest snapshot file at or before the given version and reconstructs the state using the subsequent delta files. In some debugging scenarios, users need more granular control over how a given state is reconstructed; for example, they may want to start from a specific snapshot instead of the closest one. One use case is determining whether a snapshot file was corrupted after it was committed. This task introduces two options, {{snapshotStartBatchId}} and {{snapshotPartitionId}}, to the state data source. By specifying them, users can control the starting batch id of the snapshot and the partition id of the state.
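Since the two options only make sense together, the reader would need to validate them as a pair. A minimal sketch of that validation rule, assuming string-valued reader options; the helper itself is hypothetical, only the option names come from the issue:

```python
def validate_snapshot_options(options):
    """Return True if the snapshot options are present (and consistent).

    Both snapshotStartBatchId and snapshotPartitionId must be given
    together; supplying only one of them is an error.
    """
    has_start = "snapshotStartBatchId" in options
    has_partition = "snapshotPartitionId" in options
    if has_start != has_partition:
        raise ValueError(
            "snapshotStartBatchId and snapshotPartitionId "
            "must be specified together")
    if has_start and int(options["snapshotStartBatchId"]) < 0:
        raise ValueError("snapshotStartBatchId must be non-negative")
    return has_start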
[jira] [Updated] (SPARK-48447) Check state store provider class before invoking the constructor
[ https://issues.apache.org/jira/browse/SPARK-48447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yuchen Liu updated SPARK-48447: --- Priority: Major (was: Minor) > Check state store provider class before invoking the constructor > > > Key: SPARK-48447 > URL: https://issues.apache.org/jira/browse/SPARK-48447 > Project: Spark > Issue Type: Improvement > Components: Structured Streaming >Affects Versions: 4.0.0 >Reporter: Yuchen Liu >Priority: Major > Original Estimate: 24h > Remaining Estimate: 24h > > We should ensure that only classes > [extending|https://github.com/databricks/runtime/blob/1440e77ab54c40981066c22ec759bdafc0683e76/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L73] > {{StateStoreProvider}} can be constructed, to prevent users from > instantiating arbitrary classes.
[jira] [Created] (SPARK-48447) Check state store provider class before invoking the constructor
Yuchen Liu created SPARK-48447: -- Summary: Check state store provider class before invoking the constructor Key: SPARK-48447 URL: https://issues.apache.org/jira/browse/SPARK-48447 Project: Spark Issue Type: Improvement Components: Structured Streaming Affects Versions: 4.0.0 Reporter: Yuchen Liu We should ensure that only classes [extending|https://github.com/databricks/runtime/blob/1440e77ab54c40981066c22ec759bdafc0683e76/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/HDFSBackedStateStoreProvider.scala#L73] {{StateStoreProvider}} can be constructed, to prevent users from instantiating arbitrary classes.
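The check described above, verifying the class hierarchy before invoking the constructor, can be sketched as follows. The class and helper names are illustrative stand-ins, not Spark's actual internals:

```python
class StateStoreProvider:
    """Stand-in for the StateStoreProvider base class referenced above."""


def create_provider(provider_class):
    # Verify the class extends StateStoreProvider *before* calling its
    # constructor, so arbitrary classes cannot be instantiated.
    if not (isinstance(provider_class, type)
            and issubclass(provider_class, StateStoreProvider)):
        raise TypeError(
            f"{provider_class!r} does not extend StateStoreProvider")
    return provider_class()
```

The key point is that the subclass check runs first; a malicious or misconfigured class name never has its constructor invoked.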
[jira] [Created] (SPARK-48446) Update SS Doc of dropDuplicatesWithinWatermark to use the right syntax
Yuchen Liu created SPARK-48446: -- Summary: Update SS Doc of dropDuplicatesWithinWatermark to use the right syntax Key: SPARK-48446 URL: https://issues.apache.org/jira/browse/SPARK-48446 Project: Spark Issue Type: Documentation Components: Structured Streaming Affects Versions: 4.0.0 Reporter: Yuchen Liu For dropDuplicates, the example at [https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#:~:text=)%20%5C%0A%20%20.-,dropDuplicates,-(%22guid%22] is out of date compared with [https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.dropDuplicates.html]: the argument should be a list of column names. The same discrepancy exists for dropDuplicatesWithinWatermark.
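To illustrate the corrected call shape, the dedup-by-subset behavior can be modeled in pure Python, with the DataFrame replaced by a list of dicts. This is a sketch of the semantics, not the pyspark API; the real call is `df.dropDuplicates(["guid"])`, with the subset passed as a list:

```python
def drop_duplicates(rows, subset):
    """Keep the first row for each distinct combination of subset columns.

    subset must be a *list* of column names (the corrected syntax),
    not a bare string.
    """
    if not isinstance(subset, list):
        raise TypeError("subset must be a list of column names")
    seen = set()
    out = []
    for row in rows:
        key = tuple(row[c] for c in subset)
        if key not in seen:
            seen.add(key)
            out.append(row)
    return out
```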
[jira] [Commented] (SPARK-48411) Add E2E test for DropDuplicateWithinWatermark
[ https://issues.apache.org/jira/browse/SPARK-48411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17849356#comment-17849356 ] Yuchen Liu commented on SPARK-48411: I will work on this. > Add E2E test for DropDuplicateWithinWatermark > - > > Key: SPARK-48411 > URL: https://issues.apache.org/jira/browse/SPARK-48411 > Project: Spark > Issue Type: New Feature > Components: Connect, SS >Affects Versions: 4.0.0 >Reporter: Wei Liu >Priority: Major > > Currently we do not have an e2e test for DropDuplicateWithinWatermark; we > should add one. We can simply take one of the tests written in Scala here (with > the testStream API) and replicate it in Python: > [https://github.com/apache/spark/commit/0e9e34c1bd9bd16ad5efca77ce2763eb950f3103] > > The change should happen in > [https://github.com/apache/spark/blob/eee179135ed21dbdd8b342d053c9eda849e2de77/python/pyspark/sql/tests/streaming/test_streaming.py#L29] > > so it is exercised in both connect and non-connect mode. > > Test with: > ``` > python/run-tests --testnames pyspark.sql.tests.streaming.test_streaming > python/run-tests --testnames > pyspark.sql.tests.connect.streaming.test_parity_streaming > ``` -- This message was sent by Atlassian Jira (v8.20.10#820010) - To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org
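For readers unfamiliar with the operator under test, its semantics can be approximated in pure Python: a key is deduplicated only while its state has not been evicted by the watermark. This is a simplified per-event model of the behavior (state expires at first-seen event time plus delay, watermark trails the max event time by the delay), not Spark's micro-batch implementation:

```python
def drop_duplicates_within_watermark(events, delay):
    """events: ordered list of (event_time, key) pairs.

    Emit the first occurrence of each key; a repeated key is emitted
    again only after the watermark has passed the expiry time of the
    previously stored occurrence (first_seen_time + delay).
    """
    state = {}                  # key -> expiry time
    watermark = float("-inf")   # max event time seen, minus delay
    out = []
    for t, key in events:
        # Evict expired keys first (watermark-based state cleanup).
        state = {k: exp for k, exp in state.items() if exp > watermark}
        if key not in state:
            out.append((t, key))
            state[key] = t + delay
        watermark = max(watermark, t - delay)
    return out
```

A Scala test from the linked commit could then be replicated in Python against expectations like these, using the streaming test harness in test_streaming.py.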