This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8e0e2ca97c9225d4ce3faa6781a0e3a524645cd4
Author: Stefan Richter <srich...@confluent.io>
AuthorDate: Tue Jan 16 16:09:44 2024 +0100

    [FLINK-34134] Implement handle-specific size/location reporting methods.
---
 .../KubernetesStateHandleStore.java                |  5 ++
 .../flink/runtime/checkpoint/OperatorState.java    | 18 ++---
 .../runtime/checkpoint/OperatorSubtaskState.java   | 60 ++++++++--------
 .../runtime/checkpoint/StateObjectCollection.java  | 35 ++++-----
 .../apache/flink/runtime/checkpoint/TaskState.java | 18 ++---
 .../runtime/checkpoint/TaskStateSnapshot.java      | 17 +++--
 .../runtime/state/AbstractChannelStateHandle.java  | 19 +++--
 .../flink/runtime/state/ChainedStateHandle.java    | 23 +++---
 .../flink/runtime/state/DirectoryStateHandle.java  | 25 +++++--
 .../state/IncrementalLocalKeyedStateHandle.java    |  6 ++
 .../state/IncrementalRemoteKeyedStateHandle.java   | 25 ++++---
 .../flink/runtime/state/KeyGroupsStateHandle.java  | 11 +++
 .../runtime/state/OperatorStreamStateHandle.java   |  5 ++
 .../state/RetrievableStreamStateHandle.java        |  5 ++
 .../flink/runtime/state/SnapshotDirectory.java     |  4 +-
 .../changelog/ChangelogStateBackendHandle.java     |  5 ++
 .../inmemory/InMemoryChangelogStateHandle.java     |  5 ++
 .../runtime/state/filesystem/FileStateHandle.java  | 14 ++++
 .../state/memory/ByteStreamStateHandle.java        |  5 ++
 .../IncrementalLocalKeyedStateHandleTest.java      | 84 ++++++++++++++++++++++
 .../IncrementalRemoteKeyedStateHandleTest.java     | 19 +++++
 .../runtime/state/KeyGroupsStateHandleTest.java    | 21 ++++++
 .../state/filesystem/FileStateHandleTest.java      | 37 ++++++++++
 .../state/memory/ByteStreamStateHandleTest.java    | 19 +++++
 .../streaming/state/StateHandleDownloadSpec.java   |  2 +-
 25 files changed, 377 insertions(+), 110 deletions(-)

diff --git 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
index 2a99cf6e2f1..790a0441dbf 100644
--- 
a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
+++ 
b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/highavailability/KubernetesStateHandleStore.java
@@ -137,6 +137,11 @@ public class KubernetesStateHandleStore<T extends 
Serializable>
             return inner.getStateSize();
         }
 
+        @Override
+        public void collectSizeStats(StateObjectSizeStatsCollector collector) {
+            inner.collectSizeStats(collector);
+        }
+
         RetrievableStateHandle<T> getInner() {
             return inner;
         }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
index 2b6c4e06197..d377ca5ec7b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorState.java
@@ -34,6 +34,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -204,16 +205,17 @@ public class OperatorState implements 
CompositeStateHandle {
 
     @Override
     public long getStateSize() {
-        long result = coordinatorState == null ? 0L : 
coordinatorState.getStateSize();
+        return 
streamAllSubHandles().mapToLong(StateObject::getStateSize).sum();
+    }
 
-        for (int i = 0; i < parallelism; i++) {
-            OperatorSubtaskState operatorSubtaskState = 
operatorSubtaskStates.get(i);
-            if (operatorSubtaskState != null) {
-                result += operatorSubtaskState.getStateSize();
-            }
-        }
+    @Override
+    public void collectSizeStats(StateObjectSizeStatsCollector collector) {
+        streamAllSubHandles().forEach(handle -> 
handle.collectSizeStats(collector));
+    }
 
-        return result;
+    private Stream<StateObject> streamAllSubHandles() {
+        return Stream.concat(Stream.of(coordinatorState), 
operatorSubtaskStates.values().stream())
+                .filter(Objects::nonNull);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
index 4fede9683e5..a2f30e9f64a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/OperatorSubtaskState.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.checkpoint;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.runtime.state.AbstractChannelStateHandle;
 import org.apache.flink.runtime.state.CompositeStateHandle;
 import org.apache.flink.runtime.state.InputChannelStateHandle;
 import org.apache.flink.runtime.state.KeyedStateHandle;
@@ -33,8 +34,12 @@ import org.apache.flink.runtime.state.StateUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.ArrayList;
+import java.util.Collection;
 import java.util.List;
+import java.util.Objects;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import static 
org.apache.flink.runtime.state.AbstractChannelStateHandle.collectUniqueDelegates;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -130,21 +135,26 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
         this.inputRescalingDescriptor = checkNotNull(inputRescalingDescriptor);
         this.outputRescalingDescriptor = 
checkNotNull(outputRescalingDescriptor);
 
-        long calculateStateSize = managedOperatorState.getStateSize();
-        calculateStateSize += rawOperatorState.getStateSize();
-        calculateStateSize += managedKeyedState.getStateSize();
-        calculateStateSize += rawKeyedState.getStateSize();
-        calculateStateSize += inputChannelState.getStateSize();
-        calculateStateSize += resultSubpartitionState.getStateSize();
-        stateSize = calculateStateSize;
-
-        long calculateCheckpointedSize = 
managedOperatorState.getCheckpointedSize();
-        calculateCheckpointedSize += rawOperatorState.getCheckpointedSize();
-        calculateCheckpointedSize += managedKeyedState.getCheckpointedSize();
-        calculateCheckpointedSize += rawKeyedState.getCheckpointedSize();
-        calculateCheckpointedSize += inputChannelState.getCheckpointedSize();
-        calculateCheckpointedSize += 
resultSubpartitionState.getCheckpointedSize();
-        this.checkpointedSize = calculateCheckpointedSize;
+        this.stateSize = 
streamSubCollections().mapToLong(StateObject::getStateSize).sum();
+        this.checkpointedSize =
+                
streamSubCollections().mapToLong(StateObjectCollection::getCheckpointedSize).sum();
+    }
+
+    private Stream<StateObjectCollection<?>> streamSubCollections() {
+        return Stream.of(streamOperatorAndKeyedStates(), streamChannelStates())
+                .flatMap(Function.identity());
+    }
+
+    private Stream<StateObjectCollection<? extends StateObject>> 
streamOperatorAndKeyedStates() {
+        return Stream.of(managedOperatorState, rawOperatorState, 
managedKeyedState, rawKeyedState)
+                .filter(Objects::nonNull);
+    }
+
+    private Stream<StateObjectCollection<? extends 
AbstractChannelStateHandle<?>>>
+            streamChannelStates() {
+        return Stream.<StateObjectCollection<? extends 
AbstractChannelStateHandle<?>>>of(
+                        inputChannelState, resultSubpartitionState)
+                .filter(Objects::nonNull);
     }
 
     @VisibleForTesting
@@ -195,20 +205,10 @@ public class OperatorSubtaskState implements 
CompositeStateHandle {
     }
 
     public List<StateObject> getDiscardables() {
-        List<StateObject> toDispose =
-                new ArrayList<>(
-                        managedOperatorState.size()
-                                + rawOperatorState.size()
-                                + managedKeyedState.size()
-                                + rawKeyedState.size()
-                                + inputChannelState.size()
-                                + resultSubpartitionState.size());
-        toDispose.addAll(managedOperatorState);
-        toDispose.addAll(rawOperatorState);
-        toDispose.addAll(managedKeyedState);
-        toDispose.addAll(rawKeyedState);
-        toDispose.addAll(collectUniqueDelegates(inputChannelState, 
resultSubpartitionState));
-        return toDispose;
+        return Stream.concat(
+                        
streamOperatorAndKeyedStates().flatMap(Collection::stream),
+                        collectUniqueDelegates(streamChannelStates()))
+                .collect(Collectors.toList());
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java
index 5bc97c68c8f..da47b8e3438 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateObjectCollection.java
@@ -31,7 +31,9 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Objects;
 import java.util.function.Predicate;
+import java.util.stream.Stream;
 
 /**
  * This class represents a generic collection for {@link StateObject}s. Being 
a state object itself,
@@ -142,11 +144,22 @@ public class StateObjectCollection<T extends StateObject> 
implements Collection<
 
     @Override
     public long getStateSize() {
-        return sumAllSizes(stateObjects);
+        return 
streamAllStateObjects().mapToLong(StateObject::getStateSize).sum();
+    }
+
+    @Override
+    public void collectSizeStats(StateObjectSizeStatsCollector collector) {
+        streamAllStateObjects().forEach(object -> 
object.collectSizeStats(collector));
     }
 
     public long getCheckpointedSize() {
-        return sumAllCheckpointedSizes(stateObjects);
+        return streamAllStateObjects()
+                .mapToLong(StateObjectCollection::getCheckpointedSizeNullSafe)
+                .sum();
+    }
+
+    private Stream<T> streamAllStateObjects() {
+        return stateObjects.stream().filter(Objects::nonNull);
     }
 
     /** Returns true if this contains at least one {@link StateObject}. */
@@ -214,28 +227,10 @@ public class StateObjectCollection<T extends StateObject> 
implements Collection<
         return stateObject == null ? empty() : singleton(stateObject);
     }
 
-    private static long sumAllSizes(Collection<? extends StateObject> 
stateObject) {
-        long size = 0L;
-        for (StateObject object : stateObject) {
-            size += getSizeNullSafe(object);
-        }
-
-        return size;
-    }
-
     private static long getSizeNullSafe(StateObject stateObject) {
         return stateObject != null ? stateObject.getStateSize() : 0L;
     }
 
-    private static long sumAllCheckpointedSizes(Collection<? extends 
StateObject> stateObject) {
-        long size = 0L;
-        for (StateObject object : stateObject) {
-            size += getCheckpointedSizeNullSafe(object);
-        }
-
-        return size;
-    }
-
     private static long getCheckpointedSizeNullSafe(StateObject stateObject) {
         return stateObject instanceof CompositeStateHandle
                 ? ((CompositeStateHandle) stateObject).getCheckpointedSize()
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
index 122a4c14160..2899a580755 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskState.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.state.CompositeStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.Preconditions;
 
@@ -28,6 +29,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Stream;
 
 /**
  * Simple container class which contains the task state and key-group state 
handles for the sub
@@ -150,16 +152,16 @@ public class TaskState implements CompositeStateHandle {
 
     @Override
     public long getStateSize() {
-        long result = 0L;
+        return streamSubtaskState().mapToLong(StateObject::getStateSize).sum();
+    }
 
-        for (int i = 0; i < parallelism; i++) {
-            SubtaskState subtaskState = subtaskStates.get(i);
-            if (subtaskState != null) {
-                result += subtaskState.getStateSize();
-            }
-        }
+    @Override
+    public void collectSizeStats(StateObjectSizeStatsCollector collector) {
+        streamSubtaskState().forEach(state -> 
state.collectSizeStats(collector));
+    }
 
-        return result;
+    private Stream<SubtaskState> streamSubtaskState() {
+        return subtaskStates.values().stream().filter(Objects::nonNull);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
index b393dae42da..35d45ae886a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.state.CompositeStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.runtime.state.StateUtil;
 import org.apache.flink.util.CollectionUtil;
 import org.apache.flink.util.FlinkRuntimeException;
@@ -38,6 +39,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
 import java.util.function.Function;
+import java.util.stream.Stream;
 
 import static 
org.apache.flink.runtime.checkpoint.InflightDataRescalingDescriptor.NO_RESCALE;
 
@@ -159,15 +161,16 @@ public class TaskStateSnapshot implements 
CompositeStateHandle {
 
     @Override
     public long getStateSize() {
-        long size = 0L;
+        return 
streamOperatorSubtaskStates().mapToLong(StateObject::getStateSize).sum();
+    }
 
-        for (OperatorSubtaskState subtaskState : 
subtaskStatesByOperatorID.values()) {
-            if (subtaskState != null) {
-                size += subtaskState.getStateSize();
-            }
-        }
+    @Override
+    public void collectSizeStats(StateObjectSizeStatsCollector collector) {
+        streamOperatorSubtaskStates().forEach(oss -> 
oss.collectSizeStats(collector));
+    }
 
-        return size;
+    private Stream<OperatorSubtaskState> streamOperatorSubtaskStates() {
+        return 
subtaskStatesByOperatorID.values().stream().filter(Objects::nonNull);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java
index ea811540d9d..250d77c9f6d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractChannelStateHandle.java
@@ -18,14 +18,14 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
-import java.util.Set;
+import java.util.stream.Stream;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -66,15 +66,12 @@ public abstract class AbstractChannelStateHandle<Info> 
implements StateObject {
         this.size = size;
     }
 
-    public static Set<StreamStateHandle> collectUniqueDelegates(
-            Collection<? extends AbstractChannelStateHandle<?>>... 
collections) {
-        Set<StreamStateHandle> result = new HashSet<>();
-        for (Collection<? extends AbstractChannelStateHandle<?>> collection : 
collections) {
-            for (AbstractChannelStateHandle<?> handle : collection) {
-                result.add(handle.getDelegate());
-            }
-        }
-        return result;
+    public static Stream<StreamStateHandle> collectUniqueDelegates(
+            Stream<StateObjectCollection<? extends 
AbstractChannelStateHandle<?>>> collections) {
+        return collections
+                .flatMap(Collection::stream)
+                .map(AbstractChannelStateHandle::getDelegate)
+                .distinct();
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
index 6625def309b..562f2e38d30 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/ChainedStateHandle.java
@@ -22,6 +22,8 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
+import java.util.stream.Stream;
 
 /** Handle to state handles for the operators in an operator chain. */
 public class ChainedStateHandle<T extends StateObject> implements StateObject {
@@ -85,18 +87,19 @@ public class ChainedStateHandle<T extends StateObject> 
implements StateObject {
 
     @Override
     public long getStateSize() {
-        long sumStateSize = 0;
+        return 
streamInternalHandles().mapToLong(StateObject::getStateSize).sum();
+    }
 
-        if (operatorStateHandles != null) {
-            for (T state : operatorStateHandles) {
-                if (state != null) {
-                    sumStateSize += state.getStateSize();
-                }
-            }
-        }
+    @Override
+    public void collectSizeStats(StateObjectSizeStatsCollector collector) {
+        streamInternalHandles().forEach(handle -> 
handle.collectSizeStats(collector));
+    }
 
-        // State size as sum of all state sizes
-        return sumStateSize;
+    private Stream<? extends T> streamInternalHandles() {
+        if (operatorStateHandles.isEmpty()) {
+            return Stream.empty();
+        }
+        return operatorStateHandles.stream().filter(Objects::nonNull);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
index 60049fde088..4e9ff682184 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/DirectoryStateHandle.java
@@ -38,12 +38,26 @@ public class DirectoryStateHandle implements StateObject {
     /** The path that describes the directory, as a string, to be 
serializable. */
     private final String directoryString;
 
+    /** (Optional) Size of the directory, used for metrics. Can be 0 if 
unknown or empty. */
+    private final long directorySize;
+
     /** Transient path cache, to avoid re-parsing the string. */
     private transient Path directory;
 
-    public DirectoryStateHandle(@Nonnull Path directory) {
+    public DirectoryStateHandle(@Nonnull Path directory, long directorySize) {
         this.directory = directory;
         this.directoryString = directory.toString();
+        this.directorySize = directorySize;
+    }
+
+    public static DirectoryStateHandle forPathWithSize(@Nonnull Path 
directory) {
+        long size;
+        try {
+            size = FileUtils.getDirectoryFilesSize(directory);
+        } catch (IOException e) {
+            size = 0L;
+        }
+        return new DirectoryStateHandle(directory, size);
     }
 
     @Override
@@ -54,9 +68,12 @@ public class DirectoryStateHandle implements StateObject {
 
     @Override
     public long getStateSize() {
-        // For now, we will not report any size, but in the future this could 
(if needed) return the
-        // total dir size.
-        return 0L; // unknown
+        return directorySize;
+    }
+
+    @Override
+    public void collectSizeStats(StateObjectSizeStatsCollector collector) {
+        collector.add(StateObjectLocation.LOCAL_DISK, directorySize);
     }
 
     @Nonnull
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
index ac457f00622..c683724ce1c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandle.java
@@ -94,6 +94,12 @@ public class IncrementalLocalKeyedStateHandle extends 
AbstractIncrementalStateHa
         return directoryStateHandle.getStateSize() + 
metaStateHandle.getStateSize();
     }
 
+    @Override
+    public void collectSizeStats(StateObjectSizeStatsCollector collector) {
+        metaStateHandle.collectSizeStats(collector);
+        directoryStateHandle.collectSizeStats(collector);
+    }
+
     @Override
     public String toString() {
         return "IncrementalLocalKeyedStateHandle{"
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
index 41b9a0466cc..2f5335b8853 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandle.java
@@ -27,8 +27,11 @@ import org.slf4j.LoggerFactory;
 import javax.annotation.Nonnull;
 
 import java.util.List;
+import java.util.Objects;
 import java.util.UUID;
+import java.util.function.Function;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 /**
  * The handle to states of an incremental snapshot.
@@ -235,17 +238,21 @@ public class IncrementalRemoteKeyedStateHandle extends 
AbstractIncrementalStateH
 
     @Override
     public long getStateSize() {
-        long size = StateUtil.getStateSize(metaStateHandle);
-
-        for (HandleAndLocalPath handleAndLocalPath : sharedState) {
-            size += handleAndLocalPath.getStateSize();
-        }
+        return streamSubHandles().mapToLong(StateObject::getStateSize).sum();
+    }
 
-        for (HandleAndLocalPath handleAndLocalPath : privateState) {
-            size += handleAndLocalPath.getStateSize();
-        }
+    @Override
+    public void collectSizeStats(StateObjectSizeStatsCollector collector) {
+        streamSubHandles().forEach(handle -> 
handle.collectSizeStats(collector));
+    }
 
-        return size;
+    private Stream<StreamStateHandle> streamSubHandles() {
+        return Stream.of(
+                        Stream.of(metaStateHandle),
+                        
sharedState.stream().map(HandleAndLocalPath::getHandle),
+                        
privateState.stream().map(HandleAndLocalPath::getHandle))
+                .flatMap(Function.identity())
+                .filter(Objects::nonNull);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
index 086fcbf8d73..94eedcf9642 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/KeyGroupsStateHandle.java
@@ -135,6 +135,17 @@ public class KeyGroupsStateHandle implements 
StreamStateHandle, KeyedStateHandle
         return stateHandle.getStateSize();
     }
 
+    @Override
+    public void collectSizeStats(StateObjectSizeStatsCollector collector) {
+        // TODO: for now this ignores that only some key groups might be 
accessed when reading the
+        //  state, so this only reports the upper bound. We could introduce
+        //  #collectSizeStats(StateObjectSizeStatsCollector, KeyGroupRange) in 
KeyedStateHandle
+        //  that computes which groups were actually touched and computes the 
size, depending on
+        //  the exact state handle type, from either the offsets (e.g. here) 
or for the full size
+        //  (e.g. remote incremental) when we restore from managed/raw keyed 
state.
+        stateHandle.collectSizeStats(collector);
+    }
+
     @Override
     public long getCheckpointedSize() {
         return getStateSize();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java
index 9f1af0cb8f7..90c1f20b28b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/OperatorStreamStateHandle.java
@@ -61,6 +61,11 @@ public class OperatorStreamStateHandle implements 
OperatorStateHandle {
         return delegateStateHandle.getStateSize();
     }
 
+    @Override
+    public void collectSizeStats(StateObjectSizeStatsCollector collector) {
+        delegateStateHandle.collectSizeStats(collector);
+    }
+
     @Override
     public FSDataInputStream openInputStream() throws IOException {
         return delegateStateHandle.openInputStream();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
index 2370a86a70f..e807b93dd63 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/RetrievableStreamStateHandle.java
@@ -86,6 +86,11 @@ public class RetrievableStreamStateHandle<T extends 
Serializable>
         return wrappedStreamStateHandle.getStateSize();
     }
 
+    @Override
+    public void collectSizeStats(StateObjectSizeStatsCollector collector) {
+        wrappedStreamStateHandle.collectSizeStats(collector);
+    }
+
     @Override
     public void close() throws IOException {
         //             wrappedStreamStateHandle.close();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
index 06b0be03c5a..bef4a153cf5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotDirectory.java
@@ -172,7 +172,7 @@ public abstract class SnapshotDirectory {
 
     private static class PermanentSnapshotDirectory extends SnapshotDirectory {
 
-        PermanentSnapshotDirectory(@Nonnull Path directory) throws IOException 
{
+        PermanentSnapshotDirectory(@Nonnull Path directory) {
             super(directory);
         }
 
@@ -180,7 +180,7 @@ public abstract class SnapshotDirectory {
         public DirectoryStateHandle completeSnapshotAndGetHandle() throws 
IOException {
             if (State.COMPLETED == state.get()
                     || state.compareAndSet(State.ONGOING, State.COMPLETED)) {
-                return new DirectoryStateHandle(directory);
+                return DirectoryStateHandle.forPathWithSize(directory);
             } else {
                 throw new IOException(
                         "Expected state " + State.ONGOING + " but found state 
" + state.get());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
index 6c5c7feae06..ab35aedc756 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/ChangelogStateBackendHandle.java
@@ -365,6 +365,11 @@ public interface ChangelogStateBackendHandle
                 return keyedStateHandle.getStateSize();
             }
 
+            @Override
+            public void collectSizeStats(StateObjectSizeStatsCollector 
collector) {
+                keyedStateHandle.collectSizeStats(collector);
+            }
+
             @Override
             public FSDataInputStream openInputStream() throws IOException {
                 throw new UnsupportedOperationException("Should not call 
here.");
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryChangelogStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryChangelogStateHandle.java
index bf72b073654..60ada672f2b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryChangelogStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/changelog/inmemory/InMemoryChangelogStateHandle.java
@@ -88,6 +88,11 @@ public class InMemoryChangelogStateHandle implements 
ChangelogStateHandle {
         return getStateSize();
     }
 
+    @Override
+    public void collectSizeStats(StateObjectSizeStatsCollector collector) {
+        collector.add(StateObjectLocation.LOCAL_MEMORY, getStateSize());
+    }
+
     public List<StateChange> getChanges() {
         return Collections.unmodifiableList(changes);
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index 35ba2865eea..4d3fe238fcc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.state.StreamStateHandle;
 
 import java.io.IOException;
 import java.util.Optional;
+import java.util.regex.Pattern;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -38,6 +39,8 @@ public class FileStateHandle implements StreamStateHandle {
 
     private static final long serialVersionUID = 350284443258002355L;
 
+    private static final Pattern NOT_LOCAL_FILER = 
Pattern.compile("(?!file\\b)\\w+?://.*");
+
     /** The path to the file in the filesystem, fully describing the file 
system. */
     private final Path filePath;
 
@@ -119,6 +122,17 @@ public class FileStateHandle implements StreamStateHandle {
         return stateSize;
     }
 
+    @Override
+    public void collectSizeStats(StateObjectSizeStatsCollector collector) {
+        final StateObjectLocation location;
+        if (NOT_LOCAL_FILER.matcher(filePath.toUri().toString()).matches()) {
+            location = StateObjectLocation.REMOTE;
+        } else {
+            location = StateObjectLocation.LOCAL_DISK;
+        }
+        collector.add(location, getStateSize());
+    }
+
     /**
      * Gets the file system that stores the file state.
      *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
index 83f4fe54d25..9e32bc623ff 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandle.java
@@ -78,6 +78,11 @@ public class ByteStreamStateHandle implements StreamStateHandle {
         return data.length;
     }
 
+    @Override
+    public void collectSizeStats(StateObjectSizeStatsCollector collector) {
+        collector.add(StateObjectLocation.LOCAL_MEMORY, getStateSize());
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandleTest.java
new file mode 100644
index 00000000000..ec4ba8f7e62
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalLocalKeyedStateHandleTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
+import org.apache.flink.util.Preconditions;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.EnumMap;
+import java.util.UUID;
+
+/** Unit tests for {@link IncrementalLocalKeyedStateHandle}. */
+public class IncrementalLocalKeyedStateHandleTest {
+
+    @Test
+    public void testDirectorySize(@TempDir Path directory) throws IOException {
+        final int metaDataBytes = 42;
+        ByteStreamStateHandle metaDataStateHandle =
+                new ByteStreamStateHandle("MetaDataTest", new byte[metaDataBytes]);
+
+        final int fileOneBytes = 1024;
+        File outputFile = new File(directory.toFile(), "out.001");
+        try (FileOutputStream outputStream = new FileOutputStream(outputFile)) {
+            outputStream.write(new byte[fileOneBytes]);
+        }
+
+        File subPath = new File(directory.toFile(), "subdir");
+        Preconditions.checkState(subPath.mkdirs());
+        final int fileTwoBytes = 128;
+        outputFile = new File(subPath, "out.002");
+        try (FileOutputStream outputStream = new FileOutputStream(outputFile)) {
+            outputStream.write(new byte[fileTwoBytes]);
+        }
+
+        DirectoryStateHandle directoryStateHandle = DirectoryStateHandle.forPathWithSize(directory);
+        IncrementalLocalKeyedStateHandle handle =
+                new IncrementalLocalKeyedStateHandle(
+                        UUID.randomUUID(),
+                        0,
+                        directoryStateHandle,
+                        new KeyGroupRange(0, 1),
+                        metaDataStateHandle,
+                        Collections.emptyList());
+
+        StateObject.StateObjectSizeStatsCollector stats =
+                StateObject.StateObjectSizeStatsCollector.create();
+        handle.collectSizeStats(stats);
+        Assertions.assertEquals(
+                metaDataBytes + fileOneBytes + fileTwoBytes, extractLocalStateSizes(stats));
+    }
+
+    private long extractLocalStateSizes(StateObject.StateObjectSizeStatsCollector stats) {
+        EnumMap<StateObject.StateObjectLocation, Long> statsMap = stats.getStats();
+        Assertions.assertFalse(statsMap.containsKey(StateObject.StateObjectLocation.REMOTE));
+        Assertions.assertFalse(statsMap.containsKey(StateObject.StateObjectLocation.UNKNOWN));
+        // Might be in a byte handle or a file.
+        return statsMap.get(StateObject.StateObjectLocation.LOCAL_DISK)
+                + statsMap.get(StateObject.StateObjectLocation.LOCAL_MEMORY);
+    }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
index 1422bc8bcf0..ad24e1da52f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalRemoteKeyedStateHandleTest.java
@@ -23,10 +23,12 @@ import org.apache.flink.runtime.state.IncrementalKeyedStateHandle.HandleAndLocal
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 import org.apache.flink.util.TernaryBoolean;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.nio.charset.StandardCharsets;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
@@ -235,6 +237,23 @@ class IncrementalRemoteKeyedStateHandleTest {
         assertThat(newHandle.getStateHandleId()).isEqualTo(handle.getStateHandleId());
     }
 
+    @Test
+    void testCollectSizeStats() {
+        IncrementalRemoteKeyedStateHandle handle = create(ThreadLocalRandom.current());
+        StateObject.StateObjectSizeStatsCollector statsCollector =
+                StateObject.StateObjectSizeStatsCollector.create();
+        handle.collectSizeStats(statsCollector);
+        Assertions.assertEquals(
+                new HashMap<StateObject.StateObjectLocation, Long>() {
+                    {
+                        // Location is LOCAL_MEMORY, even though the handle is called remote because
+                        // we test against a local file system
+                        put(StateObject.StateObjectLocation.LOCAL_MEMORY, handle.getStateSize());
+                    }
+                },
+                statsCollector.getStats());
+    }
+
     @Test
     void testConcurrentCheckpointSharedStateRegistration() throws Exception {
         String localPath = "1.sst";
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupsStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupsStateHandleTest.java
index c933e7b4b22..0cbe6097da3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupsStateHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupsStateHandleTest.java
@@ -20,8 +20,11 @@ package org.apache.flink.runtime.state;
 
 import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.HashMap;
+
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** A test for {@link KeyGroupsStateHandle} */
@@ -52,4 +55,22 @@ class KeyGroupsStateHandleTest {
         KeyGroupRange newRange = new KeyGroupRange(8, 11);
         assertThat(handle.getIntersection(newRange)).isNull();
     }
+
+    @Test
+    void testCollectSizeStats() {
+        final KeyGroupRangeOffsets offsets = new KeyGroupRangeOffsets(0, 7);
+        final byte[] data = new byte[5];
+        final ByteStreamStateHandle innerHandle = new ByteStreamStateHandle("name", data);
+        KeyGroupsStateHandle handle = new KeyGroupsStateHandle(offsets, innerHandle);
+        StateObject.StateObjectSizeStatsCollector statsCollector =
+                StateObject.StateObjectSizeStatsCollector.create();
+        handle.collectSizeStats(statsCollector);
+        Assertions.assertEquals(
+                new HashMap<StateObject.StateObjectLocation, Long>() {
+                    {
+                        put(StateObject.StateObjectLocation.LOCAL_MEMORY, (long) data.length);
+                    }
+                },
+                statsCollector.getStats());
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileStateHandleTest.java
index 0a879011baa..346874508c9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileStateHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FileStateHandleTest.java
@@ -24,10 +24,12 @@ import org.apache.flink.core.fs.FileSystemFactory;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.local.LocalFileSystem;
 import org.apache.flink.core.plugin.TestingPluginManager;
+import org.apache.flink.runtime.state.StateObject;
 import org.apache.flink.util.function.BiFunctionWithException;
 import org.apache.flink.util.function.FunctionWithException;
 import org.apache.flink.util.function.RunnableWithException;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
@@ -152,6 +154,41 @@ class FileStateHandleTest {
                         .build());
     }
 
+    @Test
+    void testCollectSizeStats() throws Exception {
+        final long stateSize = 123L;
+
+        StateObject.StateObjectSizeStatsCollector statsCollector =
+                StateObject.StateObjectSizeStatsCollector.create();
+        FileStateHandle handle =
+                new FileStateHandle(new Path(new URI("file:///home/test.txt")), stateSize);
+        handle.collectSizeStats(statsCollector);
+        checkStats(statsCollector, StateObject.StateObjectLocation.LOCAL_DISK, stateSize);
+
+        statsCollector = StateObject.StateObjectSizeStatsCollector.create();
+        handle = new FileStateHandle(new Path(new URI("/home/test.txt")), stateSize);
+        handle.collectSizeStats(statsCollector);
+        checkStats(statsCollector, StateObject.StateObjectLocation.LOCAL_DISK, stateSize);
+
+        statsCollector = StateObject.StateObjectSizeStatsCollector.create();
+        handle = new FileStateHandle(new Path(new URI("s3:///folder/test.txt")), stateSize);
+        handle.collectSizeStats(statsCollector);
+        checkStats(statsCollector, StateObject.StateObjectLocation.REMOTE, stateSize);
+    }
+
+    private void checkStats(
+            StateObject.StateObjectSizeStatsCollector statsCollector,
+            StateObject.StateObjectLocation expectedLocation,
+            long expectedSize) {
+        Assertions.assertEquals(
+                new HashMap<StateObject.StateObjectLocation, Long>() {
+                    {
+                        put(expectedLocation, expectedSize);
+                    }
+                },
+                statsCollector.getStats());
+    }
+
     private static void testDiscardStateFailed(FileSystem fileSystem) throws Exception {
         runInFileSystemContext(
                 fileSystem,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandleTest.java
index c8df2275215..1ea6fc89847 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/memory/ByteStreamStateHandleTest.java
@@ -19,10 +19,13 @@
 package org.apache.flink.runtime.state.memory;
 
 import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.runtime.state.StateObject;
 
+import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
 import java.io.IOException;
+import java.util.HashMap;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -143,4 +146,20 @@ class ByteStreamStateHandleTest {
             assertThat(in.read()).isEqualTo(-1); // got -1 because of EOF.
         }
     }
+
+    @Test
+    void testCollectSizeStats() {
+        final byte[] data = new byte[5];
+        final ByteStreamStateHandle handle = new ByteStreamStateHandle("name", data);
+        StateObject.StateObjectSizeStatsCollector statsCollector =
+                StateObject.StateObjectSizeStatsCollector.create();
+        handle.collectSizeStats(statsCollector);
+        Assertions.assertEquals(
+                new HashMap<StateObject.StateObjectLocation, Long>() {
+                    {
+                        put(StateObject.StateObjectLocation.LOCAL_MEMORY, (long) data.length);
+                    }
+                },
+                statsCollector.getStats());
+    }
 }
diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/StateHandleDownloadSpec.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/StateHandleDownloadSpec.java
index 5f37f84921f..7f890230d74 100644
--- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/StateHandleDownloadSpec.java
+++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/StateHandleDownloadSpec.java
@@ -53,7 +53,7 @@ public class StateHandleDownloadSpec {
         return new IncrementalLocalKeyedStateHandle(
                 stateHandle.getBackendIdentifier(),
                 stateHandle.getCheckpointId(),
-                new DirectoryStateHandle(downloadDestination),
+                new DirectoryStateHandle(downloadDestination, stateHandle.getStateSize()),
                 stateHandle.getKeyGroupRange(),
                 stateHandle.getMetaDataStateHandle(),
                 stateHandle.getSharedState());


Reply via email to