This is an automated email from the ASF dual-hosted git repository.
gaborgsomogyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c89c41606fe [FLINK-37460][state] Make state processor API checkpoint
ID configurable
c89c41606fe is described below
commit c89c41606fe609208f10d8fc08999145b612c6a2
Author: Gabor Somogyi <[email protected]>
AuthorDate: Fri Mar 14 08:11:04 2025 +0100
[FLINK-37460][state] Make state processor API checkpoint ID configurable
---
.../flink/state/api/KeyedStateTransformation.java | 9 +++-
.../state/api/OneInputStateTransformation.java | 20 +++++---
.../flink/state/api/OperatorTransformation.java | 15 +++++-
.../apache/flink/state/api/SavepointReader.java | 10 +++-
.../apache/flink/state/api/SavepointWriter.java | 55 +++++++++++++++++++---
.../state/api/WindowedStateTransformation.java | 25 +++++++---
.../state/api/output/MergeOperatorStates.java | 7 ++-
.../flink/state/api/output/SnapshotUtils.java | 11 +++--
.../operators/BroadcastStateBootstrapOperator.java | 11 ++++-
.../operators/KeyedStateBootstrapOperator.java | 9 +++-
.../output/operators/StateBootstrapOperator.java | 9 +++-
.../operators/StateBootstrapWrapperOperator.java | 9 +++-
.../StateBootstrapWrapperOperatorFactory.java | 11 ++++-
.../api/runtime/metadata/SavepointMetadataV2.java | 8 ++++
.../flink/state/api/SavepointWriterITCase.java | 18 +++++--
.../output/KeyedStateBootstrapOperatorTest.java | 4 +-
.../flink/state/api/output/SnapshotUtilsTest.java | 2 +-
17 files changed, 188 insertions(+), 45 deletions(-)
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedStateTransformation.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedStateTransformation.java
index 68228b96e45..b088afed2ef 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedStateTransformation.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/KeyedStateTransformation.java
@@ -44,6 +44,9 @@ public class KeyedStateTransformation<K, T> {
/** The data set containing the data to bootstrap the operator state with.
*/
private final DataStream<T> stream;
+ /** Checkpoint ID. */
+ private final long checkpointId;
+
/** Local max parallelism for the bootstrapped operator. */
private final OptionalInt operatorMaxParallelism;
@@ -55,10 +58,12 @@ public class KeyedStateTransformation<K, T> {
KeyedStateTransformation(
DataStream<T> stream,
+ long checkpointId,
OptionalInt operatorMaxParallelism,
KeySelector<T, K> keySelector,
TypeInformation<K> keyType) {
this.stream = stream;
+ this.checkpointId = checkpointId;
this.operatorMaxParallelism = operatorMaxParallelism;
this.keySelector = keySelector;
this.keyType = keyType;
@@ -80,7 +85,7 @@ public class KeyedStateTransformation<K, T> {
(timestamp, path) ->
SimpleOperatorFactory.of(
new KeyedStateBootstrapOperator<>(
- timestamp, path, processFunction));
+ checkpointId, timestamp, path,
processFunction));
return transform(factory);
}
@@ -112,6 +117,6 @@ public class KeyedStateTransformation<K, T> {
public <W extends Window> WindowedStateTransformation<T, K, W> window(
WindowAssigner<? super T, W> assigner) {
return new WindowedStateTransformation<>(
- stream, operatorMaxParallelism, keySelector, keyType,
assigner);
+ stream, checkpointId, operatorMaxParallelism, keySelector,
keyType, assigner);
}
}
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputStateTransformation.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputStateTransformation.java
index a1e1191a148..8cc21a16390 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputStateTransformation.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OneInputStateTransformation.java
@@ -49,11 +49,15 @@ public class OneInputStateTransformation<T> {
/** The data stream containing the data to bootstrap the operator state
with. */
private final DataStream<T> stream;
+ /** Checkpoint ID. */
+ private final long checkpointId;
+
/** Local max parallelism for the bootstrapped operator. */
private OptionalInt operatorMaxParallelism = OptionalInt.empty();
- OneInputStateTransformation(DataStream<T> stream) {
+ OneInputStateTransformation(DataStream<T> stream, long checkpointId) {
this.stream = stream;
+ this.checkpointId = checkpointId;
}
/**
@@ -84,7 +88,8 @@ public class OneInputStateTransformation<T> {
SavepointWriterOperatorFactory factory =
(timestamp, path) ->
SimpleOperatorFactory.of(
- new StateBootstrapOperator<>(timestamp, path,
processFunction));
+ new StateBootstrapOperator<>(
+ checkpointId, timestamp, path,
processFunction));
return transform(factory);
}
@@ -105,7 +110,7 @@ public class OneInputStateTransformation<T> {
(timestamp, path) ->
SimpleOperatorFactory.of(
new BroadcastStateBootstrapOperator<>(
- timestamp, path, processFunction));
+ checkpointId, timestamp, path,
processFunction));
return transform(factory);
}
@@ -133,7 +138,8 @@ public class OneInputStateTransformation<T> {
public <K> KeyedStateTransformation<K, T> keyBy(KeySelector<T, K>
keySelector) {
TypeInformation<K> keyType =
TypeExtractor.getKeySelectorTypes(keySelector,
stream.getType());
- return new KeyedStateTransformation<>(stream, operatorMaxParallelism,
keySelector, keyType);
+ return new KeyedStateTransformation<>(
+ stream, checkpointId, operatorMaxParallelism, keySelector,
keyType);
}
/**
@@ -146,7 +152,8 @@ public class OneInputStateTransformation<T> {
*/
public <K> KeyedStateTransformation<K, T> keyBy(
KeySelector<T, K> keySelector, TypeInformation<K> keyType) {
- return new KeyedStateTransformation<>(stream, operatorMaxParallelism,
keySelector, keyType);
+ return new KeyedStateTransformation<>(
+ stream, checkpointId, operatorMaxParallelism, keySelector,
keyType);
}
/**
@@ -186,6 +193,7 @@ public class OneInputStateTransformation<T> {
TypeInformation<Tuple> keyType =
TypeExtractor.getKeySelectorTypes(keySelector,
stream.getType());
- return new KeyedStateTransformation<>(stream, operatorMaxParallelism,
keySelector, keyType);
+ return new KeyedStateTransformation<>(
+ stream, checkpointId, operatorMaxParallelism, keySelector,
keyType);
}
}
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java
index cca55a09321..29e8ea2cb16 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/OperatorTransformation.java
@@ -64,6 +64,19 @@ public final class OperatorTransformation {
* @return A {@link OneInputStateTransformation}.
*/
public static <T> OneInputStateTransformation<T>
bootstrapWith(DataStream<T> stream) {
- return new OneInputStateTransformation<>(stream);
+ return new OneInputStateTransformation<>(stream, 0L);
+ }
+
+ /**
+ * Create a new {@link OneInputStateTransformation} from a {@link
DataStream}.
+ *
+ * @param stream A data stream of elements.
+ * @param checkpointId The checkpoint ID used when snapshotting the bootstrapped state.
+ * @param <T> The type of the input.
+ * @return A {@link OneInputStateTransformation}.
+ */
+ public static <T> OneInputStateTransformation<T> bootstrapWith(
+ DataStream<T> stream, long checkpointId) {
+ return new OneInputStateTransformation<>(stream, checkpointId);
}
}
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java
index ec4850e8f24..7c6bdf527d0 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointReader.java
@@ -82,7 +82,10 @@ public class SavepointReader {
SavepointMetadataV2 savepointMetadata =
new SavepointMetadataV2(
- maxParallelism, metadata.getMasterStates(),
metadata.getOperatorStates());
+ metadata.getCheckpointId(),
+ maxParallelism,
+ metadata.getMasterStates(),
+ metadata.getOperatorStates());
return new SavepointReader(env, savepointMetadata, null);
}
@@ -111,7 +114,10 @@ public class SavepointReader {
SavepointMetadataV2 savepointMetadata =
new SavepointMetadataV2(
- maxParallelism, metadata.getMasterStates(),
metadata.getOperatorStates());
+ metadata.getCheckpointId(),
+ maxParallelism,
+ metadata.getMasterStates(),
+ metadata.getOperatorStates());
return new SavepointReader(env, savepointMetadata, stateBackend);
}
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
index b8688da7169..8c9e32ecd1b 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/SavepointWriter.java
@@ -108,7 +108,10 @@ public class SavepointWriter {
"Savepoint must contain at
least one operator state."));
return new SavepointMetadataV2(
- maxParallelism, metadata.getMasterStates(),
metadata.getOperatorStates());
+ metadata.getCheckpointId(),
+ maxParallelism,
+ metadata.getMasterStates(),
+ metadata.getOperatorStates());
}
/**
@@ -123,13 +126,48 @@ public class SavepointWriter {
public static SavepointWriter newSavepoint(
StreamExecutionEnvironment executionEnvironment, int
maxParallelism) {
return new SavepointWriter(
- createSavepointMetadata(maxParallelism), null,
executionEnvironment);
+ createSavepointMetadata(0L, maxParallelism), null,
executionEnvironment);
+ }
+
+ /**
+ * Creates a new savepoint. The savepoint will be written using the state
backend defined via
+ * the cluster's configuration.
+ *
+ * @param maxParallelism The max parallelism of the savepoint.
+ * @param checkpointId The checkpoint ID written to the savepoint metadata.
+ * @return A {@link SavepointWriter}.
+ * @see #newSavepoint(StreamExecutionEnvironment, StateBackend, int)
+ * @see #withConfiguration(ConfigOption, Object)
+ */
+ public static SavepointWriter newSavepoint(
+ StreamExecutionEnvironment executionEnvironment,
+ long checkpointId,
+ int maxParallelism) {
+ return new SavepointWriter(
+ createSavepointMetadata(checkpointId, maxParallelism), null,
executionEnvironment);
+ }
+
+ /**
+ * Creates a new savepoint.
+ *
+ * @param stateBackend The state backend of the savepoint used for keyed
state.
+ * @param maxParallelism The max parallelism of the savepoint.
+ * @return A {@link SavepointWriter}.
+ * @see #newSavepoint(StreamExecutionEnvironment, int)
+ */
+ public static SavepointWriter newSavepoint(
+ StreamExecutionEnvironment executionEnvironment,
+ StateBackend stateBackend,
+ int maxParallelism) {
+ return new SavepointWriter(
+ createSavepointMetadata(0L, maxParallelism), stateBackend,
executionEnvironment);
}
/**
* Creates a new savepoint.
*
* @param stateBackend The state backend of the savepoint used for keyed
state.
+ * @param checkpointId The checkpoint ID written to the savepoint metadata.
* @param maxParallelism The max parallelism of the savepoint.
* @return A {@link SavepointWriter}.
* @see #newSavepoint(StreamExecutionEnvironment, int)
@@ -137,12 +175,16 @@ public class SavepointWriter {
public static SavepointWriter newSavepoint(
StreamExecutionEnvironment executionEnvironment,
StateBackend stateBackend,
+ long checkpointId,
int maxParallelism) {
return new SavepointWriter(
- createSavepointMetadata(maxParallelism), stateBackend,
executionEnvironment);
+ createSavepointMetadata(checkpointId, maxParallelism),
+ stateBackend,
+ executionEnvironment);
}
- private static SavepointMetadataV2 createSavepointMetadata(int
maxParallelism) {
+ private static SavepointMetadataV2 createSavepointMetadata(
+ long checkpointId, int maxParallelism) {
Preconditions.checkArgument(
maxParallelism > 0 && maxParallelism <=
UPPER_BOUND_MAX_PARALLELISM,
"Maximum parallelism must be between 1 and "
@@ -151,7 +193,7 @@ public class SavepointWriter {
+ maxParallelism);
return new SavepointMetadataV2(
- maxParallelism, Collections.emptyList(),
Collections.emptyList());
+ checkpointId, maxParallelism, Collections.emptyList(),
Collections.emptyList());
}
/**
@@ -295,7 +337,8 @@ public class SavepointWriter {
"reduce(OperatorState)",
TypeInformation.of(CheckpointMetadata.class),
new GroupReduceOperator<>(
- new
MergeOperatorStates(metadata.getMasterStates())))
+ new MergeOperatorStates(
+ metadata.getCheckpointId(),
metadata.getMasterStates())))
.forceNonParallel()
.map(new
CheckpointMetadataCheckpointMetadataMapFunction(this.uidTransformationMap))
.setParallelism(1)
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
index d239d49a741..60e32b7babb 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/WindowedStateTransformation.java
@@ -57,6 +57,8 @@ public class WindowedStateTransformation<T, K, W extends
Window> {
private final WindowOperatorBuilder<T, K, W> builder;
+ private final long checkpointId;
+
private final OptionalInt operatorMaxParallelism;
private final KeySelector<T, K> keySelector;
@@ -65,11 +67,13 @@ public class WindowedStateTransformation<T, K, W extends
Window> {
WindowedStateTransformation(
DataStream<T> input,
+ long checkpointId,
OptionalInt operatorMaxParallelism,
KeySelector<T, K> keySelector,
TypeInformation<K> keyType,
WindowAssigner<? super T, W> windowAssigner) {
this.input = input;
+ this.checkpointId = checkpointId;
this.operatorMaxParallelism = operatorMaxParallelism;
this.keySelector = keySelector;
this.keyType = keyType;
@@ -158,7 +162,8 @@ public class WindowedStateTransformation<T, K, W extends
Window> {
SavepointWriterOperatorFactory factory =
(timestamp, path) ->
- new StateBootstrapWrapperOperatorFactory<>(timestamp,
path, operator);
+ new StateBootstrapWrapperOperatorFactory<>(
+ checkpointId, timestamp, path, operator);
return new StateBootstrapTransformation<>(
input, operatorMaxParallelism, factory, keySelector, keyType);
}
@@ -186,7 +191,8 @@ public class WindowedStateTransformation<T, K, W extends
Window> {
SavepointWriterOperatorFactory factory =
(timestamp, path) ->
- new StateBootstrapWrapperOperatorFactory<>(timestamp,
path, operator);
+ new StateBootstrapWrapperOperatorFactory<>(
+ checkpointId, timestamp, path, operator);
return new StateBootstrapTransformation<>(
input, operatorMaxParallelism, factory, keySelector, keyType);
}
@@ -323,7 +329,8 @@ public class WindowedStateTransformation<T, K, W extends
Window> {
SavepointWriterOperatorFactory factory =
(timestamp, path) ->
- new StateBootstrapWrapperOperatorFactory<>(timestamp,
path, operator);
+ new StateBootstrapWrapperOperatorFactory<>(
+ checkpointId, timestamp, path, operator);
return new StateBootstrapTransformation<>(
input, operatorMaxParallelism, factory, keySelector, keyType);
}
@@ -402,7 +409,8 @@ public class WindowedStateTransformation<T, K, W extends
Window> {
SavepointWriterOperatorFactory factory =
(timestamp, path) ->
- new StateBootstrapWrapperOperatorFactory<>(timestamp,
path, operator);
+ new StateBootstrapWrapperOperatorFactory<>(
+ checkpointId, timestamp, path, operator);
return new StateBootstrapTransformation<>(
input, operatorMaxParallelism, factory, keySelector, keyType);
}
@@ -428,7 +436,8 @@ public class WindowedStateTransformation<T, K, W extends
Window> {
SavepointWriterOperatorFactory factory =
(timestamp, path) ->
- new StateBootstrapWrapperOperatorFactory<>(timestamp,
path, operator);
+ new StateBootstrapWrapperOperatorFactory<>(
+ checkpointId, timestamp, path, operator);
return new StateBootstrapTransformation<>(
input, operatorMaxParallelism, factory, keySelector, keyType);
}
@@ -454,7 +463,8 @@ public class WindowedStateTransformation<T, K, W extends
Window> {
SavepointWriterOperatorFactory factory =
(timestamp, path) ->
- new StateBootstrapWrapperOperatorFactory<>(timestamp,
path, operator);
+ new StateBootstrapWrapperOperatorFactory<>(
+ checkpointId, timestamp, path, operator);
return new StateBootstrapTransformation<>(
input, operatorMaxParallelism, factory, keySelector, keyType);
}
@@ -477,7 +487,8 @@ public class WindowedStateTransformation<T, K, W extends
Window> {
SavepointWriterOperatorFactory factory =
(timestamp, path) ->
- new StateBootstrapWrapperOperatorFactory<>(timestamp,
path, operator);
+ new StateBootstrapWrapperOperatorFactory<>(
+ checkpointId, timestamp, path, operator);
return new StateBootstrapTransformation<>(
input, operatorMaxParallelism, factory, keySelector, keyType);
}
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
index f00bfa34943..584ae131fb7 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/MergeOperatorStates.java
@@ -39,11 +39,14 @@ public class MergeOperatorStates implements
GroupReduceFunction<OperatorState, C
private static final long serialVersionUID = 1L;
+ private final long checkpointId;
+
private final Collection<MasterState> masterStates;
- public MergeOperatorStates(Collection<MasterState> masterStates) {
+ public MergeOperatorStates(long checkpointId, Collection<MasterState>
masterStates) {
Preconditions.checkNotNull(masterStates, "Master state metadata must
not be null");
+ this.checkpointId = checkpointId;
this.masterStates = masterStates;
}
@@ -51,7 +54,7 @@ public class MergeOperatorStates implements
GroupReduceFunction<OperatorState, C
public void reduce(Iterable<OperatorState> values,
Collector<CheckpointMetadata> out) {
CheckpointMetadata metadata =
new CheckpointMetadata(
- SnapshotUtils.CHECKPOINT_ID,
+ checkpointId,
StreamSupport.stream(values.spliterator(), false)
.collect(Collectors.toList()),
masterStates);
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
index b4b6e5dbd6a..2c7664234ab 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/SnapshotUtils.java
@@ -41,11 +41,10 @@ import static
org.apache.flink.configuration.CheckpointingOptions.FS_WRITE_BUFFE
/** Takes a final snapshot of the state of an operator subtask. */
@Internal
public final class SnapshotUtils {
- static final long CHECKPOINT_ID = 0L;
-
private SnapshotUtils() {}
public static <OUT, OP extends StreamOperator<OUT>>
TaggedOperatorSubtaskState snapshot(
+ long checkpointId,
OP operator,
int index,
long timestamp,
@@ -64,21 +63,22 @@ public final class SnapshotUtils {
isUnalignedCheckpoint,
CheckpointOptions.NO_ALIGNED_CHECKPOINT_TIME_OUT);
- operator.prepareSnapshotPreBarrier(CHECKPOINT_ID);
+ operator.prepareSnapshotPreBarrier(checkpointId);
CheckpointStreamFactory storage = createStreamFactory(configuration,
options);
OperatorSnapshotFutures snapshotInProgress =
- operator.snapshotState(CHECKPOINT_ID, timestamp, options,
storage);
+ operator.snapshotState(checkpointId, timestamp, options,
storage);
OperatorSubtaskState state =
new
OperatorSnapshotFinalizer(snapshotInProgress).getJobManagerOwnedState();
- operator.notifyCheckpointComplete(CHECKPOINT_ID);
+ operator.notifyCheckpointComplete(checkpointId);
return new TaggedOperatorSubtaskState(index, state);
}
public static <OUT, OP extends StreamOperator<OUT>>
TaggedOperatorSubtaskState snapshot(
+ long checkpointId,
OP operator,
int index,
long timestamp,
@@ -89,6 +89,7 @@ public final class SnapshotUtils {
throws Exception {
return snapshot(
+ checkpointId,
operator,
index,
timestamp,
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
index 04aee7899c6..8dfb6d3e124 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/BroadcastStateBootstrapOperator.java
@@ -44,6 +44,8 @@ public class BroadcastStateBootstrapOperator<IN>
private static final long serialVersionUID = 1L;
+ private final long checkpointId;
+
private final long timestamp;
private final Path savepointPath;
@@ -51,10 +53,14 @@ public class BroadcastStateBootstrapOperator<IN>
private transient ContextImpl context;
public BroadcastStateBootstrapOperator(
- long timestamp, Path savepointPath,
BroadcastStateBootstrapFunction<IN> function) {
+ long checkpointId,
+ long timestamp,
+ Path savepointPath,
+ BroadcastStateBootstrapFunction<IN> function) {
super(function);
- this.timestamp = timestamp;
+ this.checkpointId = checkpointId;
+ this.timestamp = timestamp;
this.savepointPath = savepointPath;
}
@@ -73,6 +79,7 @@ public class BroadcastStateBootstrapOperator<IN>
public void endInput() throws Exception {
TaggedOperatorSubtaskState state =
SnapshotUtils.snapshot(
+ checkpointId,
this,
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
timestamp,
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
index b3be5b988d6..6f4b4a1c223 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/KeyedStateBootstrapOperator.java
@@ -48,6 +48,8 @@ public class KeyedStateBootstrapOperator<K, IN>
private static final long serialVersionUID = 1L;
+ private final long checkpointId;
+
private final long timestamp;
private final Path savepointPath;
@@ -55,9 +57,13 @@ public class KeyedStateBootstrapOperator<K, IN>
private transient KeyedStateBootstrapOperator<K, IN>.ContextImpl context;
public KeyedStateBootstrapOperator(
- long timestamp, Path savepointPath, KeyedStateBootstrapFunction<K,
IN> function) {
+ long checkpointId,
+ long timestamp,
+ Path savepointPath,
+ KeyedStateBootstrapFunction<K, IN> function) {
super(function);
+ this.checkpointId = checkpointId;
this.timestamp = timestamp;
this.savepointPath = savepointPath;
}
@@ -88,6 +94,7 @@ public class KeyedStateBootstrapOperator<K, IN>
public void endInput() throws Exception {
TaggedOperatorSubtaskState state =
SnapshotUtils.snapshot(
+ checkpointId,
this,
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
timestamp,
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
index 4ae00d8e2d4..149aa938b02 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapOperator.java
@@ -40,6 +40,8 @@ public class StateBootstrapOperator<IN>
private static final long serialVersionUID = 1L;
+ private final long checkpointId;
+
private final long timestamp;
private final Path savepointPath;
@@ -47,9 +49,13 @@ public class StateBootstrapOperator<IN>
private transient ContextImpl context;
public StateBootstrapOperator(
- long timestamp, Path savepointPath, StateBootstrapFunction<IN>
function) {
+ long checkpointId,
+ long timestamp,
+ Path savepointPath,
+ StateBootstrapFunction<IN> function) {
super(function);
+ this.checkpointId = checkpointId;
this.timestamp = timestamp;
this.savepointPath = savepointPath;
}
@@ -69,6 +75,7 @@ public class StateBootstrapOperator<IN>
public void endInput() throws Exception {
TaggedOperatorSubtaskState state =
SnapshotUtils.snapshot(
+ checkpointId,
this,
getRuntimeContext().getTaskInfo().getIndexOfThisSubtask(),
timestamp,
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
index 7b91608b93a..4170c8f66f6 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperator.java
@@ -53,6 +53,8 @@ public final class StateBootstrapWrapperOperator<
private static final long serialVersionUID = 1L;
+ private final long checkpointId;
+
private final long timestamp;
private final Path savepointPath;
@@ -62,7 +64,11 @@ public final class StateBootstrapWrapperOperator<
private final WindowOperator<?, IN, ?, ?, ?> operator;
public StateBootstrapWrapperOperator(
- long timestamp, Path savepointPath, WindowOperator<?, IN, ?, ?, ?>
operator) {
+ long checkpointId,
+ long timestamp,
+ Path savepointPath,
+ WindowOperator<?, IN, ?, ?, ?> operator) {
+ this.checkpointId = checkpointId;
this.timestamp = timestamp;
this.savepointPath = savepointPath;
this.operator = operator;
@@ -172,6 +178,7 @@ public final class StateBootstrapWrapperOperator<
public void endInput() throws Exception {
TaggedOperatorSubtaskState state =
SnapshotUtils.snapshot(
+ checkpointId,
this,
operator.getContainingTask()
.getEnvironment()
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperatorFactory.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperatorFactory.java
index 2de1ecf56e2..e6f9a50f1e4 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperatorFactory.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/operators/StateBootstrapWrapperOperatorFactory.java
@@ -47,6 +47,8 @@ public class StateBootstrapWrapperOperatorFactory<
OPF extends OneInputStreamOperatorFactory<IN, OUT>>
extends AbstractStreamOperatorFactory<TaggedOperatorSubtaskState> {
+ private final long checkpointId;
+
private final long timestamp;
private final Path savepointPath;
@@ -56,7 +58,11 @@ public class StateBootstrapWrapperOperatorFactory<
private final WindowOperator<?, IN, ?, ?, ?> operator;
public StateBootstrapWrapperOperatorFactory(
- long timestamp, Path savepointPath, WindowOperator<?, IN, ?, ?, ?>
operator) {
+ long checkpointId,
+ long timestamp,
+ Path savepointPath,
+ WindowOperator<?, IN, ?, ?, ?> operator) {
+ this.checkpointId = checkpointId;
this.timestamp = timestamp;
this.savepointPath = savepointPath;
this.operator = operator;
@@ -67,7 +73,8 @@ public class StateBootstrapWrapperOperatorFactory<
public <T extends StreamOperator<TaggedOperatorSubtaskState>> T
createStreamOperator(
StreamOperatorParameters<TaggedOperatorSubtaskState> parameters) {
StateBootstrapWrapperOperator<IN, OUT, OP> wrapperOperator =
- new StateBootstrapWrapperOperator<>(timestamp, savepointPath,
operator);
+ new StateBootstrapWrapperOperator<>(
+ checkpointId, timestamp, savepointPath, operator);
wrapperOperator.setup(
parameters.getContainingTask(),
parameters.getStreamConfig(),
diff --git
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java
index ab771364434..90faf9541f1 100644
---
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java
+++
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/runtime/metadata/SavepointMetadataV2.java
@@ -41,6 +41,8 @@ import static
org.apache.flink.runtime.state.KeyGroupRangeAssignment.UPPER_BOUND
@Internal
public class SavepointMetadataV2 {
+ private final long checkpointId;
+
private final int maxParallelism;
private final Collection<MasterState> masterStates;
@@ -48,6 +50,7 @@ public class SavepointMetadataV2 {
private final Map<OperatorID, OperatorStateSpecV2> operatorStateIndex;
public SavepointMetadataV2(
+ long checkpointId,
int maxParallelism,
Collection<MasterState> masterStates,
Collection<OperatorState> initialStates) {
@@ -59,6 +62,7 @@ public class SavepointMetadataV2 {
+ maxParallelism);
Preconditions.checkNotNull(masterStates);
+ this.checkpointId = checkpointId;
this.maxParallelism = maxParallelism;
this.masterStates = new ArrayList<>(masterStates);
this.operatorStateIndex =
CollectionUtil.newHashMapWithExpectedSize(initialStates.size());
@@ -70,6 +74,10 @@ public class SavepointMetadataV2 {
OperatorStateSpecV2.existing(existingState)));
}
+ public long getCheckpointId() {
+ return checkpointId;
+ }
+
public int getMaxParallelism() {
return maxParallelism;
}
diff --git
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
index 87414cf66df..d7a5586b602 100644
---
a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
+++
b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/SavepointWriterITCase.java
@@ -30,6 +30,7 @@ import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.StateBackendOptions;
+import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
@@ -38,6 +39,7 @@ import
org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.functions.BroadcastStateBootstrapFunction;
import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;
import org.apache.flink.state.api.functions.StateBootstrapFunction;
+import org.apache.flink.state.api.runtime.SavepointLoader;
import org.apache.flink.state.rocksdb.EmbeddedRocksDBStateBackend;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
@@ -66,6 +68,8 @@ import static org.assertj.core.api.Assertions.assertThat;
/** IT test for writing savepoints. */
public class SavepointWriterITCase extends AbstractTestBaseJUnit4 {
+ private static final long CHECKPOINT_ID = 42;
+
private static final String ACCOUNT_UID = "accounts";
private static final String CURRENCY_UID = "currency";
@@ -120,18 +124,18 @@ public class SavepointWriterITCase extends AbstractTestBaseJUnit4 {
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
StateBootstrapTransformation<Account> transformation =
- OperatorTransformation.bootstrapWith(env.fromData(accounts))
+ OperatorTransformation.bootstrapWith(env.fromData(accounts), CHECKPOINT_ID)
.keyBy(acc -> acc.id)
.transform(new AccountBootstrapper());
StateBootstrapTransformation<CurrencyRate> broadcastTransformation =
- OperatorTransformation.bootstrapWith(env.fromData(currencyRates))
+ OperatorTransformation.bootstrapWith(env.fromData(currencyRates), CHECKPOINT_ID)
.transform(new CurrencyBootstrapFunction());
SavepointWriter writer =
backend == null
- ? SavepointWriter.newSavepoint(env, 128)
- : SavepointWriter.newSavepoint(env, backend, 128);
+ ? SavepointWriter.newSavepoint(env, CHECKPOINT_ID, 128)
+ : SavepointWriter.newSavepoint(env, backend, CHECKPOINT_ID, 128);
writer.withOperator(OperatorIdentifier.forUid(ACCOUNT_UID), transformation)
.withOperator(getUidHashFromUid(CURRENCY_UID), broadcastTransformation)
@@ -142,6 +146,9 @@ public class SavepointWriterITCase extends AbstractTestBaseJUnit4 {
private void validateBootstrap(Configuration configuration, String savepointPath)
throws Exception {
+ CheckpointMetadata metadata = SavepointLoader.loadSavepointMetadata(savepointPath);
+ assertThat(metadata.getCheckpointId()).isEqualTo(CHECKPOINT_ID);
+
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
@@ -192,6 +199,9 @@ public class SavepointWriterITCase extends AbstractTestBaseJUnit4 {
private void validateModification(Configuration configuration, String savepointPath)
throws Exception {
+ CheckpointMetadata metadata = SavepointLoader.loadSavepointMetadata(savepointPath);
+ assertThat(metadata.getCheckpointId()).isEqualTo(CHECKPOINT_ID);
+
StreamExecutionEnvironment sEnv =
StreamExecutionEnvironment.getExecutionEnvironment(configuration);
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java
index b3aaa093c27..b5b034a0762 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/KeyedStateBootstrapOperatorTest.java
@@ -62,7 +62,7 @@ public class KeyedStateBootstrapOperatorTest {
OperatorSubtaskState state;
KeyedStateBootstrapOperator<Long, Long> bootstrapOperator =
- new KeyedStateBootstrapOperator<>(0L, path, new TimerBootstrapFunction());
+ new KeyedStateBootstrapOperator<>(0L, 0L, path, new TimerBootstrapFunction());
try (KeyedOneInputStreamOperatorTestHarness<Long, Long,
TaggedOperatorSubtaskState>
harness = getHarness(bootstrapOperator)) {
processElements(harness, 1L, 2L, 3L);
@@ -93,7 +93,7 @@ public class KeyedStateBootstrapOperatorTest {
OperatorSubtaskState state;
KeyedStateBootstrapOperator<Long, Long> bootstrapOperator =
- new KeyedStateBootstrapOperator<>(0L, path, new SimpleBootstrapFunction());
+ new KeyedStateBootstrapOperator<>(0L, 0L, path, new SimpleBootstrapFunction());
try (KeyedOneInputStreamOperatorTestHarness<Long, Long,
TaggedOperatorSubtaskState>
harness = getHarness(bootstrapOperator)) {
processElements(harness, 1L, 2L, 3L);
diff --git a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
index 0582fcc9d9a..3cedf154c0d 100644
--- a/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
+++ b/flink-libraries/flink-state-processing-api/src/test/java/org/apache/flink/state/api/output/SnapshotUtilsTest.java
@@ -77,7 +77,7 @@ public class SnapshotUtilsTest {
Path path = new Path(folder.newFolder().getAbsolutePath());
SnapshotUtils.snapshot(
- operator, 0, 0L, true, false, new Configuration(), path, savepointFormatType);
+ 0L, operator, 0, 0L, true, false, new Configuration(), path, savepointFormatType);
Assert.assertEquals(SavepointType.savepoint(savepointFormatType), actualSnapshotType);
Assert.assertEquals(EXPECTED_CALL_OPERATOR_SNAPSHOT, ACTUAL_ORDER_TRACKING);