[ 
https://issues.apache.org/jira/browse/SPARK-49374?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Siying Dong updated SPARK-49374:
--------------------------------
    Description: 
h2. Design Doc: 
[https://docs.google.com/document/d/1uWRMbN927cRXhSm5oeV3pbwb6o73am4r1ckEJDhAHa0/edit?usp=sharing]


h2. Motivation

We expect the new checkpoint structure to be beneficial by establishing a linear dependency between batch versions. Right now, tasks can be executed multiple times for the same batchID (for speculative execution or rerunning in ForEachBatch), and there can be multiple parallel lineages of state stores. For example, one issue with ForEachBatch showed the following lineage, which triggered a RocksDB file uploading bug:

!image-2024-08-23-14-28-56-418.png!

Although we fixed those bugs, this complexity makes the system prone to bugs. The non-linear lineage also presents a correctness risk when some of the versions are changelog checkpoints.

!image-2024-08-23-14-29-19-443.png!

In the same example, suppose Version 17 is a snapshot checkpoint and Version 18 is a changelog checkpoint. To recover from a checkpoint, we need to apply Version 17 and Version 18 together. However, Version 18 isn’t generated on top of Version 17. This can happen either because Version 18 is generated by a different worker than Version 17, or because the same worker abandoned Version 17, replayed the batch and generated Version 17’. In most cases the result is accurate, but we have identified edge cases where users might be surprised by the results, and there may already be correctness issues. These correctness issues will become more prominent with transformWithState due to the occurrence of partial updates (see the Appendix for examples). This issue is not specific to the RocksDB State Store but is a general state store problem, although the scope of this project is limited to the RocksDB State Store. Note that fixing the state store lineage only makes sure the state store itself is consistent; it doesn’t make sure the state store is consistent with outputs to the sink.

Furthermore, the complex checkpoint version lineage makes it hard to reduce the overhead of old version cleanup. Even a long running state store has to read metadata for all previous versions and list all files to do any cleanup safely, which is expensive. This is necessary because any version can be written by a different executor and can reference RocksDB files that the current executor isn’t aware of. In extreme cases, we can even corrupt the state store. The chance of that happening is very low, but it’s not a good idea to leave this correctness risk unfixed.
h2. Proposal sketch

The proposed checkpoint structure will ensure a linear dependency:

!image-2024-08-23-14-29-59-165.png!

This stronger guarantee will be a good foundation for the problems above.

 

The proposal’s basic idea is to guarantee linear lineage by not allowing checkpoint overwriting. Every checkpoint is written to a new file name that includes a uniqueID. When starting any batch, a task uses the uniqueID to precisely identify which checkpoint to load.

When a new state store checkpoint is generated, the checkpoint path name 
includes a globally unique ID, so that it can never be updated. Here is an 
example:
20_d8e2ca47.zip

21_ef6618c2.delta

21_f4d05ac9.delta

22_4489578d.delta

The name is stored in the commit log too. When the next batch is executed, those unique IDs are passed to the executors, which make sure they start executing from that checkpoint. If the local state store isn’t in that state, it downloads the checkpoint from cloud storage.
h1. Part II: Detailed design
h2. Architecture

Previously, a state store checkpoint was stored in a path determined only by the checkpoint root path, operatorID, partitionID and batchID. When a stateful operator is executed, it can always construct the path with that information. It downloads the checkpoint from the path of the previous batchID and checkpoints to the path for this batchID. As mentioned earlier, this flexibility comes with a cost: there is no way to distinguish among checkpoints generated by different tasks rerunning the same batchID.

 

In this new design, every checkpoint goes to a globally unique file. The globally unique ID needs to be communicated between the driver and the executors, and stored in commit logs. The basic workflow is shown as follows:

!image-2024-08-23-14-30-22-474.png!
h3. File Structure Under Checkpoint Path

Currently, a checkpoint is stored in the path 
_<checkpoint_root>/<operatorID>/<partitionID>/<storeName>/<batchId>.[changelog, 
zip]._ The path structure looks like the following:

 0 (operator ID)
    +----+
     | 0 (partitionID)
     +-----+
     |     ……
     | 1 (partitionID)
     +-----+
     |          |- default (storeName)
     |         +-----+
     |                     |  20.zip
     |                     |  21.delta
     |                     |  22.delta
     |                     +  23.delta
     | 2 (partitionID)
    +--- ……

 

The general directory structure stays intact; we only change how the files under the store path are named. In the example above, only the file names (shown in blue) change. Instead of naming the files <batchID>.zip or <batchID>.delta, we name them <batchID>_<uniqueID>.zip or <batchID>_<uniqueID>.delta. This also means that for the same batchID, there can be multiple files. Here is an example:

20_d8e2ca47.zip

21_ef6618c2.delta

21_f4d05ac9.delta

22_4489578d.delta

23_689aa6bd.delta

The randomID is generated by the executor itself. In the first version, it is just a UUID, which can be shortened later if needed. In the Appendix, we will discuss the design decision on how to generate these random IDs. These random IDs are persisted in commit logs. When a new batch starts, they are passed to the executors as part of the operator's state info. By the end of the batch, the IDs for the new checkpoints are passed back to the driver through an accumulator, where the driver persists them to the commit log.

 

The lineage information is always managed in commit logs, but the lineage is often needed at the state store level, when downloading a checkpoint or doing cleanup. To make a state store self-contained, some lineage information is also stored in the .zip and .delta files themselves, so that at the level of a single state store we can always complete these operations without relying on outside information. This decision will be discussed in the appendix. Each delta file will contain the unique checkpoint IDs since the (presumed) last snapshot checkpoint, i.e. that snapshot and all changelog checkpoints following it. We will discuss this in more detail below.

 

There are still extra complexities to this problem, and we will discuss these issues in the following sections:
 # Recovery: the driver will tell executors which version to use, but how do executors find the old snapshots and deltas needed before this version?
 # Cleanup of old versions.

h3. Commit Log Format Change

We will add a new field to the commit message {_}CommitMetadata{_}: a nested map ({_}Map[Map[Seq[String]]]{_}) that represents operatorID→storeName→partitionID→checkpointUniqueID.
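
As a rough, hedged illustration of the shape of this field, the sketch below uses explicit key types; the class and field names are assumptions, not the actual Spark code:

{code:scala}
// Hedged sketch only; names and the exact nesting are assumptions, not the
// real CommitMetadata class in Spark.
case class CommitMetadataSketch(
    // operatorID -> storeName -> checkpoint unique ID per partition
    // (the Seq is indexed by partitionID)
    stateUniqueIds: Map[Long, Map[String, Seq[String]]])

object CommitMetadataSketch {
  def main(args: Array[String]): Unit = {
    val meta = CommitMetadataSketch(Map(
      0L -> Map("default" -> Seq("d8e2ca47", "ef6618c2"))))
    // Look up the checkpoint unique ID for operator 0, store "default", partition 1.
    println(meta.stateUniqueIds(0L)("default")(1)) // prints ef6618c2
  }
}
{code}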
h3. Checkpoint File Format Change

In both the changelog file and the zip file, extra information will be written: the uniqueIDs since the presumed last snapshot. It is a presumed snapshot because snapshotting can sometimes fail or be delayed, so what sits in the place of the presumed last snapshot may only be a changelog. In those cases, we can still reconstruct further lineage by reading the lineage from that changelog file at the presumed snapshot position. By repeatedly reading lineage from previous presumed snapshots, the whole lineage can always be reconstructed.

 

When we need to download a checkpoint, we need to find the latest snapshot checkpoint and all subsequent changelog files. Usually, this can be done by reading only the delta file corresponding to the target unique checkpoint ID. In the case of a snapshot upload failure, we will need to keep reading more than one delta file.

 

In the delta files, we will create a new entry type {_}LINEAGE_RECORD{_}, which contains a list of unique IDs. These are the unique IDs for version-1, version-2, etc.

 

In the zip files, this list will be added to class 
{_}RocksDBCheckpointMetadata{_}.
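
A minimal sketch of what these two additions could look like; the shapes below are assumptions for illustration (the real RocksDBCheckpointMetadata has many more fields):

{code:scala}
// Hedged sketch of the two format additions described above; all names and
// shapes here are assumptions, not the actual Spark classes.

// New entry type written into delta (changelog) files: the unique IDs of the
// ancestor checkpoints, ordered version-1, version-2, ... back to the
// presumed last snapshot.
case class LineageRecordSketch(uniqueIds: Seq[String])

// The zip (snapshot) metadata would carry the same list as an extra field.
case class RocksDBCheckpointMetadataSketch(
    numKeys: Long,
    lineage: Seq[String])

object CheckpointFormatAdditionsSketch {
  def main(args: Array[String]): Unit = {
    // Lineage recorded in 23_689aa6bd.delta from the running example.
    val record = LineageRecordSketch(Seq("4489578d", "f4d05ac9", "d8e2ca47"))
    println(record.uniqueIds.head) // the unique ID of version 22
  }
}
{code}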

 

The information should always be available, because:
 # When initially loading a state store, we always have the lineage information since the last snapshot loaded.
 # When we add a new changelog checkpoint, we add one checkpoint ID to the list, so it stays the full list.
 # We can prune the in-memory list based on presumed snapshot versions, as sketched below.
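
A minimal sketch of this in-memory bookkeeping (class and method names are hypothetical, not the actual RocksDB state store code):

{code:scala}
// Hedged sketch of the in-memory lineage bookkeeping.
class LineageTrackerSketch(initialLineage: List[String]) {
  // Unique IDs since the presumed last snapshot, newest first.
  private var lineage: List[String] = initialLineage

  // After each changelog checkpoint, prepend the new ID so the list stays the
  // full lineage back to the presumed snapshot.
  def recordChangelog(uniqueId: String): Unit = {
    lineage = uniqueId :: lineage
  }

  // Once a (presumed) snapshot exists for `uniqueId`, entries older than it
  // are no longer needed and can be pruned.
  def pruneAtSnapshot(uniqueId: String): Unit = {
    lineage = lineage.takeWhile(_ != uniqueId) :+ uniqueId
  }

  def current: List[String] = lineage
}
{code}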

 

An alternative to changing the checkpoint file format is for the state store to read the commit log files to get the lineage, but this has shortcomings: 1. it has to read multiple commit logs, which can be slow; 2. the state store checkpoint directory won’t be self-contained, and the state store has to understand the commit log format.
h3. Passing UniqueID from Driver to Executors

We will add a field to class _StatefulOperatorStateInfo_ to indicate the uniqueIDs to use. The field will be an array of strings, with each string corresponding to the uniqueID for one partition. This struct is already serialized to the executors.
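
A hedged sketch of the added field (the real _StatefulOperatorStateInfo_ lives inside Spark and has additional fields; everything shown here is illustrative):

{code:scala}
// Hedged sketch only; field names and types are assumptions.
case class StatefulOperatorStateInfoSketch(
    checkpointLocation: String,
    operatorId: Long,
    storeVersion: Long,
    // New field: one checkpoint unique ID per partition, indexed by
    // partitionID; None when running with the old (V1) checkpoint format.
    stateStoreCkptIds: Option[Array[String]])

object StatefulOperatorStateInfoSketch {
  def main(args: Array[String]): Unit = {
    val info = StatefulOperatorStateInfoSketch(
      checkpointLocation = "/tmp/ckpt/state/0", // hypothetical path
      operatorId = 0L,
      storeVersion = 23L,
      stateStoreCkptIds = Some(Array("id-p0", "id-p1")))
    println(info.stateStoreCkptIds.map(_(0))) // unique ID for partition 0
  }
}
{code}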
h3. Passing UniqueID to Driver

The uniqueID will be passed back using a similar approach to {_}EventTimeWatermarkExec{_} and Marlin’s end offset. _StatefulOperator_ will have an optional accumulator, through which we receive the uniqueIDs from all tasks. In some cases, we may receive more than one uniqueID for a partition; the driver can simply pick any of them as the one to commit. Note that this never happens when executors pass Marlin’s end offset back, as only one attempt is made there. In microbatch mode, some of the IDs might come from failed tasks, and we might pick one of those. It should still be correct: even if the task failed, the checkpoint it uploaded is still valid to use. One potential problem is that the checkpoint we use may not match the output written to the sink. This is not a new problem, as it can already happen today; we can revisit the decision if we see such a problem in the future.

 

We will pass back not just the uniqueID for the new checkpoint of version V, but also the uniqueID used for V-1. This will help the driver validate that the lineage is as expected.
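
A hedged sketch of the accumulator-based reporting, reduced to plain Spark APIs; the wiring inside the stateful operators differs, and the report shape is an assumption:

{code:scala}
import org.apache.spark.TaskContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.util.CollectionAccumulator
import scala.jdk.CollectionConverters._

// Hedged sketch: tasks report (partitionId, uniqueID for V, uniqueID used for
// V-1) through a CollectionAccumulator; the driver picks one per partition.
object CheckpointIdReportSketch {
  type Report = (Int, String, String)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[2]")
      .appName("ckpt-id-report-sketch").getOrCreate()
    val acc: CollectionAccumulator[Report] =
      spark.sparkContext.collectionAccumulator[Report]("checkpointUniqueIds")

    // Each task reports the checkpoint it produced plus the one it built on.
    spark.sparkContext.parallelize(0 until 4, 4).foreachPartition { _ =>
      val pid = TaskContext.getPartitionId()
      acc.add((pid, s"new-ckpt-$pid", s"prev-ckpt-$pid"))
    }

    // Driver side: if a partition was attempted more than once, any reported
    // ID is valid to commit; here we simply keep the first one per partition.
    val chosen: Map[Int, String] =
      acc.value.asScala.toSeq.groupBy(_._1).map { case (p, rs) => p -> rs.head._2 }
    println(chosen)
    spark.stop()
  }
}
{code}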
h3. State Store Checkpointing

When state store checkpointing starts, a unique ID (a UUID) is generated and stored in the state store provider. This ID is used to construct the delta and zip file names for the checkpoint. The checkpoint procedure is mostly the same as today; the only difference is that we need to preserve the unique ID and the lineage information and make sure we use the correct ones. Since the snapshot checkpoint is done asynchronously, we need to make an appropriate copy of them to make sure they are consistent with the snapshot.
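
A minimal sketch of the naming scheme just described (helper names are hypothetical):

{code:scala}
import java.util.UUID

// Hedged sketch: the unique ID is generated when checkpointing starts and
// becomes part of both the delta and the zip file names.
object CheckpointNamingSketch {
  def newCheckpointId(): String = UUID.randomUUID().toString

  def deltaFileName(version: Long, uniqueId: String): String =
    s"${version}_${uniqueId}.delta"

  def zipFileName(version: Long, uniqueId: String): String =
    s"${version}_${uniqueId}.zip"

  def main(args: Array[String]): Unit = {
    val id = newCheckpointId()
    println(deltaFileName(21, id)) // e.g. 21_<uuid>.delta
    println(zipFileName(21, id))   // e.g. 21_<uuid>.zip
  }
}
{code}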

 

Checkpointing would look like the following:
20_d8e2ca47.delta (lineage in file: …)

20_d8e2ca47.zip (lineage in file: …)

21_ef6618c2.delta (lineage in file: d8e2ca47)

21_f4d05ac9.delta (lineage in file: d8e2ca47)

22_4489578d.delta (lineage in file: d8e2ca47, f4d05ac9)

23_689aa6bd.delta (lineage in file: d8e2ca47, f4d05ac9, 4489578d)

Assuming a snapshot checkpoint is also scheduled, the snapshotting later succeeds and the snapshot file shows up:
20_d8e2ca47.delta (lineage in file: …)

20_d8e2ca47.zip (lineage in file: …)

21_ef6618c2.delta (lineage in file: d8e2ca47)

21_f4d05ac9.delta (lineage in file: d8e2ca47)

22_4489578d.delta (lineage in file: d8e2ca47, f4d05ac9)

23_689aa6bd.delta (lineage in file: d8e2ca47, f4d05ac9, 4489578d)

23_689aa6bd.zip (lineage in file: d8e2ca47, f4d05ac9, 4489578d)

It is possible that 23_689aa6bd.zip fails to be uploaded. In this case, we stay with a delta file only and continue from there.

Subsequent delta files may then only contain lineage back to version 23, but we can trace further back to version 20 by reading 23_689aa6bd.delta:

20_d8e2ca47.delta (lineage in file: …)

20_d8e2ca47.zip (lineage in file: …)

21_ef6618c2.delta (lineage in file: d8e2ca47)

21_f4d05ac9.delta (lineage in file: d8e2ca47)

22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)

23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

24_32e3cc2a.delta (lineage in file: 689aa6bd)

It is possible that two executors each attempt a full snapshot: the one picked up for lineage fails while the other succeeds. For example, suppose (Version=23, ID=8205c96f) gets a full snapshot checkpoint. It will generate a result like this:

20_d8e2ca47.delta (lineage in file: …)

20_d8e2ca47.zip (lineage in file: …)

21_ef6618c2.delta (lineage in file: d8e2ca47)

21_f4d05ac9.delta (lineage in file: d8e2ca47)

22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)

23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

23_8205c96f.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

23_8205c96f.zip (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

24_32e3cc2a.delta (lineage in file: 689aa6bd)

But since 23_8205c96f is never referenced by the lineage in either the commit log or checkpoint files such as 24_32e3cc2a, those files will be ignored, and 23_689aa6bd.delta is still used for version 23.

 

Note that there is a chance that the same executor executes the same partition of the same batchID twice and both attempts succeed. This is common in the case of ForEachBatch. We need to make sure we name the snapshots correctly. Here is an example:
 # Executed version 23, picked uniqueID 689aa6bd and successfully uploaded file 
23_689aa6bd.delta. 
 # Make RocksDB checkpoint Ckp1.
 # Return to the driver, and 23_689aa6bd is chosen to be committed.
 # Executed version 23, picked uniqueID 8205c96f and successfully uploaded file 
23_8205c96f.delta. 
 # Make RocksDB checkpoint Ckp2.
 # Maintenance thread wakes up for snapshot checkpoints.

The maintenance thread needs to upload two snapshots: Ckp1 → 23_689aa6bd.zip, as well as Ckp2 → 23_8205c96f.zip. Both need to be uploaded because the thread doesn’t know which one will be committed to the commit log. If it always uploads only one, there is a chance that it always picks the wrong one and we don’t get a snapshot checkpoint for a long time, or ever.

One potential optimization is for the snapshotting to wait a little bit for the next batch to start, so that it knows 23_689aa6bd is the one picked up; then it only needs to upload 23_689aa6bd.zip and can skip 23_8205c96f.zip. If the next batch doesn’t start in time, or ever, it will upload both.
h3. State Store Load

In the normal case, an executor usually already has an open state store for the one it needs to load. The executor just needs to validate that the open state store is at the checkpoint ID that matches the one sent from the driver. If the ID matches, the state store is already loaded. If not, we need to reload from cloud storage.

 

When we load a checkpoint into the state store, we first construct the checkpoint names <batchID>_<uniqueID>.zip and <batchID>_<uniqueID>.delta. If the zip file exists, we just load it. Otherwise, we open the delta file and read the first record, which is supposed to be the lineage metadata. Using that metadata, we can download the last snapshot version and apply the subsequent delta files.
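
A hedged sketch of this resolution logic, under the assumption that a delta file can report the unique IDs it records; the helper parameters below stand in for real file I/O and are hypothetical:

{code:scala}
// Hedged sketch of resolving which files to apply for (version, uniqueID).
object CheckpointResolveSketch {
  def filesToApply(
      version: Long,
      uniqueId: String,
      // unique IDs recorded in <v>_<id>.delta for v-1, v-2, ... back to the presumed snapshot
      readLineage: (Long, String) => Seq[String],
      fileExists: String => Boolean): Seq[String] = {
    // Chain of (version, uniqueID) pairs, newest first, starting with the target.
    var chain: Vector[(Long, String)] = Vector((version, uniqueId))
    var idx = 0
    // Walk backwards until a version whose snapshot (.zip) exists, extending
    // the chain from older delta files when the recorded lineage runs out.
    while (!fileExists(s"${chain(idx)._1}_${chain(idx)._2}.zip")) {
      if (idx + 1 >= chain.length) {
        val (v, id) = chain(idx)
        val older = readLineage(v, id).zipWithIndex.map { case (oid, i) => (v - 1 - i, oid) }
        require(older.nonEmpty, s"cannot trace lineage past version $v")
        chain = chain ++ older
      }
      idx += 1
    }
    val (snapVer, snapId) = chain(idx)
    // Apply the snapshot first, then the deltas in ascending version order.
    s"${snapVer}_${snapId}.zip" +:
      chain.take(idx).reverse.map { case (v, id) => s"${v}_${id}.delta" }
  }

  def main(args: Array[String]): Unit = {
    // Example 3 from the text: 23_689aa6bd has no .zip, so we trace back to 20_d8e2ca47.zip.
    val lineage = Map(
      (24L, "32e3cc2a") -> Seq("689aa6bd"),
      (23L, "689aa6bd") -> Seq("4489578d", "f4d05ac9", "d8e2ca47"))
    val existing = Set("20_d8e2ca47.zip", "21_f4d05ac9.delta", "22_4489578d.delta",
      "23_689aa6bd.delta", "24_32e3cc2a.delta")
    println(filesToApply(24L, "32e3cc2a", (v, id) => lineage((v, id)), existing.contains))
  }
}
{code}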

 

{*}Example 1{*}. The uniqueID has a snapshot.

In this example, we are trying to load (version=23, uniqueID=689aa6bd). 

20_d8e2ca47.delta

20_d8e2ca47.zip

21_ef6618c2.delta

21_f4d05ac9.delta

22_4489578d.delta

23_689aa6bd.delta

23_689aa6bd.zip

We see there is a file {_}23_689aa6bd.zip{_}, so we just use the file.

 

{*}Example 2{*}. The uniqueID only has a delta file.

If we still load (version=23, uniqueID=689aa6bd), but there is only a delta file:
20_d8e2ca47.delta (lineage in file: …)

20_d8e2ca47.zip (lineage in file: …)

21_ef6618c2.delta (lineage in file: d8e2ca47)

21_f4d05ac9.delta (lineage in file: d8e2ca47)

22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)

23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

In this case, we will read the lineage from 23_689aa6bd.delta and apply these files: 20_d8e2ca47.zip, 21_f4d05ac9.delta, 22_4489578d.delta, 23_689aa6bd.delta. The outcome is the same even if there is a snapshot file for the same version but with a different checkpoint ID:

20_d8e2ca47.delta (lineage in file: …)

20_d8e2ca47.zip (lineage in file: …)

21_ef6618c2.delta (lineage in file: d8e2ca47)

21_f4d05ac9.delta (lineage in file: d8e2ca47)

22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)

23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

23_8205c96f.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

23_8205c96f.zip (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

In this case, 23_8205c96f.zip and 23_8205c96f.delta are simply ignored.

 

{*}Example 3{*}. Can’t find a snapshot file at the presumed snapshot location.

In the case where snapshot uploading fails, we will need to continue tracing the lineage. In the following example,

20_d8e2ca47.delta (lineage in file: …)

20_d8e2ca47.zip (lineage in file: …)

21_ef6618c2.delta (lineage in file: d8e2ca47)

21_f4d05ac9.delta (lineage in file: d8e2ca47)

22_4489578d.delta (lineage in file: f4d05ac9, d8e2ca47)

23_689aa6bd.delta (lineage in file: 4489578d, f4d05ac9, d8e2ca47)

24_32e3cc2a.delta (lineage in file: 689aa6bd)

When we load (version=24, ID=32e3cc2a), we only have lineage up to 23_689aa6bd. However, there is no file 23_689aa6bd.zip. In this case, we open 23_689aa6bd.delta and trace back through its lineage. Eventually, we need to apply the following files: 20_d8e2ca47.zip, 21_f4d05ac9.delta, 22_4489578d.delta, 23_689aa6bd.delta, 24_32e3cc2a.delta.

Note that even if a snapshot file later shows up for that uniqueID, we will ignore it and use the same set of files.
h3. Compatibility

The new format cannot be consumed by previous releases, so the feature will be turned off by default for microbatch mode. Later, we will switch the default to on.

 

Within the same release, we will allow users to switch between V1 and V2. The driver will read the commit log and see whether a uniqueID is available. If it is available, it is sent to the executors so that they can load checkpoints accordingly; otherwise, the driver assumes V1. When switching modes, we will need to make sure that the first checkpoint is a full snapshot checkpoint written synchronously. That way, loading a checkpoint only needs to deal with files generated by either V1 or V2, never a mixture of the two.
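
A minimal sketch of the detection idea on restart; the types and names mirror the hypothetical commit-log field above and are assumptions, not the actual Spark logic:

{code:scala}
// Hedged sketch of deciding the checkpoint format on restart.
object CheckpointFormatDetectSketch {
  // operatorID -> storeName -> per-partition checkpoint unique IDs
  type StateUniqueIds = Map[Long, Map[String, Seq[String]]]

  sealed trait Format
  case object V1 extends Format // old layout: <batchId>.zip / <batchId>.delta
  case object V2 extends Format // new layout: <batchId>_<uniqueId>.zip / .delta

  def detect(uniqueIdsInLastCommit: Option[StateUniqueIds]): Format =
    uniqueIdsInLastCommit match {
      case Some(ids) if ids.nonEmpty => V2 // send IDs to executors, load by unique ID
      case _                         => V1 // fall back to batchID-only file names
    }

  def main(args: Array[String]): Unit = {
    println(detect(None))                                               // V1
    println(detect(Some(Map(0L -> Map("default" -> Seq("d8e2ca47")))))) // V2
  }
}
{code}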

 
h1. Appendix: Data Correctness Issue When Replaying Unmatched Changelog Files

Here is an example where users are not able to achieve what they want. Consider 
such a query:

……
.withColumn("random_key", (rand() * 1024).cast("int"))
.groupBy("random_key")
.agg(collect_list($"payload").as("payload"))
……
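
For concreteness, a self-contained variant of the sketch above could look like the following; the rate source, console sink and checkpoint path are illustrative placeholders added here, not part of the original example:

{code:scala}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{collect_list, rand}

// Hedged, self-contained variant of the query sketch above.
object RandomBucketQuerySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("random-bucket-sketch")
      .getOrCreate()
    import spark.implicits._

    val query = spark.readStream.format("rate").load()
      .withColumn("payload", $"value".cast("string"))
      .withColumn("random_key", (rand() * 1024).cast("int"))
      .groupBy("random_key")
      .agg(collect_list($"payload").as("payload"))
      .writeStream
      .outputMode("update")
      .format("console")
      .option("checkpointLocation", "/tmp/random-bucket-ckpt") // hypothetical path
      .start()

    query.awaitTermination()
  }
}
{code}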

All the query does is randomly distribute each entry to one of 1024 buckets; within a bucket, entries are collected together. Consider the following execution:

Batch 1:
  Input: foo
  Task1 distributes it to key 6, so the state store state is (6 → foo), and we checkpoint it to 1.snapshot.
  Task2 distributes it to key 8, so its state store state is (8 → foo), but its checkpoint is lost.

 

Batch 2:
  Input: bar
  The only task starts from what was left over by Task2 (8 → foo), distributes the entry to key 6, so it has a delta state store update (6 → bar), and successfully checkpoints to 2.delta.

Query Restart:
  The query is restarted and replays 1.snapshot + 2.delta. The outcome is (6 → bar); “foo” is lost.

 

The issue is more prominent with transformWithState. Here is an example:

 

In transformWithState, the user writes a UDF to sample exactly 3 elements from the key group. The following can happen:
Batch n:

At the start of Batch n, the key group has elements (A, B, C).

In Batch n, element D arrives.

Task 1 randomly drops A, checkpointing a snapshot (B, C, D); Task 2 randomly drops B, checkpointing a snapshot (A, C, D). Task 2's checkpoint wins, leaving (A, C, D) as the official checkpoint.

Batch n+1:

In Batch n+1, element E arrives.

Task 1 continues processing against its local state store, dropping B, and checkpoints a changelog (add E, remove B).

 

Query Restart:

Now if the query restarts and recovers from checkpoints, we would apply (A, C, D) + (add E, remove B). But there is no B in the recovered state. Assuming a blind removal of B is a no-op, we are left with (A, C, D, E). The user wrote a UDF to sample 3 elements, but got 4.



> RocksDB State Store Checkpoint Structure V2
> -------------------------------------------
>
>                 Key: SPARK-49374
>                 URL: https://issues.apache.org/jira/browse/SPARK-49374
>             Project: Spark
>          Issue Type: Epic
>          Components: Structured Streaming
>    Affects Versions: 4.0.0
>            Reporter: Siying Dong
>            Priority: Major
>         Attachments: image-2024-08-23-14-28-56-418.png, 
> image-2024-08-23-14-29-19-443.png, image-2024-08-23-14-29-59-165.png, 
> image-2024-08-23-14-30-22-474.png
>


