This is an automated email from the ASF dual-hosted git repository.

roman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5b58ef66a8835bf22db23e1d9836a1e9f4d94045
Author: Roman Khachatryan <khachatryan.ro...@gmail.com>
AuthorDate: Thu Apr 21 17:24:57 2022 +0200

    [FLINK-25511][state] Introduce StreamStateHandle ID
    
    The ID will be used to track state usage on TM;
    State objects not shared with JM will be discarded.
---
 .../flink/runtime/state/KeyGroupsStateHandle.java  |  5 +++
 .../runtime/state/OperatorStreamStateHandle.java   |  5 +++
 .../flink/runtime/state/PhysicalStateHandleID.java | 38 ++++++++++++++++++++++
 .../state/PlaceholderStreamStateHandle.java        |  6 ++++
 .../state/RetrievableStreamStateHandle.java        |  5 +++
 .../flink/runtime/state/StreamStateHandle.java     |  3 ++
 .../changelog/ChangelogStateBackendHandle.java     |  6 ++++
 .../runtime/state/filesystem/FileStateHandle.java  |  9 +++++
 .../state/memory/ByteStreamStateHandle.java        |  9 +++++
 .../runtime/messages/CheckpointMessagesTest.java   |  4 +--
 .../flink/runtime/state/ChangelogTestUtils.java    |  2 +-
 .../runtime/state/SharedStateRegistryTest.java     |  2 +-
 .../flink/runtime/state/TestStreamStateHandle.java | 28 ++++++++++++++++
 .../state/testutils/EmptyStreamStateHandle.java    |  4 +--
 .../state/RocksDBStateDownloaderTest.java          |  3 +-
 .../tasks/InterruptSensitiveRestoreTest.java       |  3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java    |  6 ++++
 17 files changed, 130 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 447cf32a88f..086fcbf8d73 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -110,6 +110,11 @@ public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle
         return stateHandleId;
     }
 
+    @Override
+    public PhysicalStateHandleID getStreamStateHandleID() {
+        return stateHandle.getStreamStateHandleID();
+    }
+
     @Override
     public KeyGroupRange getKeyGroupRange() {
         return groupRangeOffsets.getKeyGroupRange();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java
index edddd500bc3..9f1af0cb8f7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java
@@ -71,6 +71,11 @@ public class OperatorStreamStateHandle implements OperatorStateHandle {
         return delegateStateHandle.asBytesIfInMemory();
     }
 
+    @Override
+    public PhysicalStateHandleID getStreamStateHandleID() {
+        return delegateStateHandle.getStreamStateHandleID();
+    }
+
     @Override
     public StreamStateHandle getDelegateStateHandle() {
         return delegateStateHandle;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PhysicalStateHandleID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PhysicalStateHandleID.java
new file mode 100644
index 00000000000..ca8ceead753
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PhysicalStateHandleID.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.util.StringBasedID;
+
+/**
+ * Unique ID that allows for physical comparison between state handles.
+ *
+ * <p>Different state objects (e.g. different files) representing the same piece of data must have
+ * different IDs (e.g. file names). This is different from {@link
+ * org.apache.flink.runtime.state.KeyedStateHandle#getStateHandleId} which returns the same ID.
+ *
+ * @see StateHandleID
+ */
+public class PhysicalStateHandleID extends StringBasedID {
+
+    private static final long serialVersionUID = 1L;
+
+    public PhysicalStateHandleID(String keyString) {
+        super(keyString);
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
index f93034df30a..4b42427de1c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java
@@ -51,6 +51,12 @@ public class PlaceholderStreamStateHandle implements StreamStateHandle {
                 "This is only a placeholder to be replaced by a real StreamStateHandle in the checkpoint coordinator.");
     }
 
+    @Override
+    public PhysicalStateHandleID getStreamStateHandleID() {
+        throw new UnsupportedOperationException(
+                "This is only a placeholder to be replaced by a real StreamStateHandle in the checkpoint coordinator.");
+    }
+
     @Override
     public void discardState() throws Exception {
         // nothing to do.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
index 80f771d4efe..2370a86a70f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
@@ -71,6 +71,11 @@ public class RetrievableStreamStateHandle<T extends Serializable>
         return wrappedStreamStateHandle.asBytesIfInMemory();
     }
 
+    @Override
+    public PhysicalStateHandleID getStreamStateHandleID() {
+        return wrappedStreamStateHandle.getStreamStateHandleID();
+    }
+
     @Override
     public void discardState() throws Exception {
         wrappedStreamStateHandle.discardState();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
index fcee1aecff3..a2290d47e91 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java
@@ -37,4 +37,7 @@ public interface StreamStateHandle extends StateObject {
 
     /** @return Content of this handle as bytes array if it is already in memory. */
     Optional<byte[]> asBytesIfInMemory();
+
+    /** @return a unique identifier of this handle. */
+    PhysicalStateHandleID getStreamStateHandleID();
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
index b1f5b0d61ec..96d63007615 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
@@ -22,6 +22,7 @@ import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.runtime.state.CheckpointBoundKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SharedStateRegistryKey;
 import org.apache.flink.runtime.state.StateHandleID;
@@ -280,6 +281,11 @@ public interface ChangelogStateBackendHandle
                 throw new UnsupportedOperationException("Should not call here.");
             }
 
+            @Override
+            public PhysicalStateHandleID getStreamStateHandleID() {
+                throw new UnsupportedOperationException("Should not call here.");
+            }
+
             @Override
             public boolean equals(Object o) {
                 if (this == o) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index 8b940478e54..9a706f1756d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.filesystem;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 
 import java.io.IOException;
@@ -43,6 +44,8 @@ public class FileStateHandle implements StreamStateHandle {
     /** The size of the state in the file. */
     private final long stateSize;
 
+    private final PhysicalStateHandleID physicalID;
+
     /**
      * Creates a new file state for the given file path.
      *
@@ -52,6 +55,7 @@ public class FileStateHandle implements StreamStateHandle {
         checkArgument(stateSize >= -1);
         this.filePath = checkNotNull(filePath);
         this.stateSize = stateSize;
+        this.physicalID = new PhysicalStateHandleID(filePath.toUri().toString());
     }
 
     /**
@@ -73,6 +77,11 @@ public class FileStateHandle implements StreamStateHandle {
         return Optional.empty();
     }
 
+    @Override
+    public PhysicalStateHandleID getStreamStateHandleID() {
+        return physicalID;
+    }
+
     /**
      * Discard the state by deleting the file that stores the state. If the parent directory of the
      * state is empty after deleting the state file, it is also deleted.
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index aa257f5b3d1..d64ed7093bb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.state.memory;
 
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.Preconditions;
 
@@ -40,10 +41,13 @@ public class ByteStreamStateHandle implements StreamStateHandle {
      */
     private final String handleName;
 
+    private final PhysicalStateHandleID physicalID;
+
     /** Creates a new ByteStreamStateHandle containing the given data. */
     public ByteStreamStateHandle(String handleName, byte[] data) {
         this.handleName = Preconditions.checkNotNull(handleName);
         this.data = Preconditions.checkNotNull(data);
+        this.physicalID = new PhysicalStateHandleID(handleName);
     }
 
     @Override
@@ -56,6 +60,11 @@ public class ByteStreamStateHandle implements StreamStateHandle {
         return Optional.of(getData());
     }
 
+    @Override
+    public PhysicalStateHandleID getStreamStateHandleID() {
+        return physicalID;
+    }
+
     public byte[] getData() {
         return data;
     }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
index 5620164ddd3..8df5006721c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java
@@ -29,7 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.runtime.state.KeyGroupRange;
-import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestStreamStateHandle;
 
 import org.junit.Test;
 
@@ -101,7 +101,7 @@ public class CheckpointMessagesTest {
         assertNotNull(copy.toString());
     }
 
-    private static class MyHandle implements StreamStateHandle {
+    private static class MyHandle implements TestStreamStateHandle {
 
         private static final long serialVersionUID = 8128146204128728332L;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTestUtils.java
index 62cd99a83ca..890010e1f6d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTestUtils.java
@@ -112,7 +112,7 @@ public class ChangelogTestUtils {
     }
 
     public static class ChangelogStateHandleWrapper extends InMemoryChangelogStateHandle
-            implements StreamStateHandle {
+            implements TestStreamStateHandle {
         private static final long serialVersionUID = 1L;
         private volatile boolean isDiscarded;
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
index 9d8d9e4a20f..a67009f414e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java
@@ -177,7 +177,7 @@ public class SharedStateRegistryTest {
         assertFalse(nonMaterializedState2.isDiscarded());
     }
 
-    private static class TestSharedState implements StreamStateHandle {
+    private static class TestSharedState implements TestStreamStateHandle {
         private static final long serialVersionUID = 4468635881465159780L;
 
         private SharedStateRegistryKey key;
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestStreamStateHandle.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestStreamStateHandle.java
new file mode 100644
index 00000000000..7c8780a366e
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestStreamStateHandle.java
@@ -0,0 +1,28 @@
+package org.apache.flink.runtime.state;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Test {@link StreamStateHandle} that implements {@link #getStreamStateHandleID()} using {@link
+ * System#identityHashCode(Object)}.
+ */
+public interface TestStreamStateHandle extends StreamStateHandle {
+
+    default PhysicalStateHandleID getStreamStateHandleID() {
+        return new PhysicalStateHandleID(Integer.toString(System.identityHashCode(this)));
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/EmptyStreamStateHandle.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/EmptyStreamStateHandle.java
index 0107a02a26c..8b3515a1591 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/EmptyStreamStateHandle.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/EmptyStreamStateHandle.java
@@ -19,7 +19,7 @@
 package org.apache.flink.runtime.state.testutils;
 
 import org.apache.flink.core.fs.FSDataInputStream;
-import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestStreamStateHandle;
 
 import java.io.IOException;
 import java.util.Optional;
@@ -28,7 +28,7 @@ import java.util.Optional;
  * A simple dummy implementation of a stream state handle that can be passed in tests. The handle
  * cannot open an input stream.
  */
-public class EmptyStreamStateHandle implements StreamStateHandle {
+public class EmptyStreamStateHandle implements TestStreamStateHandle {
 
     private static final long serialVersionUID = 0L;
 
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
index ee5bcecc381..fcce8674887 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupRange;
 import org.apache.flink.runtime.state.StateHandleID;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestStreamStateHandle;
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.TestLogger;
 
@@ -137,7 +138,7 @@ public class RocksDBStateDownloaderTest extends TestLogger {
         }
     }
 
-    private static class ThrowingStateHandle implements StreamStateHandle {
+    private static class ThrowingStateHandle implements TestStreamStateHandle {
         private static final long serialVersionUID = -2102069659550694805L;
 
         private final IOException expectedException;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index 976a07e78c1..d65a1755fdb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -62,6 +62,7 @@ import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.OperatorStreamStateHandle;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.TestStreamStateHandle;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.taskexecutor.KvStateService;
 import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker;
@@ -299,7 +300,7 @@ public class InterruptSensitiveRestoreTest {
     // ------------------------------------------------------------------------
 
     @SuppressWarnings("serial")
-    private static class InterruptLockingStateHandle implements StreamStateHandle {
+    private static class InterruptLockingStateHandle implements TestStreamStateHandle {
 
         private static final long serialVersionUID = 1L;
 
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index f20938061fe..7a95be8375a 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -78,6 +78,7 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.OperatorStateBackend;
 import org.apache.flink.runtime.state.OperatorStateHandle;
 import org.apache.flink.runtime.state.OperatorStreamStateHandle;
+import org.apache.flink.runtime.state.PhysicalStateHandleID;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.StateBackendFactory;
@@ -2400,6 +2401,11 @@ public class StreamTaskTest extends TestLogger {
             throw new IOException("Cannot open input streams in testing implementation.");
         }
 
+        @Override
+        public PhysicalStateHandleID getStreamStateHandleID() {
+            throw new RuntimeException("Cannot return ID in testing implementation.");
+        }
+
         @Override
         public Optional<byte[]> asBytesIfInMemory() {
             return Optional.empty();

Reply via email to