This is an automated email from the ASF dual-hosted git repository. roman pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5b58ef66a8835bf22db23e1d9836a1e9f4d94045 Author: Roman Khachatryan <khachatryan.ro...@gmail.com> AuthorDate: Thu Apr 21 17:24:57 2022 +0200 [FLINK-25511][state] Introduce StreamStateHandle ID. The ID will be used to track state usage on the TaskManager (TM); state objects not shared with the JobManager (JM) will be discarded. --- .../flink/runtime/state/KeyGroupsStateHandle.java | 5 +++ .../runtime/state/OperatorStreamStateHandle.java | 5 +++ .../flink/runtime/state/PhysicalStateHandleID.java | 38 ++++++++++++++++++++++ .../state/PlaceholderStreamStateHandle.java | 6 ++++ .../state/RetrievableStreamStateHandle.java | 5 +++ .../flink/runtime/state/StreamStateHandle.java | 3 ++ .../changelog/ChangelogStateBackendHandle.java | 6 ++++ .../runtime/state/filesystem/FileStateHandle.java | 9 +++++ .../state/memory/ByteStreamStateHandle.java | 9 +++++ .../runtime/messages/CheckpointMessagesTest.java | 4 +-- .../flink/runtime/state/ChangelogTestUtils.java | 2 +- .../runtime/state/SharedStateRegistryTest.java | 2 +- .../flink/runtime/state/TestStreamStateHandle.java | 28 ++++++++++++++++ .../state/testutils/EmptyStreamStateHandle.java | 4 +-- .../state/RocksDBStateDownloaderTest.java | 3 +- .../tasks/InterruptSensitiveRestoreTest.java | 3 +- .../streaming/runtime/tasks/StreamTaskTest.java | 6 ++++ 17 files changed, 130 insertions(+), 8 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java index 447cf32a88f..086fcbf8d73 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java @@ -110,6 +110,11 @@ public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle return stateHandleId; } + @Override + public PhysicalStateHandleID getStreamStateHandleID() { + return stateHandle.getStreamStateHandleID(); + } + @Override 
public KeyGroupRange getKeyGroupRange() { return groupRangeOffsets.getKeyGroupRange(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java index edddd500bc3..9f1af0cb8f7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java @@ -71,6 +71,11 @@ public class OperatorStreamStateHandle implements OperatorStateHandle { return delegateStateHandle.asBytesIfInMemory(); } + @Override + public PhysicalStateHandleID getStreamStateHandleID() { + return delegateStateHandle.getStreamStateHandleID(); + } + @Override public StreamStateHandle getDelegateStateHandle() { return delegateStateHandle; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PhysicalStateHandleID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PhysicalStateHandleID.java new file mode 100644 index 00000000000..ca8ceead753 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PhysicalStateHandleID.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.util.StringBasedID; + +/** + * Unique ID that allows for physical comparison between state handles. + * + * <p>Different state objects (e.g. different files) representing the same piece of data must have + * different IDs (e.g. file names). This is different from {@link + * org.apache.flink.runtime.state.KeyedStateHandle#getStateHandleId} which returns the same ID. + * + * @see StateHandleID + */ +public class PhysicalStateHandleID extends StringBasedID { + + private static final long serialVersionUID = 1L; + + public PhysicalStateHandleID(String keyString) { + super(keyString); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java index f93034df30a..4b42427de1c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/PlaceholderStreamStateHandle.java @@ -51,6 +51,12 @@ public class PlaceholderStreamStateHandle implements StreamStateHandle { "This is only a placeholder to be replaced by a real StreamStateHandle in the checkpoint coordinator."); } + @Override + public PhysicalStateHandleID getStreamStateHandleID() { + throw new UnsupportedOperationException( + "This is only a placeholder to be replaced by a real StreamStateHandle in the checkpoint coordinator."); + } + @Override public void discardState() throws Exception { // nothing to do. 
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java index 80f771d4efe..2370a86a70f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java @@ -71,6 +71,11 @@ public class RetrievableStreamStateHandle<T extends Serializable> return wrappedStreamStateHandle.asBytesIfInMemory(); } + @Override + public PhysicalStateHandleID getStreamStateHandleID() { + return wrappedStreamStateHandle.getStreamStateHandleID(); + } + @Override public void discardState() throws Exception { wrappedStreamStateHandle.discardState(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java index fcee1aecff3..a2290d47e91 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StreamStateHandle.java @@ -37,4 +37,7 @@ public interface StreamStateHandle extends StateObject { /** @return Content of this handle as bytes array if it is already in memory. */ Optional<byte[]> asBytesIfInMemory(); + + /** @return a unique identifier of this handle. 
*/ + PhysicalStateHandleID getStreamStateHandleID(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java index b1f5b0d61ec..96d63007615 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java @@ -22,6 +22,7 @@ import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.runtime.state.CheckpointBoundKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.KeyedStateHandle; +import org.apache.flink.runtime.state.PhysicalStateHandleID; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.SharedStateRegistryKey; import org.apache.flink.runtime.state.StateHandleID; @@ -280,6 +281,11 @@ public interface ChangelogStateBackendHandle throw new UnsupportedOperationException("Should not call here."); } + @Override + public PhysicalStateHandleID getStreamStateHandleID() { + throw new UnsupportedOperationException("Should not call here."); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java index 8b940478e54..9a706f1756d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.state.filesystem; import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.core.fs.Path; +import 
org.apache.flink.runtime.state.PhysicalStateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; import java.io.IOException; @@ -43,6 +44,8 @@ public class FileStateHandle implements StreamStateHandle { /** The size of the state in the file. */ private final long stateSize; + private final PhysicalStateHandleID physicalID; + /** * Creates a new file state for the given file path. * @@ -52,6 +55,7 @@ public class FileStateHandle implements StreamStateHandle { checkArgument(stateSize >= -1); this.filePath = checkNotNull(filePath); this.stateSize = stateSize; + this.physicalID = new PhysicalStateHandleID(filePath.toUri().toString()); } /** @@ -73,6 +77,11 @@ public class FileStateHandle implements StreamStateHandle { return Optional.empty(); } + @Override + public PhysicalStateHandleID getStreamStateHandleID() { + return physicalID; + } + /** * Discard the state by deleting the file that stores the state. If the parent directory of the * state is empty after deleting the state file, it is also deleted. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java index aa257f5b3d1..d64ed7093bb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.state.memory; import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.runtime.state.PhysicalStateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.util.Preconditions; @@ -40,10 +41,13 @@ public class ByteStreamStateHandle implements StreamStateHandle { */ private final String handleName; + private final PhysicalStateHandleID physicalID; + /** Creates a new ByteStreamStateHandle containing the given data. 
*/ public ByteStreamStateHandle(String handleName, byte[] data) { this.handleName = Preconditions.checkNotNull(handleName); this.data = Preconditions.checkNotNull(data); + this.physicalID = new PhysicalStateHandleID(handleName); } @Override @@ -56,6 +60,11 @@ public class ByteStreamStateHandle implements StreamStateHandle { return Optional.of(getData()); } + @Override + public PhysicalStateHandleID getStreamStateHandleID() { + return physicalID; + } + public byte[] getData() { return data; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java index 5620164ddd3..8df5006721c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java @@ -29,7 +29,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.state.KeyGroupRange; -import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.TestStreamStateHandle; import org.junit.Test; @@ -101,7 +101,7 @@ public class CheckpointMessagesTest { assertNotNull(copy.toString()); } - private static class MyHandle implements StreamStateHandle { + private static class MyHandle implements TestStreamStateHandle { private static final long serialVersionUID = 8128146204128728332L; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTestUtils.java index 62cd99a83ca..890010e1f6d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTestUtils.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/ChangelogTestUtils.java @@ -112,7 +112,7 @@ public class ChangelogTestUtils { } public static class ChangelogStateHandleWrapper extends InMemoryChangelogStateHandle - implements StreamStateHandle { + implements TestStreamStateHandle { private static final long serialVersionUID = 1L; private volatile boolean isDiscarded; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java index 9d8d9e4a20f..a67009f414e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/SharedStateRegistryTest.java @@ -177,7 +177,7 @@ public class SharedStateRegistryTest { assertFalse(nonMaterializedState2.isDiscarded()); } - private static class TestSharedState implements StreamStateHandle { + private static class TestSharedState implements TestStreamStateHandle { private static final long serialVersionUID = 4468635881465159780L; private SharedStateRegistryKey key; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestStreamStateHandle.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestStreamStateHandle.java new file mode 100644 index 00000000000..7c8780a366e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestStreamStateHandle.java @@ -0,0 +1,28 @@ +package org.apache.flink.runtime.state; +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Test {@link StreamStateHandle} that implements {@link #getStreamStateHandleID()} using {@link + * System#identityHashCode(Object)}. + */ +public interface TestStreamStateHandle extends StreamStateHandle { + + default PhysicalStateHandleID getStreamStateHandleID() { + return new PhysicalStateHandleID(Integer.toString(System.identityHashCode(this))); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/EmptyStreamStateHandle.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/EmptyStreamStateHandle.java index 0107a02a26c..8b3515a1591 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/EmptyStreamStateHandle.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/EmptyStreamStateHandle.java @@ -19,7 +19,7 @@ package org.apache.flink.runtime.state.testutils; import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.TestStreamStateHandle; import java.io.IOException; import java.util.Optional; @@ -28,7 +28,7 @@ import java.util.Optional; * A simple dummy implementation of a stream state handle that can be passed in tests. The handle * cannot open an input stream. 
*/ -public class EmptyStreamStateHandle implements StreamStateHandle { +public class EmptyStreamStateHandle implements TestStreamStateHandle { private static final long serialVersionUID = 0L; diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java index ee5bcecc381..fcce8674887 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateDownloaderTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.state.StateHandleID; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.TestStreamStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.util.TestLogger; @@ -137,7 +138,7 @@ public class RocksDBStateDownloaderTest extends TestLogger { } } - private static class ThrowingStateHandle implements StreamStateHandle { + private static class ThrowingStateHandle implements TestStreamStateHandle { private static final long serialVersionUID = -2102069659550694805L; private final IOException expectedException; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index 976a07e78c1..d65a1755fdb 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -62,6 +62,7 @@ import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.OperatorStreamStateHandle; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.TestStreamStateHandle; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.taskexecutor.KvStateService; import org.apache.flink.runtime.taskexecutor.PartitionProducerStateChecker; @@ -299,7 +300,7 @@ public class InterruptSensitiveRestoreTest { // ------------------------------------------------------------------------ @SuppressWarnings("serial") - private static class InterruptLockingStateHandle implements StreamStateHandle { + private static class InterruptLockingStateHandle implements TestStreamStateHandle { private static final long serialVersionUID = 1L; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index f20938061fe..7a95be8375a 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -78,6 +78,7 @@ import org.apache.flink.runtime.state.KeyedStateHandle; import org.apache.flink.runtime.state.OperatorStateBackend; import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.OperatorStreamStateHandle; +import org.apache.flink.runtime.state.PhysicalStateHandleID; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.state.SnapshotResult; import org.apache.flink.runtime.state.StateBackendFactory; @@ -2400,6 +2401,11 @@ public class 
StreamTaskTest extends TestLogger { throw new IOException("Cannot open input streams in testing implementation."); } + @Override + public PhysicalStateHandleID getStreamStateHandleID() { + throw new RuntimeException("Cannot return ID in testing implementation."); + } + @Override public Optional<byte[]> asBytesIfInMemory() { return Optional.empty();