rkhachatryan commented on code in PR #19907:
URL: https://github.com/apache/flink/pull/19907#discussion_r928323324


##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/ChangelogTaskLocalStateStore.java:
##########
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.state.changelog.ChangelogStateBackendHandle;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.function.LongPredicate;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkState;
+
+/** Changelog's implementation of a {@link TaskLocalStateStore}. */
+public class ChangelogTaskLocalStateStore extends TaskLocalStateStoreImpl {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(ChangelogTaskLocalStateStore.class);
+
+    private static final String CHANGE_LOG_CHECKPOINT_PREFIX = 
"changelog_chk_";
+
+    /**
+     * The mapper of checkpointId and materializationId. (cp3, 
materializationId2) means cp3 refer
+     * to m1.
+     */
+    private final Map<Long, Long> mapToMaterializationId;
+
+    /** Last checkpointId, to check whether checkpoint is out of order. */
+    private long lastCheckpointId = -1L;
+
+    public ChangelogTaskLocalStateStore(
+            @Nonnull JobID jobID,
+            @Nonnull AllocationID allocationID,
+            @Nonnull JobVertexID jobVertexID,
+            @Nonnegative int subtaskIndex,
+            @Nonnull LocalRecoveryConfig localRecoveryConfig,
+            @Nonnull Executor discardExecutor) {
+        super(jobID, allocationID, jobVertexID, subtaskIndex, 
localRecoveryConfig, discardExecutor);
+        this.mapToMaterializationId = new HashMap<>();
+    }
+
+    private void updateReference(long checkpointId, TaskStateSnapshot 
localState) {
+        if (localState == null) {
+            localState = NULL_DUMMY;
+        }
+        for (Map.Entry<OperatorID, OperatorSubtaskState> subtaskStateEntry :
+                localState.getSubtaskStateMappings()) {
+            for (KeyedStateHandle keyedStateHandle :
+                    subtaskStateEntry.getValue().getManagedKeyedState()) {
+                if (keyedStateHandle instanceof ChangelogStateBackendHandle) {
+                    ChangelogStateBackendHandle changelogStateBackendHandle =
+                            (ChangelogStateBackendHandle) keyedStateHandle;
+                    long materializationID = 
changelogStateBackendHandle.getMaterializationID();
+                    if (mapToMaterializationId.containsKey(checkpointId)) {
+                        checkState(
+                                materializationID == 
mapToMaterializationId.get(checkpointId),
+                                "one checkpoint contains at most one 
materializationID");
+                    } else {
+                        mapToMaterializationId.put(checkpointId, 
materializationID);
+                    }
+                }
+            }
+        }
+    }
+
+    @Override
+    public void storeLocalState(long checkpointId, @Nullable TaskStateSnapshot 
localState) {
+        if (checkpointId < lastCheckpointId) {
+            LOG.info(
+                    "Current checkpoint {} is out of order, smaller than last 
CheckpointId {}.",
+                    lastCheckpointId,
+                    checkpointId);
+            return;
+        } else {
+            lastCheckpointId = checkpointId;
+        }
+        synchronized (lock) {
+            updateReference(checkpointId, localState);
+        }
+        super.storeLocalState(checkpointId, localState);
+    }
+
+    @Override
+    protected File getCheckpointDirectory(long checkpointId) {
+        return new File(
+                
getLocalRecoveryDirectoryProvider().subtaskBaseDirectory(checkpointId),
+                CHANGE_LOG_CHECKPOINT_PREFIX + checkpointId);
+    }
+
+    private void deleteMaterialization(LongPredicate pruningChecker) {
+        Set<Long> materializationToRemove;
+        synchronized (lock) {
+            Set<Long> checkpoints =
+                    mapToMaterializationId.keySet().stream()
+                            .filter(pruningChecker::test)
+                            .collect(Collectors.toSet());
+            materializationToRemove =
+                    checkpoints.stream()
+                            .map(mapToMaterializationId::remove)
+                            .collect(Collectors.toSet());
+            materializationToRemove.removeAll(mapToMaterializationId.values());
+        }
+
+        discardExecutor.execute(
+                () ->
+                        syncDiscardDirectoryForCollection(
+                                materializationToRemove.stream()
+                                        .map(
+                                                materializationId ->
+                                                        
super.getCheckpointDirectory(
+                                                                
materializationId))

Review Comment:
   nit: `.map(super::getCheckpointDirectory)`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java:
##########
@@ -138,12 +140,8 @@
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.FutureUtils;
 
-import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
-import org.apache.flink.shaded.guava30.com.google.common.collect.Sets;
-
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
-
 import java.io.File;

Review Comment:
   Checkstyle failed because of the import order.
   FYI, here is a [build](https://dev.azure.com/khachatryanroman/flink/_build/results?buildId=1590&view=results) after fixing the imports.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendLocalHandle.java:
##########
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.changelog;
+
+import org.apache.flink.runtime.state.KeyGroupRange;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateHandleID;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * State handle for local copies of {@link ChangelogStateHandleStreamImpl}. 
Consists of a
+ * remoteHandle that maintains the mapping of local handle and remote handle, 
like
+ * sharedStateHandleIDs in {@link 
org.apache.flink.runtime.state.IncrementalLocalKeyedStateHandle}.
+ */
+public class ChangelogStateBackendLocalHandle implements 
ChangelogStateBackendHandle {
+    private static final long serialVersionUID = 1L;
+    private final List<KeyedStateHandle> localMaterialized;
+    private final List<ChangelogStateHandle> localNonMaterialized;
+    private final ChangelogStateBackendHandleImpl remoteHandle;
+
+    public ChangelogStateBackendLocalHandle(
+            List<KeyedStateHandle> localMaterialized,
+            List<ChangelogStateHandle> localNonMaterialized,
+            ChangelogStateBackendHandleImpl remoteHandle) {
+        this.localMaterialized = localMaterialized;
+        this.localNonMaterialized = localNonMaterialized;
+        this.remoteHandle = remoteHandle;
+    }
+
+    @Override
+    public List<KeyedStateHandle> getMaterializedStateHandles() {
+        return localMaterialized;
+    }
+
+    @Override
+    public List<ChangelogStateHandle> getNonMaterializedStateHandles() {
+        return localNonMaterialized;
+    }
+
+    @Override
+    public long getMaterializationID() {
+        return remoteHandle.getMaterializationID();
+    }
+
+    @Override
+    public ChangelogStateBackendHandle rebound(long checkpointId) {
+        throw new UnsupportedOperationException("Should not call here.");
+    }
+
+    public List<KeyedStateHandle> getRemoteMaterializedStateHandles() {
+        return remoteHandle.getMaterializedStateHandles();
+    }
+
+    public List<ChangelogStateHandle> getRemoteNonMaterializedStateHandles() {
+        return remoteHandle.getNonMaterializedStateHandles();
+    }
+
+    @Override
+    public long getCheckpointId() {
+        return remoteHandle.getCheckpointId();
+    }
+
+    @Override
+    public void registerSharedStates(SharedStateRegistry stateRegistry, long 
checkpointID) {
+        remoteHandle.registerSharedStates(stateRegistry, checkpointID);
+    }
+
+    @Override
+    public long getCheckpointedSize() {
+        return remoteHandle.getCheckpointedSize();
+    }
+
+    @Override
+    public KeyGroupRange getKeyGroupRange() {
+        return remoteHandle.getKeyGroupRange();
+    }
+
+    @Nullable
+    @Override
+    public KeyedStateHandle getIntersection(KeyGroupRange keyGroupRange) {
+        throw new UnsupportedOperationException(
+                "This is a local state handle for the TM side only.");
+    }
+
+    @Override
+    public StateHandleID getStateHandleId() {
+        return remoteHandle.getStateHandleId();
+    }
+
+    @Override
+    public void discardState() throws Exception {
+        for (KeyedStateHandle keyedStateHandle : localMaterialized) {
+            keyedStateHandle.discardState();
+        }

Review Comment:
   Won't this discard materialized state that may still be used by subsequent checkpoints?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStoreImpl.java:
##########
@@ -186,20 +187,27 @@ private void persistLocalStateMetadata(long checkpointId, 
TaskStateSnapshot loca
 
     @VisibleForTesting
     File getTaskStateSnapshotFile(long checkpointId) {
-        final File checkpointDirectory =
-                localRecoveryConfig
-                        .getLocalStateDirectoryProvider()
-                        .orElseThrow(
-                                () -> new IllegalStateException("Local 
recovery must be enabled."))
-                        .subtaskSpecificCheckpointDirectory(checkpointId);
+        return new File(getCheckpointDirectory(checkpointId), 
TASK_STATE_SNAPSHOT_FILENAME);
+    }
 
+    private File createTaskStateSnapshotFile(long checkpointId) {

Review Comment:
   The method doesn't actually create the file (it creates the folder instead).
   
   I think the following set of methods would be clearer:
   - `getTaskStateSnapshotFile(String path, long checkpointId)` // add filename suffix
   - `getCheckpointDirectory(checkpointId)` // as is
   - `createFolderOrFail(String path)` // mkdirs
   
   WDYT?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to