curcur commented on a change in pull request #15420:
URL: https://github.com/apache/flink/pull/15420#discussion_r654911814
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
##########
@@ -96,10 +99,11 @@ public ChangelogStateBackend(StateBackend stateBackend) {
kvStateRegistry,
ttlTimeProvider,
metricGroup,
- stateHandles,
+ extractMaterializedState(stateHandles),
cancelStreamRegistry);
// todo: FLINK-21804 get from Environment.getTaskStateManager
InMemoryStateChangelogStorage changelogWriterFactory = new InMemoryStateChangelogStorage();
+ // todo: apply state changes from non-materialized part of stateHandles
Review comment:
Should this be included as part of the recovery PR? Otherwise, could you please
open a ticket for it?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.unmodifiableList;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A handle to ChangelogStateBackend state. Consists of the base and delta parts. Base part
+ * references materialized state (e.g. SST files), while delta part references state changes that
+ * were not materialized at the time of the snapshot. Both are potentially empty lists as there
+ * can be no state or multiple states (e.g. after rescaling).
+ */
+@Internal
+public interface ChangelogStateBackendHandle extends KeyedStateHandle {
+ List<KeyedStateHandle> getMaterializedStateHandles();
+
+ List<StateChangelogHandle> getNonMaterializedStateHandles();
Review comment:
This may be a bit picky and not related to this PR, but can we rename
"StateChangelogHandle" to "ChangelogStateHandle" to make it consistent with
the other state handles?
WDYT?
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -114,13 +128,33 @@
private final StateChangelogWriter<?> stateChangelogWriter;
+ private long lastCheckpointId = -1L;
+
/** last accessed partitioned state. */
@SuppressWarnings("rawtypes")
private InternalKvState lastState;
/** For caching the last accessed partitioned state. */
private String lastName;
+ /** Updated initially on restore and later upon materialization (after FLINK-21356). */
+ private final List<KeyedStateHandle> materialized = new ArrayList<>();
Review comment:
This should be changeable, right?
Should we remove "final"?
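For reference: in Java, "final" on a field only forbids reassigning the field; the list it points to can still be cleared and refilled. So whether "final" has to go depends on whether materialization replaces the list or updates it in place. A minimal sketch, where `FinalListDemo` and `onMaterialization` are hypothetical names and strings stand in for KeyedStateHandles:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the backend's bookkeeping; strings play the role of handles. */
public class FinalListDemo {
    // `final` forbids reassigning the field, not mutating the list it references.
    private final List<String> materialized = new ArrayList<>();

    /** Hypothetical hook: swaps the contents in place, which compiles despite `final`. */
    void onMaterialization(List<String> newHandles) {
        materialized.clear();
        materialized.addAll(newHandles);
        // materialized = newHandles; // would not compile: the field is final
    }

    List<String> current() {
        return materialized;
    }

    public static void main(String[] args) {
        FinalListDemo demo = new FinalListDemo();
        demo.onMaterialization(Arrays.asList("sst-1"));
        demo.onMaterialization(Arrays.asList("sst-2", "sst-3"));
        System.out.println(demo.current()); // prints [sst-2, sst-3]
    }
}
```

If the list were instead replaced wholesale (e.g. with an unmodifiable copy), the field would have to be non-final.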
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -245,8 +280,38 @@ public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener)
@Nonnull CheckpointStreamFactory streamFactory,
@Nonnull CheckpointOptions checkpointOptions)
throws Exception {
- return keyedStateBackend.snapshot(
- checkpointId, timestamp, streamFactory, checkpointOptions);
+ // The range to upload may overlap with the previous one(s). To reuse them, we could store
+ // the previous results either here in the backend or in the writer. However,
+ // materialization may truncate only a part of the previous result and the backend would
+ // have to split it somehow for the former option, so the latter is used.
Review comment:
To discuss offline.
What does "previous results" mean here?
The RunnableFuture, the handle, or lastUpLoadSeq?
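One possible reading of "previous results" is the already-uploaded change ranges kept by the writer. A hypothetical sketch of that writer-side option, assuming each upload covers a contiguous range [from, to) of sequence numbers: `persist(from)` reuses cached uploads that overlap the requested range and uploads only the new tail, while `truncate(upTo)` can only drop uploads lying entirely below `upTo`, because a range straddling the materialization point cannot be split. `UploadCacheSketch` and its method signatures are illustrative assumptions, not the actual StateChangelogWriter API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical writer that caches uploaded change ranges; not Flink's StateChangelogWriter. */
public class UploadCacheSketch {
    /** An uploaded, immutable range of changes [from, to). */
    static final class Uploaded {
        final long from;
        final long to;

        Uploaded(long from, long to) {
            this.from = from;
            this.to = to;
        }

        @Override
        public String toString() {
            return from + ".." + to;
        }
    }

    private final TreeMap<Long, Uploaded> uploaded = new TreeMap<>(); // keyed by range start
    private long nextSeq = 0; // sequence number of the next appended change
    private long uploadedTo = 0; // all changes below this have been uploaded

    void append() {
        nextSeq++;
    }

    /** Returns uploads covering [from, nextSeq), uploading only what is not uploaded yet. */
    List<Uploaded> persist(long from) {
        if (uploadedTo < nextSeq) {
            uploaded.put(uploadedTo, new Uploaded(uploadedTo, nextSeq));
            uploadedTo = nextSeq;
        }
        List<Uploaded> result = new ArrayList<>();
        for (Uploaded u : uploaded.values()) {
            if (u.to > from) {
                result.add(u); // reused as-is, even if it starts before `from`
            }
        }
        return result;
    }

    /** Drops uploads lying entirely below upTo; one straddling upTo is kept whole. */
    void truncate(long upTo) {
        uploaded.values().removeIf(u -> u.to <= upTo);
    }

    public static void main(String[] args) {
        UploadCacheSketch writer = new UploadCacheSketch();
        for (int i = 0; i < 3; i++) {
            writer.append();
        }
        System.out.println(writer.persist(0)); // [0..3]
        writer.append();
        writer.append();
        System.out.println(writer.persist(0)); // [0..3, 3..5], 0..3 is reused
        writer.truncate(2); // materialized up to 2: 0..3 straddles it and stays
        System.out.println(writer.persist(2)); // [0..3, 3..5]
        writer.truncate(3);
        System.out.println(writer.persist(3)); // [3..5]
    }
}
```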
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogStateBackend.java
##########
@@ -96,10 +99,11 @@ public ChangelogStateBackend(StateBackend stateBackend) {
kvStateRegistry,
ttlTimeProvider,
metricGroup,
- stateHandles,
+ extractMaterializedState(stateHandles),
cancelStreamRegistry);
// todo: FLINK-21804 get from Environment.getTaskStateManager
InMemoryStateChangelogStorage changelogWriterFactory = new InMemoryStateChangelogStorage();
+ // todo: apply state changes from non-materialized part of stateHandles
Review comment:
Also, regarding recovery:
If I understand correctly, the state of the changelog state backend consists
of two parts:
1) the delegated backend state, and
2) the changelog state.
=====
1) is recovered here (plus the TODO).
2) What do we do for 2)? Do we need to recover at least the last sequence
number? Maybe that belongs to the recovery part; if so, please put a marker here.
If we do not, and always create a new log starting from 0, then how is the
materialized data used to identify which part of the log has already been
materialized?
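To make the two parts concrete, a hypothetical sketch of splitting restored handles in the spirit of extractMaterializedState in the diff: part 1 goes to the delegated backend, part 2 is what the TODO would still have to replay. All types are minimal stand-ins (not Flink classes), and passing plain, non-changelog handles through unchanged is an assumption. Note that, as modeled here, nothing in these handles tells the new log which sequence number to continue from, which is the open question above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical restore-time split; all types are simplified stand-ins, not Flink classes. */
public class RestoreSplitSketch {
    interface KeyedHandle {}

    /** Stand-in for materialized state, e.g. SST files. */
    static final class MaterializedHandle implements KeyedHandle {
        final String id;

        MaterializedHandle(String id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return id;
        }
    }

    /** Stand-in for a handle to non-materialized state changes. */
    static final class ChangelogHandle {
        final String id;

        ChangelogHandle(String id) {
            this.id = id;
        }

        @Override
        public String toString() {
            return id;
        }
    }

    /** Stand-in for a changelog backend snapshot: base (materialized) plus delta. */
    static final class ChangelogBackendHandle implements KeyedHandle {
        final List<KeyedHandle> materialized;
        final List<ChangelogHandle> nonMaterialized;

        ChangelogBackendHandle(List<KeyedHandle> materialized, List<ChangelogHandle> nonMaterialized) {
            this.materialized = materialized;
            this.nonMaterialized = nonMaterialized;
        }
    }

    /** Part 1: what the delegated backend restores from. */
    static List<KeyedHandle> extractMaterialized(List<KeyedHandle> restored) {
        List<KeyedHandle> result = new ArrayList<>();
        for (KeyedHandle h : restored) {
            if (h instanceof ChangelogBackendHandle) {
                result.addAll(((ChangelogBackendHandle) h).materialized);
            } else {
                result.add(h); // assumption: plain handles are passed through unchanged
            }
        }
        return result;
    }

    /** Part 2: changes that still have to be replayed on top of part 1. */
    static List<ChangelogHandle> extractNonMaterialized(List<KeyedHandle> restored) {
        List<ChangelogHandle> result = new ArrayList<>();
        for (KeyedHandle h : restored) {
            if (h instanceof ChangelogBackendHandle) {
                result.addAll(((ChangelogBackendHandle) h).nonMaterialized);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // e.g. after rescaling, one subtask restores two snapshots
        List<KeyedHandle> restored = Arrays.<KeyedHandle>asList(
                new ChangelogBackendHandle(
                        Arrays.<KeyedHandle>asList(new MaterializedHandle("sst-a")),
                        Arrays.asList(new ChangelogHandle("log-a"))),
                new ChangelogBackendHandle(
                        Arrays.<KeyedHandle>asList(new MaterializedHandle("sst-b")),
                        Arrays.asList(new ChangelogHandle("log-b"))));
        System.out.println(extractMaterialized(restored)); // [sst-a, sst-b]
        System.out.println(extractNonMaterialized(restored)); // [log-a, log-b]
    }
}
```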
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -245,8 +280,38 @@ public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener)
@Nonnull CheckpointStreamFactory streamFactory,
@Nonnull CheckpointOptions checkpointOptions)
throws Exception {
- return keyedStateBackend.snapshot(
- checkpointId, timestamp, streamFactory, checkpointOptions);
+ // The range to upload may overlap with the previous one(s). To reuse them, we could store
+ // the previous results either here in the backend or in the writer. However,
+ // materialization may truncate only a part of the previous result and the backend would
+ // have to split it somehow for the former option, so the latter is used.
Review comment:
Truncation needs to consider the shared counter for references to previous
logs as well, right?
I need to understand the relationship between:
1. SharedRegistry in the JM
2. the lists of materialized and non-materialized handles kept locally in the TM
3. the upload from/to sequence numbers
4. materialization truncation vs. logs.
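On the first question, a hypothetical sketch of why local truncation alone cannot decide deletion: an uploaded log segment may be referenced by several retained checkpoints (a later one reusing it), so it can only be discarded once the last referencing checkpoint is subsumed. This is roughly analogous to how SharedStateRegistry tracks references to shared state; the class, methods, and segment names below are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical reference counting for uploaded changelog segments; not SharedStateRegistry. */
public class SegmentRefCountSketch {
    private final Map<String, Integer> refCounts = new HashMap<>();
    private final List<String> discarded = new ArrayList<>();

    /** A completed checkpoint starts referencing the given segments. */
    void register(List<String> segments) {
        for (String s : segments) {
            refCounts.merge(s, 1, Integer::sum);
        }
    }

    /** A subsumed checkpoint releases its segments; unreferenced ones are discarded. */
    void unregister(List<String> segments) {
        for (String s : segments) {
            Integer count = refCounts.get(s);
            if (count == null) {
                throw new IllegalStateException("unknown segment " + s);
            }
            if (count == 1) {
                refCounts.remove(s);
                discarded.add(s); // safe to delete: no retained checkpoint uses it
            } else {
                refCounts.put(s, count - 1);
            }
        }
    }

    List<String> discarded() {
        return discarded;
    }

    public static void main(String[] args) {
        SegmentRefCountSketch registry = new SegmentRefCountSketch();
        List<String> cp1 = Arrays.asList("log 0..3");
        List<String> cp2 = Arrays.asList("log 0..3", "log 3..5"); // reuses 0..3
        List<String> cp3 = Arrays.asList("sst-1", "log 3..5"); // after materialization up to 3
        registry.register(cp1);
        registry.register(cp2);
        registry.register(cp3);
        registry.unregister(cp1);
        System.out.println(registry.discarded()); // [] since cp2 still uses 0..3
        registry.unregister(cp2);
        System.out.println(registry.discarded()); // [log 0..3]
    }
}
```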
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
##########
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.ExceptionUtils;
+
+import org.apache.flink.shaded.guava18.com.google.common.io.Closer;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
+
+import static java.util.Collections.unmodifiableList;
+import static org.apache.flink.util.Preconditions.checkArgument;
+
+/**
+ * A handle to ChangelogStateBackend state. Consists of the base and delta parts. Base part
+ * references materialized state (e.g. SST files), while delta part references state changes that
+ * were not materialized at the time of the snapshot. Both are potentially empty lists as there
+ * can be no state or multiple states (e.g. after rescaling).
+ */
+@Internal
+public interface ChangelogStateBackendHandle extends KeyedStateHandle {
+ List<KeyedStateHandle> getMaterializedStateHandles();
+
+ List<StateChangelogHandle> getNonMaterializedStateHandles();
Review comment:
Hey Roman, does order matter here? Put another way, would a Set be enough
here?
I guess these lists of KeyedStateHandle/StateChangelogHandle are state
handles that reside in different, non-overlapping key-group ranges?
If that's the case, the order should not matter, right?
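If that premise holds, each handle only touches its own key groups, so applying them in any order yields the same state, and a Set would lose no information (provided the handles define equals/hashCode sensibly). A minimal sketch of checking the premise, using a hypothetical `Range` stand-in with inclusive bounds rather than Flink's KeyGroupRange. If two handles could cover the same key groups, the check fails and the order in which their changes are applied may matter.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Hypothetical check of the premise that handles cover non-overlapping key-group ranges. */
public class DisjointRangesSketch {
    /** Stand-in for a key-group range with inclusive bounds. */
    static final class Range {
        final int start;
        final int end;

        Range(int start, int end) {
            this.start = start;
            this.end = end;
        }
    }

    /** True if no two ranges share a key group; sorting makes adjacent checks sufficient. */
    static boolean pairwiseDisjoint(List<Range> ranges) {
        List<Range> sorted = new ArrayList<>(ranges);
        sorted.sort(Comparator.comparingInt((Range r) -> r.start));
        for (int i = 1; i < sorted.size(); i++) {
            if (sorted.get(i).start <= sorted.get(i - 1).end) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(pairwiseDisjoint(Arrays.asList(new Range(64, 127), new Range(0, 63)))); // true
        System.out.println(pairwiseDisjoint(Arrays.asList(new Range(0, 63), new Range(32, 95)))); // false
    }
}
```

Sorting by start means only neighbors need comparing: if ranges i < j overlap after sorting, then range i also overlaps range i + 1.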
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -245,8 +280,38 @@ public boolean deregisterKeySelectionListener(KeySelectionListener<K> listener)
@Nonnull CheckpointStreamFactory streamFactory,
@Nonnull CheckpointOptions checkpointOptions)
throws Exception {
- return keyedStateBackend.snapshot(
- checkpointId, timestamp, streamFactory, checkpointOptions);
+ // The range to upload may overlap with the previous one(s). To reuse them, we could store
+ // the previous results either here in the backend or in the writer. However,
+ // materialization may truncate only a part of the previous result and the backend would
+ // have to split it somehow for the former option, so the latter is used.
+ lastCheckpointId = checkpointId;
+ lastUploadedFrom = materializedTo;
+ lastUploadedTo = stateChangelogWriter.lastAppendedSequenceNumber().next();
+
+ LOG.debug(
+ "snapshot for checkpoint {}, change range: {}..{}",
+ checkpointId,
+ lastUploadedFrom,
+ lastUploadedTo);
+ return toRunnableFuture(
+ stateChangelogWriter
+ .persist(lastUploadedFrom)
Review comment:
This returns the Future of a StateChangelogHandle covering the range from
lastUploadedFrom to the most recent change,
but "nonMaterialized" is not updated accordingly.
Would this cause problems in buildSnapshotResult?
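One way this could still be correct, assuming nonMaterialized only ever holds handles restored from a previous run: every snapshot uploads the whole range starting at materializedTo, so the latest delta already contains the earlier ones and the field does not need to change between checkpoints. A hypothetical sketch of that assumption, with strings standing in for handles and `snapshot()` playing the role of buildSnapshotResult:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical model of the snapshot path; strings stand in for state handles. */
public class SnapshotResultSketch {
    private final List<String> materialized = new ArrayList<>(Arrays.asList("sst-1"));
    private final List<String> nonMaterialized = new ArrayList<>(Arrays.asList("restored-log"));
    private final long materializedTo = 0; // no materialization in this sketch
    private long lastAppended = -1; // nothing appended yet

    void append() {
        lastAppended++;
    }

    /** Mirrors the diff's range: from materializedTo up to (exclusive) lastAppended + 1. */
    List<String> snapshot() {
        long from = materializedTo;
        long to = lastAppended + 1;
        List<String> result = new ArrayList<>(materialized);
        result.addAll(nonMaterialized);
        result.add("log " + from + ".." + to);
        return result; // the fields are only read, never modified
    }

    public static void main(String[] args) {
        SnapshotResultSketch backend = new SnapshotResultSketch();
        for (int i = 0; i < 3; i++) {
            backend.append();
        }
        System.out.println(backend.snapshot()); // [sst-1, restored-log, log 0..3]
        backend.append();
        backend.append();
        System.out.println(backend.snapshot()); // [sst-1, restored-log, log 0..5]
    }
}
```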
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -114,13 +128,33 @@
private final StateChangelogWriter<?> stateChangelogWriter;
+ private long lastCheckpointId = -1L;
+
/** last accessed partitioned state. */
@SuppressWarnings("rawtypes")
private InternalKvState lastState;
/** For caching the last accessed partitioned state. */
private String lastName;
+ /** Updated initially on restore and later upon materialization (after FLINK-21356). */
+ private final List<KeyedStateHandle> materialized = new ArrayList<>();
+
+ /** Updated initially on restore and later cleared upon materialization (after FLINK-21356). */
+ private final List<StateChangelogHandle> nonMaterialized = new ArrayList<>();
Review comment:
This is not changeable, right? Just to double-check.
##########
File path:
flink-state-backends/flink-statebackend-changelog/src/main/java/org/apache/flink/state/changelog/ChangelogKeyedStateBackend.java
##########
@@ -114,13 +128,33 @@
private final StateChangelogWriter<?> stateChangelogWriter;
+ private long lastCheckpointId = -1L;
+
/** last accessed partitioned state. */
@SuppressWarnings("rawtypes")
private InternalKvState lastState;
/** For caching the last accessed partitioned state. */
private String lastName;
+ /** Updated initially on restore and later upon materialization (after FLINK-21356). */
+ private final List<KeyedStateHandle> materialized = new ArrayList<>();
+
+ /** Updated initially on restore and later cleared upon materialization (after FLINK-21356). */
+ private final List<StateChangelogHandle> nonMaterialized = new ArrayList<>();
Review comment:
1. I understand why "materialized" has to be recorded (because checkpointing
and materialization are separated now), but why does "nonMaterialized" have to
be stored as well, and what is it used for? (You already have the changelog
and the materialized-to sequence number.)
Let me guess: is it because each changelog always starts from SEQ.first?
2. Right now, materialized/nonMaterialized are reported to the JM as well (a
copy is maintained in the JM). How does the TM's "materialized" differ from the JM's?
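A hypothetical sketch of the guess in point 1, assuming a writer created after restore numbers its changes from 0 again: the restored delta then cannot be addressed by sequence numbers of the new log, so it would have to be carried as handles until the next materialization clears it. The class name and the restart-from-0 behavior are assumptions for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical backend state after restore; strings stand in for handles. */
public class RestoredDeltaSketch {
    private final List<String> nonMaterialized = new ArrayList<>();
    private long nextSeq = 0; // assumption: the new writer restarts numbering at 0
    private long materializedTo = 0;

    RestoredDeltaSketch(List<String> restoredDelta) {
        nonMaterialized.addAll(restoredDelta); // old log, not addressable by new seq numbers
    }

    void append() {
        nextSeq++;
    }

    void materialize() {
        materializedTo = nextSeq;
        nonMaterialized.clear(); // the restored delta is now covered by materialized state
    }

    List<String> snapshot() {
        List<String> result = new ArrayList<>(nonMaterialized);
        if (materializedTo < nextSeq) {
            result.add("new-log " + materializedTo + ".." + nextSeq);
        }
        return result;
    }

    public static void main(String[] args) {
        RestoredDeltaSketch backend = new RestoredDeltaSketch(Arrays.asList("old-log 0..7"));
        backend.append();
        backend.append();
        System.out.println(backend.snapshot()); // [old-log 0..7, new-log 0..2]
        backend.materialize();
        backend.append();
        System.out.println(backend.snapshot()); // [new-log 2..3]
    }
}
```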
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]