This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8e0e2ca97c9225d4ce3faa6781a0e3a524645cd4 Author: Stefan Richter <srich...@confluent.io> AuthorDate: Tue Jan 16 16:09:44 2024 +0100 [FLINK-34134] Implement handle-specific size/location reporting methods. --- .../KubernetesStateHandleStore.java | 5 ++ .../flink/runtime/checkpoint/OperatorState.java | 18 ++--- .../runtime/checkpoint/OperatorSubtaskState.java | 60 ++++++++-------- .../runtime/checkpoint/StateObjectCollection.java | 35 ++++----- .../apache/flink/runtime/checkpoint/TaskState.java | 18 ++--- .../runtime/checkpoint/TaskStateSnapshot.java | 17 +++-- .../runtime/state/AbstractChannelStateHandle.java | 19 +++-- .../flink/runtime/state/ChainedStateHandle.java | 23 +++--- .../flink/runtime/state/DirectoryStateHandle.java | 25 +++++-- .../state/IncrementalLocalKeyedStateHandle.java | 6 ++ .../state/IncrementalRemoteKeyedStateHandle.java | 25 ++++--- .../flink/runtime/state/KeyGroupsStateHandle.java | 11 +++ .../runtime/state/OperatorStreamStateHandle.java | 5 ++ .../state/RetrievableStreamStateHandle.java | 5 ++ .../flink/runtime/state/SnapshotDirectory.java | 4 +- .../changelog/ChangelogStateBackendHandle.java | 5 ++ .../inmemory/InMemoryChangelogStateHandle.java | 5 ++ .../runtime/state/filesystem/FileStateHandle.java | 14 ++++ .../state/memory/ByteStreamStateHandle.java | 5 ++ .../IncrementalLocalKeyedStateHandleTest.java | 84 ++++++++++++++++++++++ .../IncrementalRemoteKeyedStateHandleTest.java | 19 +++++ .../runtime/state/KeyGroupsStateHandleTest.java | 21 ++++++ .../state/filesystem/FileStateHandleTest.java | 37 ++++++++++ .../state/memory/ByteStreamStateHandleTest.java | 19 +++++ .../streaming/state/StateHandleDownloadSpec.java | 2 +- 25 files changed, 377 insertions(+), 110 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java index 2a99cf6e2f1..790a0441dbf 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java @@ -137,6 +137,11 @@ public class KubernetesStateHandleStore<T extends Serializable> return inner.getStateSize(); } + @Override + public void collectSizeStats(StateObjectSizeStatsCollector collector) { + inner.collectSizeStats(collector); + } + RetrievableStateHandle<T> getInner() { return inner; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java index 2b6c4e06197..d377ca5ec7b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java @@ -34,6 +34,7 @@ import java.util.List; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkState; @@ -204,16 +205,17 @@ public class OperatorState implements CompositeStateHandle { @Override public long getStateSize() { - long result = coordinatorState == null ? 0L : coordinatorState.getStateSize(); + return streamAllSubHandles().mapToLong(StateObject::getStateSize).sum(); + } - for (int i = 0; i < parallelism; i++) { - OperatorSubtaskState operatorSubtaskState = operatorSubtaskStates.get(i); - if (operatorSubtaskState != null) { - result += operatorSubtaskState.getStateSize(); - } - } + @Override + public void collectSizeStats(StateObjectSizeStatsCollector collector) { + streamAllSubHandles().forEach(handle -> handle.collectSizeStats(collector)); + } - return result; + private Stream<StateObject> streamAllSubHandles() { + return Stream.concat(Stream.of(coordinatorState), operatorSubtaskStates.values().stream()) + .filter(Objects::nonNull); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java index 4fede9683e5..a2f30e9f64a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.runtime.state.AbstractChannelStateHandle; import org.apache.flink.runtime.state.CompositeStateHandle; import org.apache.flink.runtime.state.InputChannelStateHandle; import org.apache.flink.runtime.state.KeyedStateHandle; @@ -33,8 +34,12 @@ import org.apache.flink.runtime.state.StateUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; +import java.util.Collection; import java.util.List; +import java.util.Objects; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.runtime.state.AbstractChannelStateHandle.collectUniqueDelegates; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -130,21 +135,26 @@ public class OperatorSubtaskState implements CompositeStateHandle { this.inputRescalingDescriptor = checkNotNull(inputRescalingDescriptor); this.outputRescalingDescriptor = checkNotNull(outputRescalingDescriptor); - long calculateStateSize = managedOperatorState.getStateSize(); - calculateStateSize += rawOperatorState.getStateSize(); - calculateStateSize += managedKeyedState.getStateSize(); - calculateStateSize += rawKeyedState.getStateSize(); - calculateStateSize += inputChannelState.getStateSize(); - calculateStateSize += resultSubpartitionState.getStateSize(); - stateSize = calculateStateSize; - - long calculateCheckpointedSize = managedOperatorState.getCheckpointedSize(); - calculateCheckpointedSize += rawOperatorState.getCheckpointedSize(); - calculateCheckpointedSize += managedKeyedState.getCheckpointedSize(); - calculateCheckpointedSize += rawKeyedState.getCheckpointedSize(); - calculateCheckpointedSize += inputChannelState.getCheckpointedSize(); - calculateCheckpointedSize += resultSubpartitionState.getCheckpointedSize(); - this.checkpointedSize = calculateCheckpointedSize; + this.stateSize = streamSubCollections().mapToLong(StateObject::getStateSize).sum(); + this.checkpointedSize = + streamSubCollections().mapToLong(StateObjectCollection::getCheckpointedSize).sum(); + } + + private Stream<StateObjectCollection<?>> streamSubCollections() { + return Stream.of(streamOperatorAndKeyedStates(), streamChannelStates()) + .flatMap(Function.identity()); + } + + private Stream<StateObjectCollection<? extends StateObject>> streamOperatorAndKeyedStates() { + return Stream.of(managedOperatorState, rawOperatorState, managedKeyedState, rawKeyedState) + .filter(Objects::nonNull); + } + + private Stream<StateObjectCollection<? extends AbstractChannelStateHandle<?>>> + streamChannelStates() { + return Stream.<StateObjectCollection<? extends AbstractChannelStateHandle<?>>>of( + inputChannelState, resultSubpartitionState) + .filter(Objects::nonNull); } @VisibleForTesting @@ -195,20 +205,10 @@ public class OperatorSubtaskState implements CompositeStateHandle { } public List<StateObject> getDiscardables() { - List<StateObject> toDispose = - new ArrayList<>( - managedOperatorState.size() - + rawOperatorState.size() - + managedKeyedState.size() - + rawKeyedState.size() - + inputChannelState.size() - + resultSubpartitionState.size()); - toDispose.addAll(managedOperatorState); - toDispose.addAll(rawOperatorState); - toDispose.addAll(managedKeyedState); - toDispose.addAll(rawKeyedState); - toDispose.addAll(collectUniqueDelegates(inputChannelState, resultSubpartitionState)); - return toDispose; + return Stream.concat( + streamOperatorAndKeyedStates().flatMap(Collection::stream), + collectUniqueDelegates(streamChannelStates())) + .collect(Collectors.toList()); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java index 5bc97c68c8f..da47b8e3438 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java @@ -31,7 +31,9 @@ import java.util.Collection; import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.Objects; import java.util.function.Predicate; +import java.util.stream.Stream; /** * This class represents a generic collection for {@link StateObject}s. Being a state object itself, @@ -142,11 +144,22 @@ public class StateObjectCollection<T extends StateObject> implements Collection< @Override public long getStateSize() { - return sumAllSizes(stateObjects); + return streamAllStateObjects().mapToLong(StateObject::getStateSize).sum(); + } + + @Override + public void collectSizeStats(StateObjectSizeStatsCollector collector) { + streamAllStateObjects().forEach(object -> object.collectSizeStats(collector)); } public long getCheckpointedSize() { - return sumAllCheckpointedSizes(stateObjects); + return streamAllStateObjects() + .mapToLong(StateObjectCollection::getCheckpointedSizeNullSafe) + .sum(); + } + + private Stream<T> streamAllStateObjects() { + return stateObjects.stream().filter(Objects::nonNull); } /** Returns true if this contains at least one {@link StateObject}. */ @@ -214,28 +227,10 @@ public class StateObjectCollection<T extends StateObject> implements Collection< return stateObject == null ? empty() : singleton(stateObject); } - private static long sumAllSizes(Collection<? extends StateObject> stateObject) { - long size = 0L; - for (StateObject object : stateObject) { - size += getSizeNullSafe(object); - } - - return size; - } - private static long getSizeNullSafe(StateObject stateObject) { return stateObject != null ? stateObject.getStateSize() : 0L; } - private static long sumAllCheckpointedSizes(Collection<? extends StateObject> stateObject) { - long size = 0L; - for (StateObject object : stateObject) { - size += getCheckpointedSizeNullSafe(object); - } - - return size; - } - private static long getCheckpointedSizeNullSafe(StateObject stateObject) { return stateObject instanceof CompositeStateHandle ? ((CompositeStateHandle) stateObject).getCheckpointedSize() diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java index 122a4c14160..2899a580755 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.CompositeStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateObject; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.Preconditions; @@ -28,6 +29,7 @@ import java.util.Collection; import java.util.Collections; import java.util.Map; import java.util.Objects; +import java.util.stream.Stream; /** * Simple container class which contains the task state and key-group state handles for the sub @@ -150,16 +152,16 @@ public class TaskState implements CompositeStateHandle { @Override public long getStateSize() { - long result = 0L; + return streamSubtaskState().mapToLong(StateObject::getStateSize).sum(); + } - for (int i = 0; i < parallelism; i++) { - SubtaskState subtaskState = subtaskStates.get(i); - if (subtaskState != null) { - result += subtaskState.getStateSize(); - } - } + @Override + public void collectSizeStats(StateObjectSizeStatsCollector collector) { + streamSubtaskState().forEach(state -> state.collectSizeStats(collector)); + } - return result; + private Stream<SubtaskState> streamSubtaskState() { + return subtaskStates.values().stream().filter(Objects::nonNull); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java index b393dae42da..35d45ae886a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.state.CompositeStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.runtime.state.StateObject; import org.apache.flink.runtime.state.StateUtil; import org.apache.flink.util.CollectionUtil; import org.apache.flink.util.FlinkRuntimeException; @@ -38,6 +39,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; import java.util.function.Function; +import java.util.stream.Stream; import static org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor.NO_RESCALE; @@ -159,15 +161,16 @@ public class TaskStateSnapshot implements CompositeStateHandle { @Override public long getStateSize() { - long size = 0L; + return streamOperatorSubtaskStates().mapToLong(StateObject::getStateSize).sum(); + } - for (OperatorSubtaskState subtaskState : subtaskStatesByOperatorID.values()) { - if (subtaskState != null) { - size += subtaskState.getStateSize(); - } - } + @Override + public void collectSizeStats(StateObjectSizeStatsCollector collector) { + streamOperatorSubtaskStates().forEach(oss -> oss.collectSizeStats(collector)); + } - return size; + private Stream<OperatorSubtaskState> streamOperatorSubtaskStates() { + return subtaskStatesByOperatorID.values().stream().filter(Objects::nonNull); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java index ea811540d9d..250d77c9f6d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java @@ -18,14 +18,14 @@ package org.apache.flink.runtime.state; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.StateObjectCollection; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.List; import java.util.Objects; -import java.util.Set; +import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -66,15 +66,12 @@ public abstract class AbstractChannelStateHandle<Info> implements StateObject { this.size = size; } - public static Set<StreamStateHandle> collectUniqueDelegates( - Collection<? extends AbstractChannelStateHandle<?>>... collections) { - Set<StreamStateHandle> result = new HashSet<>(); - for (Collection<? extends AbstractChannelStateHandle<?>> collection : collections) { - for (AbstractChannelStateHandle<?> handle : collection) { - result.add(handle.getDelegate()); - } - } - return result; + public static Stream<StreamStateHandle> collectUniqueDelegates( + Stream<StateObjectCollection<? extends AbstractChannelStateHandle<?>>> collections) { + return collections + .flatMap(Collection::stream) + .map(AbstractChannelStateHandle::getDelegate) + .distinct(); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java index 6625def309b..562f2e38d30 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java @@ -22,6 +22,8 @@ import org.apache.flink.util.Preconditions; import java.util.Collections; import java.util.List; +import java.util.Objects; +import java.util.stream.Stream; /** Handle to state handles for the operators in an operator chain. */ public class ChainedStateHandle<T extends StateObject> implements StateObject { @@ -85,18 +87,19 @@ public class ChainedStateHandle<T extends StateObject> implements StateObject { @Override public long getStateSize() { - long sumStateSize = 0; + return streamInternalHandles().mapToLong(StateObject::getStateSize).sum(); + } - if (operatorStateHandles != null) { - for (T state : operatorStateHandles) { - if (state != null) { - sumStateSize += state.getStateSize(); - } - } - } + @Override + public void collectSizeStats(StateObjectSizeStatsCollector collector) { + streamInternalHandles().forEach(handle -> handle.collectSizeStats(collector)); + } - // State size as sum of all state sizes - return sumStateSize; + private Stream<? extends T> streamInternalHandles() { + if (operatorStateHandles.isEmpty()) { + return Stream.empty(); + } + return operatorStateHandles.stream().filter(Objects::nonNull); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java index 60049fde088..4e9ff682184 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java @@ -38,12 +38,26 @@ public class DirectoryStateHandle implements StateObject { /** The path that describes the directory, as a string, to be serializable. */ private final String directoryString; + /** (Optional) Size of the directory, used for metrics. Can be 0 if unknown or empty. */ + private final long directorySize; + /** Transient path cache, to avoid re-parsing the string. */ private transient Path directory; - public DirectoryStateHandle(@Nonnull Path directory) { + public DirectoryStateHandle(@Nonnull Path directory, long directorySize) { this.directory = directory; this.directoryString = directory.toString(); + this.directorySize = directorySize; + } + + public static DirectoryStateHandle forPathWithSize(@Nonnull Path directory) { + long size; + try { + size = FileUtils.getDirectoryFilesSize(directory); + } catch (IOException e) { + size = 0L; + } + return new DirectoryStateHandle(directory, size); } @Override @@ -54,9 +68,12 @@ public class DirectoryStateHandle implements StateObject { @Override public long getStateSize() { - // For now, we will not report any size, but in the future this could (if needed) return the - // total dir size. - return 0L; // unknown + return directorySize; + } + + @Override + public void collectSizeStats(StateObjectSizeStatsCollector collector) { + collector.add(StateObjectLocation.LOCAL_DISK, directorySize); } @Nonnull diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java index ac457f00622..c683724ce1c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java @@ -94,6 +94,12 @@ public class IncrementalLocalKeyedStateHandle extends AbstractIncrementalStateHa return directoryStateHandle.getStateSize() + metaStateHandle.getStateSize(); } + @Override + public void collectSizeStats(StateObjectSizeStatsCollector collector) { + metaStateHandle.collectSizeStats(collector); + directoryStateHandle.collectSizeStats(collector); + } + @Override public String toString() { return "IncrementalLocalKeyedStateHandle{" diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java index 41b9a0466cc..2f5335b8853 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java @@ -27,8 +27,11 @@ import org.slf4j.LoggerFactory; import javax.annotation.Nonnull; import java.util.List; +import java.util.Objects; import java.util.UUID; +import java.util.function.Function; import java.util.stream.Collectors; +import java.util.stream.Stream; /** * The handle to states of an incremental snapshot. @@ -235,17 +238,21 @@ public class IncrementalRemoteKeyedStateHandle extends AbstractIncrementalStateH @Override public long getStateSize() { - long size = StateUtil.getStateSize(metaStateHandle); - - for (HandleAndLocalPath handleAndLocalPath : sharedState) { - size += handleAndLocalPath.getStateSize(); - } + return streamSubHandles().mapToLong(StateObject::getStateSize).sum(); + } - for (HandleAndLocalPath handleAndLocalPath : privateState) { - size += handleAndLocalPath.getStateSize(); - } + @Override + public void collectSizeStats(StateObjectSizeStatsCollector collector) { + streamSubHandles().forEach(handle -> handle.collectSizeStats(collector)); + } - return size; + private Stream<StreamStateHandle> streamSubHandles() { + return Stream.of( + Stream.of(metaStateHandle), + sharedState.stream().map(HandleAndLocalPath::getHandle), + privateState.stream().map(HandleAndLocalPath::getHandle)) + .flatMap(Function.identity()) + .filter(Objects::nonNull); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java index 086fcbf8d73..94eedcf9642 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java @@ -135,6 +135,17 @@ public class KeyGroupsStateHandle implements StreamStateHandle, KeyedStateHandle return stateHandle.getStateSize(); } + @Override + public void collectSizeStats(StateObjectSizeStatsCollector collector) { + // TODO: for now this ignores that only some key groups might be accessed when reading the + // state, so this only reports the upper bound. We could introduce + // #collectSizeStats(StateObjectSizeStatsCollector, KeyGroupRange) in KeyedStateHandle + // that computes which groups where actually touched and computes the size, depending on + // the exact state handle type, from either the offsets (e.g. here) or for the full size + // (e.g. remote incremental) when we restore from managed/raw keyed state. + stateHandle.collectSizeStats(collector); + } + @Override public long getCheckpointedSize() { return getStateSize(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java index 9f1af0cb8f7..90c1f20b28b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java @@ -61,6 +61,11 @@ public class OperatorStreamStateHandle implements OperatorStateHandle { return delegateStateHandle.getStateSize(); } + @Override + public void collectSizeStats(StateObjectSizeStatsCollector collector) { + delegateStateHandle.collectSizeStats(collector); + } + @Override public FSDataInputStream openInputStream() throws IOException { return delegateStateHandle.openInputStream(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java index 2370a86a70f..e807b93dd63 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java @@ -86,6 +86,11 @@ public class RetrievableStreamStateHandle<T extends Serializable> return wrappedStreamStateHandle.getStateSize(); } + @Override + public void collectSizeStats(StateObjectSizeStatsCollector collector) { + wrappedStreamStateHandle.collectSizeStats(collector); + } + @Override public void close() throws IOException { // wrappedStreamStateHandle.close(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java index 06b0be03c5a..bef4a153cf5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java @@ -172,7 +172,7 @@ public abstract class SnapshotDirectory { private static class PermanentSnapshotDirectory extends SnapshotDirectory { - PermanentSnapshotDirectory(@Nonnull Path directory) throws IOException { + PermanentSnapshotDirectory(@Nonnull Path directory) { super(directory); } @@ -180,7 +180,7 @@ public abstract class SnapshotDirectory { public DirectoryStateHandle completeSnapshotAndGetHandle() throws IOException { if (State.COMPLETED == state.get() || state.compareAndSet(State.ONGOING, State.COMPLETED)) { - return new DirectoryStateHandle(directory); + return DirectoryStateHandle.forPathWithSize(directory); } else { throw new IOException( "Expected state " + State.ONGOING + " but found state " + state.get()); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java index 6c5c7feae06..ab35aedc756 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java @@ -365,6 +365,11 @@ public interface ChangelogStateBackendHandle return keyedStateHandle.getStateSize(); } + @Override + public void collectSizeStats(StateObjectSizeStatsCollector collector) { + keyedStateHandle.collectSizeStats(collector); + } + @Override public FSDataInputStream openInputStream() throws IOException { throw new UnsupportedOperationException("Should not call here."); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryChangelogStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryChangelogStateHandle.java index bf72b073654..60ada672f2b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryChangelogStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryChangelogStateHandle.java @@ -88,6 +88,11 @@ public class InMemoryChangelogStateHandle implements ChangelogStateHandle { return getStateSize(); } + @Override + public void collectSizeStats(StateObjectSizeStatsCollector collector) { + collector.add(StateObjectLocation.LOCAL_MEMORY, getStateSize()); + } + public List<StateChange> getChanges() { return Collections.unmodifiableList(changes); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java index 35ba2865eea..4d3fe238fcc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.StreamStateHandle; import java.io.IOException; import java.util.Optional; +import java.util.regex.Pattern; import static org.apache.flink.util.Preconditions.checkArgument; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -38,6 +39,8 @@ public class FileStateHandle implements StreamStateHandle { private static final long serialVersionUID = 350284443258002355L; + private static final Pattern NOT_LOCAL_FILER = Pattern.compile("(?!file\\b)\\w+?://.*"); + /** The path to the file in the filesystem, fully describing the file system. */ private final Path filePath; @@ -119,6 +122,17 @@ public class FileStateHandle implements StreamStateHandle { return stateSize; } + @Override + public void collectSizeStats(StateObjectSizeStatsCollector collector) { + final StateObjectLocation location; + if (NOT_LOCAL_FILER.matcher(filePath.toUri().toString()).matches()) { + location = StateObjectLocation.REMOTE; + } else { + location = StateObjectLocation.LOCAL_DISK; + } + collector.add(location, getStateSize()); + } + /** * Gets the file system that stores the file state. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java index 83f4fe54d25..9e32bc623ff 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java @@ -78,6 +78,11 @@ public class ByteStreamStateHandle implements StreamStateHandle { return data.length; } + @Override + public void collectSizeStats(StateObjectSizeStatsCollector collector) { + collector.add(StateObjectLocation.LOCAL_MEMORY, getStateSize()); + } + @Override public boolean equals(Object o) { if (this == o) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandleTest.java new file mode 100644 index 00000000000..ec4ba8f7e62 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandleTest.java @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.apache.flink.util.Preconditions; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.file.Path; +import java.util.Collections; +import java.util.EnumMap; +import java.util.UUID; + +/** Unit tests for {@link IncrementalLocalKeyedStateHandle}. */ +public class IncrementalLocalKeyedStateHandleTest { + + @Test + public void testDirectorySize(@TempDir Path directory) throws IOException { + final int metaDataBytes = 42; + ByteStreamStateHandle metaDataStateHandle = + new ByteStreamStateHandle("MetaDataTest", new byte[metaDataBytes]); + + final int fileOneBytes = 1024; + File outputFile = new File(directory.toFile(), "out.001"); + try (FileOutputStream outputStream = new FileOutputStream(outputFile)) { + outputStream.write(new byte[fileOneBytes]); + } + + File subPath = new File(directory.toFile(), "subdir"); + Preconditions.checkState(subPath.mkdirs()); + final int fileTwoBytes = 128; + outputFile = new File(subPath, "out.002"); + try (FileOutputStream outputStream = new FileOutputStream(outputFile)) { + outputStream.write(new byte[fileTwoBytes]); + } + + DirectoryStateHandle directoryStateHandle = DirectoryStateHandle.forPathWithSize(directory); + IncrementalLocalKeyedStateHandle handle = + new IncrementalLocalKeyedStateHandle( + UUID.randomUUID(), + 0, + directoryStateHandle, + new KeyGroupRange(0, 1), + metaDataStateHandle, + Collections.emptyList()); + + StateObject.StateObjectSizeStatsCollector stats = + StateObject.StateObjectSizeStatsCollector.create(); + handle.collectSizeStats(stats); + Assertions.assertEquals( + metaDataBytes + fileOneBytes + fileTwoBytes, extractLocalStateSizes(stats)); + } + + private long extractLocalStateSizes(StateObject.StateObjectSizeStatsCollector stats) { + EnumMap<StateObject.StateObjectLocation, Long> statsMap = stats.getStats(); + Assertions.assertFalse(statsMap.containsKey(StateObject.StateObjectLocation.REMOTE)); + Assertions.assertFalse(statsMap.containsKey(StateObject.StateObjectLocation.UNKNOWN)); + // Might be in a byte handle or a file. + return statsMap.get(StateObject.StateObjectLocation.LOCAL_DISK) + + statsMap.get(StateObject.StateObjectLocation.LOCAL_MEMORY); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java index 1422bc8bcf0..ad24e1da52f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java @@ -23,10 +23,12 @@ import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocal import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.util.TernaryBoolean; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.nio.charset.StandardCharsets; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Random; import java.util.UUID; @@ -235,6 +237,23 @@ class IncrementalRemoteKeyedStateHandleTest { assertThat(newHandle.getStateHandleId()).isEqualTo(handle.getStateHandleId()); } + @Test + void testCollectSizeStats() { + IncrementalRemoteKeyedStateHandle handle = create(ThreadLocalRandom.current()); + StateObject.StateObjectSizeStatsCollector statsCollector = + StateObject.StateObjectSizeStatsCollector.create(); + handle.collectSizeStats(statsCollector); + Assertions.assertEquals( + new HashMap<StateObject.StateObjectLocation, Long>() { + { + // Location is LOCAL_MEMORY, even though the handle is called remote because + // we test against a local file system + put(StateObject.StateObjectLocation.LOCAL_MEMORY, handle.getStateSize()); + } + }, + statsCollector.getStats()); + } + @Test void testConcurrentCheckpointSharedStateRegistration() throws Exception { String localPath = "1.sst"; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupsStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupsStateHandleTest.java index c933e7b4b22..0cbe6097da3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupsStateHandleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupsStateHandleTest.java @@ -20,8 +20,11 @@ package org.apache.flink.runtime.state; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.HashMap; + import static org.assertj.core.api.Assertions.assertThat; /** A test for {@link KeyGroupsStateHandle} */ @@ -52,4 +55,22 @@ class KeyGroupsStateHandleTest { KeyGroupRange newRange = new KeyGroupRange(8, 11); assertThat(handle.getIntersection(newRange)).isNull(); } + + @Test + void testCollectSizeStats() { + final KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(0, 7); + final byte[] data = new byte[5]; + final ByteStreamStateHandle innerHandle = new ByteStreamStateHandle("name", data); + KeyGroupsStateHandle handle = new KeyGroupsStateHandle(offsets, innerHandle); + StateObject.StateObjectSizeStatsCollector statsCollector = + StateObject.StateObjectSizeStatsCollector.create(); + handle.collectSizeStats(statsCollector); + Assertions.assertEquals( + new HashMap<StateObject.StateObjectLocation, Long>() { + { + put(StateObject.StateObjectLocation.LOCAL_MEMORY, (long) data.length); + } + }, + statsCollector.getStats()); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileStateHandleTest.java index 0a879011baa..346874508c9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileStateHandleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileStateHandleTest.java @@ -24,10 +24,12 @@ import org.apache.flink.core.fs.FileSystemFactory; import org.apache.flink.core.fs.Path; import org.apache.flink.core.fs.local.LocalFileSystem; import org.apache.flink.core.plugin.TestingPluginManager; +import org.apache.flink.runtime.state.StateObject; import org.apache.flink.util.function.BiFunctionWithException; import org.apache.flink.util.function.FunctionWithException; import org.apache.flink.util.function.RunnableWithException; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.io.IOException; @@ -152,6 +154,41 @@ class FileStateHandleTest { .build()); } + @Test + void testCollectSizeStats() throws Exception { + final long stateSize = 123L; + + StateObject.StateObjectSizeStatsCollector statsCollector = + StateObject.StateObjectSizeStatsCollector.create(); + FileStateHandle handle = + new FileStateHandle(new Path(new URI("file:///home/test.txt")), stateSize); + handle.collectSizeStats(statsCollector); + checkStats(statsCollector, StateObject.StateObjectLocation.LOCAL_DISK, stateSize); + + statsCollector = StateObject.StateObjectSizeStatsCollector.create(); + handle = new FileStateHandle(new Path(new URI("/home/test.txt")), stateSize); + handle.collectSizeStats(statsCollector); + checkStats(statsCollector, StateObject.StateObjectLocation.LOCAL_DISK, stateSize); + + statsCollector = StateObject.StateObjectSizeStatsCollector.create(); + handle = new FileStateHandle(new Path(new URI("s3:///folder/test.txt")), stateSize); + handle.collectSizeStats(statsCollector); + checkStats(statsCollector, StateObject.StateObjectLocation.REMOTE, stateSize); + } + + private void checkStats( + StateObject.StateObjectSizeStatsCollector statsCollector, + StateObject.StateObjectLocation expectedLocation, + long expectedSize) { + Assertions.assertEquals( + new HashMap<StateObject.StateObjectLocation, Long>() { + { + put(expectedLocation, expectedSize); + } + }, + statsCollector.getStats()); + } + private static void testDiscardStateFailed(FileSystem fileSystem) throws Exception { runInFileSystemContext( fileSystem, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandleTest.java index c8df2275215..1ea6fc89847 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandleTest.java @@ -19,10 +19,13 @@ package org.apache.flink.runtime.state.memory; import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.runtime.state.StateObject; +import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import java.io.IOException; +import java.util.HashMap; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -143,4 +146,20 @@ class ByteStreamStateHandleTest { assertThat(in.read()).isEqualTo(-1); // got -1 because of EOF. } } + + @Test + void testCollectSizeStats() { + final byte[] data = new byte[5]; + final ByteStreamStateHandle handle = new ByteStreamStateHandle("name", data); + StateObject.StateObjectSizeStatsCollector statsCollector = + StateObject.StateObjectSizeStatsCollector.create(); + handle.collectSizeStats(statsCollector); + Assertions.assertEquals( + new HashMap<StateObject.StateObjectLocation, Long>() { + { + put(StateObject.StateObjectLocation.LOCAL_MEMORY, (long) data.length); + } + }, + statsCollector.getStats()); + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/StateHandleDownloadSpec.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/StateHandleDownloadSpec.java index 5f37f84921f..7f890230d74 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/StateHandleDownloadSpec.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/StateHandleDownloadSpec.java @@ -53,7 +53,7 @@ public class StateHandleDownloadSpec { return new IncrementalLocalKeyedStateHandle( stateHandle.getBackendIdentifier(), stateHandle.getCheckpointId(), - new DirectoryStateHandle(downloadDestination), + new DirectoryStateHandle(downloadDestination, stateHandle.getStateSize()), stateHandle.getKeyGroupRange(), stateHandle.getMetaDataStateHandle(), stateHandle.getSharedState());