This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 32b563dd29c5b08e4574773815c014c1c0cca12e
Author: Yuxin Tan <tanyuxinw...@gmail.com>
AuthorDate: Thu Jun 6 11:48:17 2024 +0800

    [FLINK-35533][runtime] Refactor the APIs and implementations for supporting remote tier plugin
---
 .../io/network/NettyShuffleServiceFactory.java     |  2 +-
 .../network/partition/ResultPartitionFactory.java  | 35 ++++++++--
 .../partition/consumer/SingleInputGate.java        |  3 +
 .../partition/consumer/SingleInputGateFactory.java | 29 ++++++++
 .../tiered/common/TieredStorageConfiguration.java  | 48 +------------
 .../hybrid/tiered/common/TieredStorageUtils.java   | 15 ++++
 .../shuffle/TieredInternalShuffleMaster.java       | 69 +++++++++++++++++--
 .../shuffle/TieredResultPartitionFactory.java      | 27 ++++----
 .../storage/TieredStorageConsumerClient.java       | 65 ++++++++++++++++--
 .../tiered/storage/TieredStorageConsumerSpec.java  |  8 +++
 .../tiered/storage/TieredStorageMasterClient.java  | 44 ++++++++++--
 .../tiered/storage/TieredStorageMemoryManager.java |  7 ++
 .../storage/TieredStorageMemoryManagerImpl.java    |  5 ++
 .../hybrid/tiered/tier/NoOpMasterAgent.java        | 24 +++++--
 .../tiered/tier/NoOpTierShuffleDescriptor.java}    | 14 ++--
 .../hybrid/tiered/tier/TierConsumerAgent.java      | 12 +++-
 .../partition/hybrid/tiered/tier/TierFactory.java  | 15 +++-
 .../hybrid/tiered/tier/TierMasterAgent.java        | 25 ++++---
 ...oOpMasterAgent.java => TierShuffleHandler.java} | 28 ++++----
 .../tiered/tier/UnknownTierShuffleDescriptor.java} | 18 ++---
 .../tiered/tier/disk/DiskTierConsumerAgent.java    |  6 ++
 .../hybrid/tiered/tier/disk/DiskTierFactory.java   | 21 ++++++
 .../tiered/tier/disk/DiskTierProducerAgent.java    |  3 +-
 .../tier/memory/MemoryTierConsumerAgent.java       |  6 ++
 .../tiered/tier/memory/MemoryTierFactory.java      | 22 ++++++
 .../tier/memory/MemoryTierProducerAgent.java       |  8 ++-
 .../tier/remote/RemoteTierConsumerAgent.java       |  6 ++
 .../tiered/tier/remote/RemoteTierFactory.java      | 22 ++++++
 .../tiered/tier/remote/RemoteTierMasterAgent.java  | 36 ++++++++--
 .../tier/remote/RemoteTierProducerAgent.java       |  3 +-
 .../flink/runtime/shuffle/NettyShuffleMaster.java  | 23 ++++---
 .../network/partition/ResultPartitionBuilder.java  | 28 ++++++++
 .../tiered/TestingTieredStorageMemoryManager.java  | 18 +++++
 .../tiered/netty/TestingTierConsumerAgent.java     | 20 ++++++
 .../netty/TieredStorageConsumerClientTest.java     |  7 ++
 .../tiered/shuffle/TieredResultPartitionTest.java  | 10 +++
 .../hybrid/tiered/storage/TestingTierFactory.java  | 79 +++++++++++++++++++++-
 .../tier/remote/RemoteTierConsumerAgentTest.java   |  2 +
 .../tier/remote/RemoteTierMasterAgentTest.java     | 10 ++-
 .../adaptivebatch/BatchJobRecoveryTest.java        |  4 +-
 .../flink/runtime/shuffle/ShuffleMasterTest.java   |  2 +-
 .../flink/runtime/shuffle/ShuffleTestUtils.java    |  3 +-
 42 files changed, 681 insertions(+), 151 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
index 7a42b3487d7..f2e65d7317c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java
@@ -68,7 +68,7 @@ public class NettyShuffleServiceFactory
 
     @Override
     public NettyShuffleMaster createShuffleMaster(ShuffleMasterContext 
shuffleMasterContext) {
-        return new NettyShuffleMaster(shuffleMasterContext.getConfiguration());
+        return new NettyShuffleMaster(shuffleMasterContext);
     }
 
     @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 20f33f311ba..fa100455708 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -32,7 +32,11 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.HsResultPartition;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartitionFactory;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.ProcessorArchitecture;
@@ -46,6 +50,9 @@ import java.io.IOException;
 import java.util.Optional;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.apache.flink.util.Preconditions.checkState;
+
 /** Factory for {@link ResultPartition} to use in {@link 
NettyShuffleEnvironment}. */
 public class ResultPartitionFactory {
 
@@ -145,9 +152,11 @@ public class ResultPartitionFactory {
                 partitionIndex,
                 desc.getShuffleDescriptor().getResultPartitionID(),
                 desc.getPartitionType(),
+                desc.getTotalNumberOfPartitions(),
                 desc.getNumberOfSubpartitions(),
                 desc.getMaxParallelism(),
                 desc.isBroadcast(),
+                desc.getShuffleDescriptor(),
                 createBufferPoolFactory(desc.getNumberOfSubpartitions(), 
desc.getPartitionType()),
                 desc.isNumberOfPartitionConsumerUndefined());
     }
@@ -158,15 +167,22 @@ public class ResultPartitionFactory {
             int partitionIndex,
             ResultPartitionID id,
             ResultPartitionType type,
+            int numberOfPartitions,
             int numberOfSubpartitions,
             int maxParallelism,
             boolean isBroadcast,
+            ShuffleDescriptor shuffleDescriptor,
             SupplierWithException<BufferPool, IOException> bufferPoolFactory,
             boolean isNumberOfPartitionConsumerUndefined) {
         BufferCompressor bufferCompressor = null;
         if (type.supportCompression() && batchShuffleCompressionEnabled) {
             bufferCompressor = new BufferCompressor(networkBufferSize, 
compressionCodec);
         }
+        if (tieredStorage.isPresent() && type == ResultPartitionType.BLOCKING) 
{
+            LOG.warn(
+                    "When enabling tiered storage, the BLOCKING result 
partition will be replaced as HYBRID_FULL.");
+            type = ResultPartitionType.HYBRID_FULL;
+        }
 
         ResultSubpartition[] subpartitions = new 
ResultSubpartition[numberOfSubpartitions];
 
@@ -241,6 +257,7 @@ public class ResultPartitionFactory {
             }
         } else if (type == ResultPartitionType.HYBRID_FULL
                 || type == ResultPartitionType.HYBRID_SELECTIVE) {
+            checkState(shuffleDescriptor instanceof NettyShuffleDescriptor);
             if (tieredStorage.isPresent()) {
                 partition =
                         tieredStorage
@@ -250,6 +267,7 @@ public class ResultPartitionFactory {
                                         partitionIndex,
                                         id,
                                         type,
+                                        numberOfPartitions,
                                         subpartitions.length,
                                         maxParallelism,
                                         networkBufferSize,
@@ -257,6 +275,9 @@ public class ResultPartitionFactory {
                                         memoryDecouplingEnabled,
                                         partitionManager,
                                         bufferCompressor,
+                                        checkNotNull(
+                                                ((NettyShuffleDescriptor) 
shuffleDescriptor)
+                                                        
.getTierShuffleDescriptors()),
                                         bufferPoolFactory,
                                         channelManager,
                                         batchShuffleReadBufferPool,
@@ -378,10 +399,7 @@ public class ResultPartitionFactory {
                             tieredStorage.isPresent(),
                             memoryDecouplingEnabled,
                             tieredStorage
-                                    .map(
-                                            storage ->
-                                                    
storage.getTieredStorageConfiguration()
-                                                            
.getTotalExclusiveBufferNum())
+                                    
.map(ResultPartitionFactory::getNumTotalGuaranteedBuffers)
                                     .orElse(0),
                             
TieredStorageUtils.getMinBuffersPerResultPartition(),
                             type);
@@ -407,4 +425,13 @@ public class ResultPartitionFactory {
                 return BoundedBlockingSubpartitionType.FILE;
         }
     }
+
+    private static int getNumTotalGuaranteedBuffers(
+            TieredResultPartitionFactory resultPartitionFactory) {
+        return 
resultPartitionFactory.getTieredStorageConfiguration().getTierFactories().stream()
+                .map(TierFactory::getProducerAgentMemorySpec)
+                .map(TieredStorageMemorySpec::getNumGuaranteedBuffers)
+                .mapToInt(Integer::intValue)
+                .sum();
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index a4346d62eb1..53c9d309cba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -312,6 +312,9 @@ public class SingleInputGate extends IndexedInputGate {
 
         BufferPool bufferPool = bufferPoolFactory.get();
         setBufferPool(bufferPool);
+        if (tieredStorageConsumerClient != null) {
+            tieredStorageConsumerClient.setup(bufferPool);
+        }
 
         setupChannels();
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
index 9ab3f367557..72b49b0a6e3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java
@@ -42,6 +42,8 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyServiceImpl;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerClient;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.UnknownTierShuffleDescriptor;
 import org.apache.flink.runtime.metrics.MetricNames;
 import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.NettyShuffleUtils;
@@ -68,6 +70,7 @@ import java.util.Optional;
 
 import static 
org.apache.flink.runtime.io.network.partition.consumer.InputGateSpecUtils.createGateBuffersSpec;
 import static 
org.apache.flink.runtime.shuffle.ShuffleUtils.applyWithShuffleTypeCheck;
+import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** Factory for {@link SingleInputGate} to use in {@link 
NettyShuffleEnvironment}. */
 public class SingleInputGateFactory {
@@ -259,6 +262,7 @@ public class SingleInputGateFactory {
 
         int channelIdx = 0;
         final List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs = new 
ArrayList<>();
+        List<List<TierShuffleDescriptor>> tierShuffleDescriptors = new 
ArrayList<>();
         for (ShuffleDescriptor descriptor : shuffleDescriptors) {
             TieredStoragePartitionId partitionId =
                     
TieredStorageIdMappingUtils.convertId(descriptor.getResultPartitionID());
@@ -273,8 +277,11 @@ public class SingleInputGateFactory {
                                 channelStatistics,
                                 metrics);
                 if (tieredStorageConfiguration != null) {
+                    addTierShuffleDescriptors(tierShuffleDescriptors, 
descriptor);
+
                     tieredStorageConsumerSpecs.add(
                             new TieredStorageConsumerSpec(
+                                    inputGate.getInputGateIndex(),
                                     partitionId,
                                     new 
TieredStorageInputChannelId(channelIdx),
                                     subpartitionIndexSet));
@@ -292,8 +299,11 @@ public class SingleInputGateFactory {
                                     channelStatistics,
                                     metrics);
                     if (tieredStorageConfiguration != null) {
+                        addTierShuffleDescriptors(tierShuffleDescriptors, 
descriptor);
+
                         tieredStorageConsumerSpecs.add(
                                 new TieredStorageConsumerSpec(
+                                        inputGate.getInputGateIndex(),
                                         partitionId,
                                         new 
TieredStorageInputChannelId(channelIdx),
                                         new 
ResultSubpartitionIndexSet(subpartitionIndex)));
@@ -310,6 +320,7 @@ public class SingleInputGateFactory {
                     new TieredStorageConsumerClient(
                             tieredStorageConfiguration.getTierFactories(),
                             tieredStorageConsumerSpecs,
+                            tierShuffleDescriptors,
                             tieredStorageNettyService);
             inputGate.setTieredStorageService(
                     tieredStorageConsumerSpecs,
@@ -439,6 +450,24 @@ public class SingleInputGateFactory {
         return inputChannelDescriptor.isLocalTo(taskExecutorResourceId);
     }
 
+    private void addTierShuffleDescriptors(
+            List<List<TierShuffleDescriptor>> tierShuffleDescriptors,
+            ShuffleDescriptor descriptor) {
+        if (descriptor instanceof NettyShuffleDescriptor) {
+            tierShuffleDescriptors.add(
+                    ((NettyShuffleDescriptor) 
descriptor).getTierShuffleDescriptors());
+        } else if (descriptor.isUnknown()) {
+            List<TierShuffleDescriptor> unknownDescriptors = new ArrayList<>();
+            int numTiers = 
checkNotNull(tieredStorageConfiguration).getTierFactories().size();
+            for (int i = 0; i < numTiers; i++) {
+                unknownDescriptors.add(UnknownTierShuffleDescriptor.INSTANCE);
+            }
+            tierShuffleDescriptors.add(unknownDescriptors);
+        } else {
+            throw new IllegalArgumentException("Unsupported shuffle descriptor 
type " + descriptor);
+        }
+    }
+
     @VisibleForTesting
     static SupplierWithException<BufferPool, IOException> 
createBufferPoolFactory(
             BufferPoolFactory bufferPoolFactory,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
index cbbe1582177..cb475ae31ee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java
@@ -19,70 +19,26 @@
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common;
 
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TierFactoryInitializer;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
 
-import java.util.ArrayList;
 import java.util.List;
 
 /** Configurations for the Tiered Storage. */
 public class TieredStorageConfiguration {
 
-    private static final int DEFAULT_MEMORY_TIER_EXCLUSIVE_BUFFERS = 100;
-
-    private static final int DEFAULT_DISK_TIER_EXCLUSIVE_BUFFERS = 1;
-
-    private static final int DEFAULT_REMOTE_TIER_EXCLUSIVE_BUFFERS = 1;
-
     private final List<TierFactory> tierFactories;
 
-    private final List<Integer> tierExclusiveBuffers;
-
     private TieredStorageConfiguration(List<TierFactory> tierFactories) {
         this.tierFactories = tierFactories;
-        this.tierExclusiveBuffers = new ArrayList<>();
-    }
-
-    /**
-     * Get the total exclusive buffer number.
-     *
-     * @return the total exclusive buffer number.
-     */
-    public int getTotalExclusiveBufferNum() {
-        return tierExclusiveBuffers.stream().mapToInt(Integer::intValue).sum();
-    }
-
-    /**
-     * Get exclusive buffer number of each tier.
-     *
-     * @return buffer number of each tier.
-     */
-    public List<Integer> getEachTierExclusiveBufferNum() {
-        return tierExclusiveBuffers;
     }
 
     public List<TierFactory> getTierFactories() {
         return tierFactories;
     }
 
-    private void setupTierFactoriesAndExclusiveBuffers(Configuration 
configuration) {
-        String remoteStorageBasePath =
-                configuration.get(
-                        NettyShuffleEnvironmentOptions
-                                
.NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH);
-        tierExclusiveBuffers.add(DEFAULT_MEMORY_TIER_EXCLUSIVE_BUFFERS);
-        tierExclusiveBuffers.add(DEFAULT_DISK_TIER_EXCLUSIVE_BUFFERS);
-        if (remoteStorageBasePath != null) {
-            tierExclusiveBuffers.add(DEFAULT_REMOTE_TIER_EXCLUSIVE_BUFFERS);
-        }
-    }
-
     public static TieredStorageConfiguration fromConfiguration(Configuration 
configuration) {
-        TieredStorageConfiguration tieredStorageConfiguration =
-                new TieredStorageConfiguration(
-                        
TierFactoryInitializer.initializeTierFactories(configuration));
-        
tieredStorageConfiguration.setupTierFactoriesAndExclusiveBuffers(configuration);
-        return tieredStorageConfiguration;
+        return new TieredStorageConfiguration(
+                TierFactoryInitializer.initializeTierFactories(configuration));
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
index 9391a1be6a8..8565daf14d2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java
@@ -25,6 +25,9 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.Buffe
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulator;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.SortBufferAccumulator;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskTierFactory;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory.MemoryTierFactory;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote.RemoteTierFactory;
 
 import java.nio.ByteBuffer;
 import java.util.List;
@@ -94,6 +97,18 @@ public class TieredStorageUtils {
         return DEFAULT_MIN_BUFFERS_PER_RESULT_PARTITION;
     }
 
+    public static String getMemoryTierName() {
+        return MemoryTierFactory.class.getSimpleName();
+    }
+
+    public static String getDiskTierName() {
+        return DiskTierFactory.class.getSimpleName();
+    }
+
+    public static String getRemoteTierName() {
+        return RemoteTierFactory.class.getSimpleName();
+    }
+
     public static ByteBuffer[] generateBufferWithHeaders(
             List<Tuple2<Buffer, Integer>> bufferWithIndexes) {
         ByteBuffer[] bufferWithHeaders = new ByteBuffer[2 * 
bufferWithIndexes.size()];
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java
index 37b13b95f30..01cc5468692 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java
@@ -18,18 +18,26 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMasterClient;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleHandler;
+import org.apache.flink.runtime.shuffle.JobShuffleContext;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMasterContext;
 
+import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.CompletableFuture;
 import java.util.stream.Collectors;
 
-import static 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils.convertId;
-
 /**
  * A wrapper internal shuffle master class for tiered storage. All the tiered 
storage operations
  * with the shuffle master should be wrapped in this class.
@@ -38,7 +46,11 @@ public class TieredInternalShuffleMaster {
 
     private final TieredStorageMasterClient tieredStorageMasterClient;
 
-    public TieredInternalShuffleMaster(Configuration conf) {
+    private final ShuffleMasterContext shuffleMasterContext;
+
+    public TieredInternalShuffleMaster(ShuffleMasterContext 
shuffleMasterContext) {
+        this.shuffleMasterContext = shuffleMasterContext;
+        Configuration conf = shuffleMasterContext.getConfiguration();
         TieredStorageConfiguration tieredStorageConfiguration =
                 TieredStorageConfiguration.fromConfiguration(conf);
         TieredStorageResourceRegistry resourceRegistry = new 
TieredStorageResourceRegistry();
@@ -49,11 +61,54 @@ public class TieredInternalShuffleMaster {
         this.tieredStorageMasterClient = new 
TieredStorageMasterClient(tierFactories);
     }
 
-    public void addPartition(ResultPartitionID resultPartitionID) {
-        tieredStorageMasterClient.addPartition(convertId(resultPartitionID));
+    /**
+     * Registers the target job together with the corresponding {@link 
JobShuffleContext} to this
+     * shuffle master.
+     */
+    public void registerJob(JobShuffleContext context) {
+        tieredStorageMasterClient.registerJob(context.getJobId(), 
getTierShuffleHandler(context));
+    }
+
+    /**
+     * Unregisters the target job from this shuffle master, which means the 
corresponding job has
+     * reached a global termination state and all the allocated resources 
except for the cluster
+     * partitions can be cleared.
+     *
+     * @param jobID ID of the target job to be unregistered.
+     */
+    public void unregisterJob(JobID jobID) {
+        tieredStorageMasterClient.unregisterJob(jobID);
+    }
+
+    public List<TierShuffleDescriptor> addPartitionAndGetShuffleDescriptor(
+            JobID jobID, ResultPartitionID resultPartitionID) {
+        return tieredStorageMasterClient.addPartitionAndGetShuffleDescriptor(
+                jobID, resultPartitionID);
     }
 
-    public void releasePartition(ResultPartitionID resultPartitionID) {
-        
tieredStorageMasterClient.releasePartition(convertId(resultPartitionID));
+    public void releasePartition(ShuffleDescriptor shuffleDescriptor) {
+        tieredStorageMasterClient.releasePartition(shuffleDescriptor);
+    }
+
+    public void close() {
+        tieredStorageMasterClient.close();
+    }
+
+    private TierShuffleHandler getTierShuffleHandler(JobShuffleContext 
context) {
+        return new TierShuffleHandler() {
+            @Override
+            public CompletableFuture<?> onReleasePartitions(
+                    Collection<TieredStoragePartitionId> partitionIds) {
+                return context.stopTrackingAndReleasePartitions(
+                        partitionIds.stream()
+                                .map(TieredStorageIdMappingUtils::convertId)
+                                .collect(Collectors.toList()));
+            }
+
+            @Override
+            public void onFatalError(Throwable throwable) {
+                shuffleMasterContext.onFatalError(throwable);
+            }
+        };
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java
index 7f0ebf125b9..8a788fe9258 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java
@@ -41,6 +41,7 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.Tiere
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory.MemoryTierFactory;
 import org.apache.flink.util.function.SupplierWithException;
 
@@ -48,9 +49,12 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /** {@link TieredResultPartitionFactory} contains the components to set up 
tiered storage. */
 public class TieredResultPartitionFactory {
 
@@ -84,6 +88,7 @@ public class TieredResultPartitionFactory {
             int partitionIndex,
             ResultPartitionID partitionId,
             ResultPartitionType partitionType,
+            int numPartitions,
             int numSubpartitions,
             int maxParallelism,
             int bufferSizeBytes,
@@ -91,6 +96,7 @@ public class TieredResultPartitionFactory {
             boolean memoryDecouplingEnabled,
             ResultPartitionManager partitionManager,
             @Nullable BufferCompressor bufferCompressor,
+            List<TierShuffleDescriptor> tierShuffleDescriptors,
             SupplierWithException<BufferPool, IOException> bufferPoolFactory,
             FileChannelManager fileChannelManager,
             BatchShuffleReadBufferPool batchShuffleReadBufferPool,
@@ -117,12 +123,14 @@ public class TieredResultPartitionFactory {
         Tuple2<List<TierProducerAgent>, List<TieredStorageMemorySpec>>
                 producerAgentsAndMemorySpecs =
                         createTierProducerAgentsAndMemorySpecs(
+                                numPartitions,
                                 numSubpartitions,
                                 isBroadCastOnly,
                                 
TieredStorageIdMappingUtils.convertId(partitionId),
                                 memoryManager,
                                 bufferAccumulator,
                                 partitionType == 
ResultPartitionType.HYBRID_SELECTIVE,
+                                tierShuffleDescriptors,
                                 fileChannelManager,
                                 batchShuffleReadBufferPool,
                                 batchShuffleReadIOExecutor);
@@ -186,12 +194,14 @@ public class TieredResultPartitionFactory {
 
     private Tuple2<List<TierProducerAgent>, List<TieredStorageMemorySpec>>
             createTierProducerAgentsAndMemorySpecs(
+                    int numberOfPartitions,
                     int numberOfSubpartitions,
                     boolean isBroadcastOnly,
                     TieredStoragePartitionId partitionID,
                     TieredStorageMemoryManager memoryManager,
                     BufferAccumulator bufferAccumulator,
                     boolean isHybridSelective,
+                    List<TierShuffleDescriptor> tierShuffleDescriptors,
                     FileChannelManager fileChannelManager,
                     BatchShuffleReadBufferPool batchShuffleReadBufferPool,
                     ScheduledExecutorService batchShuffleReadIOExecutor) {
@@ -209,10 +219,10 @@ public class TieredResultPartitionFactory {
                                         numberOfSubpartitions + 1,
                                         
TieredStorageUtils.getAccumulatorExclusiveBuffers()),
                         true));
-        List<Integer> tierExclusiveBuffers =
-                tieredStorageConfiguration.getEachTierExclusiveBufferNum();
 
         List<TierFactory> tierFactories = 
tieredStorageConfiguration.getTierFactories();
+        checkState(tierFactories.size() == tierShuffleDescriptors.size());
+
         for (int index = 0; index < tierFactories.size(); ++index) {
             TierFactory tierFactory = tierFactories.get(index);
             if (!isHybridSelective && tierFactory.getClass() == 
MemoryTierFactory.class) {
@@ -220,6 +230,7 @@ public class TieredResultPartitionFactory {
             }
             TierProducerAgent producerAgent =
                     tierFactory.createProducerAgent(
+                            numberOfPartitions,
                             numberOfSubpartitions,
                             partitionID,
                             fileChannelManager.createChannel().getPath(),
@@ -229,20 +240,12 @@ public class TieredResultPartitionFactory {
                             tieredStorageResourceRegistry,
                             batchShuffleReadBufferPool,
                             batchShuffleReadIOExecutor,
+                            
Collections.singletonList(tierShuffleDescriptors.get(index)),
                             Math.max(
                                     2 * 
batchShuffleReadBufferPool.getNumBuffersPerRequest(),
                                     numberOfSubpartitions));
             tierProducerAgents.add(producerAgent);
-
-            if (tierFactory.getClass() == MemoryTierFactory.class) {
-                tieredStorageMemorySpecs.add(
-                        new TieredStorageMemorySpec(
-                                producerAgent, 
tierExclusiveBuffers.get(index), false));
-            } else {
-                tieredStorageMemorySpecs.add(
-                        new TieredStorageMemorySpec(
-                                producerAgent, 
tierExclusiveBuffers.get(index), true));
-            }
+            
tieredStorageMemorySpecs.add(tierFactory.getProducerAgentMemorySpec());
         }
         return Tuple2.of(tierProducerAgents, tieredStorageMemorySpecs);
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java
index 4df026347a2..6d9cfff4ebc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java
@@ -20,20 +20,25 @@ package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
 
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import 
org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static org.apache.flink.util.Preconditions.checkState;
+
 /** {@link TieredStorageConsumerClient} is used to read buffer from tiered 
store. */
 public class TieredStorageConsumerClient {
 
@@ -56,10 +61,18 @@ public class TieredStorageConsumerClient {
     public TieredStorageConsumerClient(
             List<TierFactory> tierFactories,
             List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
+            List<List<TierShuffleDescriptor>> tierShuffleDescriptors,
             TieredStorageNettyService nettyService) {
         this.tierFactories = tierFactories;
         this.nettyService = nettyService;
-        this.tierConsumerAgents = 
createTierConsumerAgents(tieredStorageConsumerSpecs);
+        this.tierConsumerAgents =
+                createTierConsumerAgents(tieredStorageConsumerSpecs, 
tierShuffleDescriptors);
+    }
+
+    public void setup(BufferPool bufferPool) {
+        TieredStorageMemoryManagerImpl memoryManager = new 
TieredStorageMemoryManagerImpl(0, false);
+        memoryManager.setup(bufferPool, Collections.emptyList());
+        tierConsumerAgents.forEach(tierConsumerAgent -> 
tierConsumerAgent.setup(memoryManager));
     }
 
     public void start() {
@@ -90,7 +103,8 @@ public class TieredStorageConsumerClient {
     }
 
     public Optional<Buffer> getNextBuffer(
-            TieredStoragePartitionId partitionId, TieredStorageSubpartitionId 
subpartitionId) {
+            TieredStoragePartitionId partitionId, TieredStorageSubpartitionId 
subpartitionId)
+            throws IOException {
         Tuple2<TierConsumerAgent, Integer> currentConsumerAgentAndSegmentId =
                 currentConsumerAgentAndSegmentIds
                         .computeIfAbsent(partitionId, ignore -> new 
HashMap<>())
@@ -148,12 +162,53 @@ public class TieredStorageConsumerClient {
     // 
--------------------------------------------------------------------------------------------
 
     private List<TierConsumerAgent> createTierConsumerAgents(
-            List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs) {
+            List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
+            List<List<TierShuffleDescriptor>> shuffleDescriptors) {
         ArrayList<TierConsumerAgent> tierConsumerAgents = new ArrayList<>();
-        for (TierFactory tierFactory : tierFactories) {
+
+        List<List<TierShuffleDescriptor>> transformedTierShuffleDescriptors =
+                transformTierShuffleDescriptors(shuffleDescriptors);
+        // Each tier only requires one inner list of 
transformedTierShuffleDescriptors, so the size
+        // of transformedTierShuffleDescriptors and the size of tierFactories 
are the same.
+        checkState(transformedTierShuffleDescriptors.size() == 
tierFactories.size());
+        for (int i = 0; i < tierFactories.size(); i++) {
             tierConsumerAgents.add(
-                    
tierFactory.createConsumerAgent(tieredStorageConsumerSpecs, nettyService));
+                    tierFactories
+                            .get(i)
+                            .createConsumerAgent(
+                                    tieredStorageConsumerSpecs,
+                                    transformedTierShuffleDescriptors.get(i),
+                                    nettyService));
         }
         return tierConsumerAgents;
     }
+
+    /**
+     * Before transforming the shuffle descriptors, the number of tier shuffle 
descriptors is
+     * numPartitions * numTiers (That means shuffleDescriptors.size() is 
numPartitions, while the
+     * shuffleDescriptors.get(0).size() is numTiers). After transforming, the 
number of tier shuffle
+     * descriptors is numTiers * numPartitions (That means 
transformedList.size() is numTiers, while
+     * transformedList.get(0).size() is numPartitions).
+     */
+    private static List<List<TierShuffleDescriptor>> 
transformTierShuffleDescriptors(
+            List<List<TierShuffleDescriptor>> shuffleDescriptors) {
+        int numTiers = 0;
+        int numPartitions = shuffleDescriptors.size();
+        for (List<TierShuffleDescriptor> tierShuffleDescriptors : 
shuffleDescriptors) {
+            if (numTiers == 0) {
+                numTiers = tierShuffleDescriptors.size();
+            }
+            checkState(numTiers == tierShuffleDescriptors.size());
+        }
+
+        List<List<TierShuffleDescriptor>> transformedList = new ArrayList<>();
+        for (int i = 0; i < numTiers; i++) {
+            List<TierShuffleDescriptor> innerList = new ArrayList<>();
+            for (int j = 0; j < numPartitions; j++) {
+                innerList.add(shuffleDescriptors.get(j).get(i));
+            }
+            transformedList.add(innerList);
+        }
+        return transformedList;
+    }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerSpec.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerSpec.java
index 42b143caaf1..91aaa8bc0c1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerSpec.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerSpec.java
@@ -25,6 +25,8 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered
 /** Describe the different data sources in {@link 
TieredStorageConsumerClient}. */
 public class TieredStorageConsumerSpec {
 
+    private final int gateIndex;
+
     private final TieredStoragePartitionId tieredStoragePartitionId;
 
     private final TieredStorageInputChannelId tieredStorageInputChannelId;
@@ -32,14 +34,20 @@ public class TieredStorageConsumerSpec {
     private final ResultSubpartitionIndexSet tieredStorageSubpartitionIds;
 
     public TieredStorageConsumerSpec(
+            int gateIndex,
             TieredStoragePartitionId tieredStoragePartitionId,
             TieredStorageInputChannelId tieredStorageInputChannelId,
             ResultSubpartitionIndexSet tieredStorageSubpartitionIds) {
+        this.gateIndex = gateIndex;
         this.tieredStoragePartitionId = tieredStoragePartitionId;
         this.tieredStorageInputChannelId = tieredStorageInputChannelId;
         this.tieredStorageSubpartitionIds = tieredStorageSubpartitionIds;
     }
 
+    public int getGateIndex() {
+        return gateIndex;
+    }
+
     public TieredStoragePartitionId getPartitionId() {
         return tieredStoragePartitionId;
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java
index 60f2c779f30..457dcffef25 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java
@@ -18,10 +18,18 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage;
 
-import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleHandler;
+import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 
 import java.util.List;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkState;
 
 /** Client of the Tiered Storage used by the master. */
 public class TieredStorageMasterClient {
@@ -32,11 +40,37 @@ public class TieredStorageMasterClient {
         this.tiers = tiers;
     }
 
-    public void addPartition(TieredStoragePartitionId partitionId) {
-        tiers.forEach(tierMasterAgent -> 
tierMasterAgent.addPartition(partitionId));
+    public void registerJob(JobID jobID, TierShuffleHandler shuffleHandler) {
+        tiers.forEach(tierMasterAgent -> tierMasterAgent.registerJob(jobID, 
shuffleHandler));
+    }
+
+    public void unregisterJob(JobID jobID) {
+        tiers.forEach(tierMasterAgent -> tierMasterAgent.unregisterJob(jobID));
+    }
+
+    public List<TierShuffleDescriptor> addPartitionAndGetShuffleDescriptor(
+            JobID jobID, ResultPartitionID resultPartitionID) {
+        return tiers.stream()
+                .map(
+                        tierMasterAgent ->
+                                
tierMasterAgent.addPartitionAndGetShuffleDescriptor(
+                                        jobID, resultPartitionID))
+                .collect(Collectors.toList());
+    }
+
+    public void releasePartition(ShuffleDescriptor shuffleDescriptor) {
+        checkState(shuffleDescriptor instanceof NettyShuffleDescriptor);
+        List<TierShuffleDescriptor> tierShuffleDescriptors =
+                ((NettyShuffleDescriptor) 
shuffleDescriptor).getTierShuffleDescriptors();
+        if (tierShuffleDescriptors != null && 
!tierShuffleDescriptors.isEmpty()) {
+            checkState(tierShuffleDescriptors.size() == tiers.size());
+            for (int i = 0; i < tierShuffleDescriptors.size(); i++) {
+                tiers.get(i).releasePartition(tierShuffleDescriptors.get(i));
+            }
+        }
     }
 
-    public void releasePartition(TieredStoragePartitionId partitionId) {
-        tiers.forEach(tierMasterAgent -> 
tierMasterAgent.releasePartition(partitionId));
+    public void close() {
+        tiers.forEach(TierMasterAgent::close);
     }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java
index 4ac9c400b27..c2793b72bbb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java
@@ -78,6 +78,13 @@ public interface TieredStorageMemoryManager {
      */
     void listenBufferReclaimRequest(Runnable onBufferReclaimRequest);
 
+    /**
+     * Expose and get the internal {@link BufferPool}. Please note that this 
method is a temporary
+     * workaround for the remote tier plugin and may be removed at any time in 
the future. We
+     * strongly advise that users do not rely on this method.
+     */
+    BufferPool getBufferPool();
+
     /**
      * Request a {@link BufferBuilder} instance for a specific owner. The 
{@link
      * TieredStorageMemoryManagerImpl} will not check whether a buffer can be 
requested. The manager
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
index c7365bc9c41..0ef453ec126 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java
@@ -198,6 +198,11 @@ public class TieredStorageMemoryManagerImpl implements 
TieredStorageMemoryManage
         bufferReclaimRequestListeners.add(onBufferReclaimRequest);
     }
 
+    @Override
+    public BufferPool getBufferPool() {
+        return bufferPool;
+    }
+
     @Override
     public BufferBuilder requestBufferBlocking(Object owner) {
         checkIsInitialized();
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java
index bf259dee224..bd833d69e45 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java
@@ -18,22 +18,38 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier;
 
-import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 /** The empty implementation for {@link TierMasterAgent}. */
 public class NoOpMasterAgent implements TierMasterAgent {
 
     public static final NoOpMasterAgent INSTANCE = new NoOpMasterAgent();
 
-    private NoOpMasterAgent() {}
+    @Override
+    public void registerJob(JobID jobID, TierShuffleHandler 
tierShuffleHandler) {
+        // noop
+    }
+
+    @Override
+    public void unregisterJob(JobID jobID) {
+        // noop
+    }
+
+    @Override
+    public TierShuffleDescriptor addPartitionAndGetShuffleDescriptor(
+            JobID jobID, ResultPartitionID resultPartitionID) {
+        // noop
+        return NoOpTierShuffleDescriptor.INSTANCE;
+    }
 
     @Override
-    public void addPartition(TieredStoragePartitionId partitionId) {
+    public void releasePartition(TierShuffleDescriptor shuffleDescriptor) {
         // noop
     }
 
     @Override
-    public void releasePartition(TieredStoragePartitionId partitionId) {
+    public void close() {
         // noop
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpTierShuffleDescriptor.java
similarity index 66%
copy from 
flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpTierShuffleDescriptor.java
index 4ada32cef0a..1c1cb67f78e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpTierShuffleDescriptor.java
@@ -16,16 +16,14 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.shuffle;
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier;
 
-import org.apache.flink.configuration.Configuration;
+/** The empty implementation for {@link TierShuffleDescriptor}. */
+public class NoOpTierShuffleDescriptor implements TierShuffleDescriptor {
 
-/** Utils for shuffle related tests. */
-public class ShuffleTestUtils {
+    private static final long serialVersionUID = 1L;
 
-    public static final ShuffleMaster<?> DEFAULT_SHUFFLE_MASTER =
-            new NettyShuffleMaster(new Configuration());
+    public static final NoOpTierShuffleDescriptor INSTANCE = new 
NoOpTierShuffleDescriptor();
 
-    /** Private default constructor to avoid being instantiated. */
-    private ShuffleTestUtils() {}
+    private NoOpTierShuffleDescriptor() {}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierConsumerAgent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierConsumerAgent.java
index fecf9baa9e1..9a845c7ddfb 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierConsumerAgent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierConsumerAgent.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
 
 import java.io.IOException;
 import java.util.Optional;
@@ -33,6 +34,14 @@ import java.util.Optional;
  */
 public interface TierConsumerAgent {
 
+    /**
+     * The consumer agent may request buffers from the memory manager. 
Therefore, the {@link
+     * TieredStorageMemoryManager} should be integrated into the tier consumer 
agent. Since the
+     * buffer pool is initialized after the creation of the client, the memory 
manager need to be
+     * assigned after the buffer pool becomes available.
+     */
+    void setup(TieredStorageMemoryManager memoryManager);
+
     /** Start the consumer agent. */
     void start();
 
@@ -59,7 +68,8 @@ public interface TierConsumerAgent {
     Optional<Buffer> getNextBuffer(
             TieredStoragePartitionId partitionId,
             TieredStorageSubpartitionId subpartitionId,
-            int segmentId);
+            int segmentId)
+            throws IOException;
 
     /**
      * Register the notifier to notify the availability of a subpartition.
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
index 9f1ad6e33c7..fbfce81f2ee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java
@@ -24,6 +24,7 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
 
 import java.util.List;
@@ -35,11 +36,21 @@ public interface TierFactory {
     /** Sets up the tier factory based on the {@link Configuration}. */
     void setup(Configuration configuration);
 
+    /** Get the {@link TieredStorageMemorySpec} of the master-side agent. */
+    TieredStorageMemorySpec getMasterAgentMemorySpec();
+
+    /** Get the {@link TieredStorageMemorySpec} of the producer-side agent. */
+    TieredStorageMemorySpec getProducerAgentMemorySpec();
+
+    /** Get the {@link TieredStorageMemorySpec} of the consumer-side agent. */
+    TieredStorageMemorySpec getConsumerAgentMemorySpec();
+
     /** Creates the master-side agent of a Tier. */
     TierMasterAgent createMasterAgent(TieredStorageResourceRegistry 
tieredStorageResourceRegistry);
 
     /** Creates the producer-side agent of a Tier. */
     TierProducerAgent createProducerAgent(
+            int numPartitions,
             int numSubpartitions,
             TieredStoragePartitionId partitionID,
             String dataFileBasePath,
@@ -49,10 +60,12 @@ public interface TierFactory {
             TieredStorageResourceRegistry resourceRegistry,
             BatchShuffleReadBufferPool bufferPool,
             ScheduledExecutorService ioExecutor,
-            int maxRequestedBuffers);
+            List<TierShuffleDescriptor> shuffleDescriptors,
+            int maxRequestedBuffer);
 
     /** Creates the consumer-side agent of a Tier. */
     TierConsumerAgent createConsumerAgent(
             List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
+            List<TierShuffleDescriptor> shuffleDescriptors,
             TieredStorageNettyService nettyService);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierMasterAgent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierMasterAgent.java
index 2c34cdd38c6..e0d20d24082 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierMasterAgent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierMasterAgent.java
@@ -18,22 +18,29 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier;
 
-import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 
 /** The master-side agent of a Tier. */
 public interface TierMasterAgent {
 
-    /**
-     * Add a new tiered storage partition.
-     *
-     * @param partitionId the identifier of the new partition
-     */
-    void addPartition(TieredStoragePartitionId partitionId);
+    /** Register a job id with a {@link TierShuffleHandler}. */
+    void registerJob(JobID jobID, TierShuffleHandler tierShuffleHandler);
+
+    /** Unregister a job id. */
+    void unregisterJob(JobID jobID);
+
+    /** Add a new tiered storage partition and get the {@link 
TierShuffleDescriptor}. */
+    TierShuffleDescriptor addPartitionAndGetShuffleDescriptor(
+            JobID jobID, ResultPartitionID resultPartitionID);
 
     /**
      * Release a tiered storage partition.
      *
-     * @param partitionId the identifier of partition to be released
+     * @param shuffleDescriptor the partition shuffle descriptor to be released
      */
-    void releasePartition(TieredStoragePartitionId partitionId);
+    void releasePartition(TierShuffleDescriptor shuffleDescriptor);
+
+    /** Close this tier master agent. */
+    void close();
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierShuffleHandler.java
similarity index 55%
copy from 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierShuffleHandler.java
index bf259dee224..93d283959c1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierShuffleHandler.java
@@ -20,20 +20,22 @@ package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier;
 
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 
-/** The empty implementation for {@link TierMasterAgent}. */
-public class NoOpMasterAgent implements TierMasterAgent {
+import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 
-    public static final NoOpMasterAgent INSTANCE = new NoOpMasterAgent();
-
-    private NoOpMasterAgent() {}
+/**
+ * A handler to process the callback results from each tier. The callbacks can 
be some events, some
+ * errors, some exceptions, etc. If you want to process some new callbacks, 
you can add more methods
+ * to this handler interface.
+ *
+ * <p>When events occur in the tier, the tier will call these methods, 
then the framework can
+ * process the events.
+ */
+public interface TierShuffleHandler {
 
-    @Override
-    public void addPartition(TieredStoragePartitionId partitionId) {
-        // noop
-    }
+    /** A callback to process the event of releasing a collection of tiered 
result partitions. */
+    CompletableFuture<?> 
onReleasePartitions(Collection<TieredStoragePartitionId> partitionIds);
 
-    @Override
-    public void releasePartition(TieredStoragePartitionId partitionId) {
-        // noop
-    }
+    /** A callback to process the fatal error (if the error exists). */
+    void onFatalError(Throwable throwable);
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/UnknownTierShuffleDescriptor.java
similarity index 62%
copy from 
flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java
copy to 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/UnknownTierShuffleDescriptor.java
index 4ada32cef0a..b09c71855d5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/UnknownTierShuffleDescriptor.java
@@ -16,16 +16,16 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.shuffle;
+package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier;
 
-import org.apache.flink.configuration.Configuration;
-
-/** Utils for shuffle related tests. */
-public class ShuffleTestUtils {
+/**
+ * This is a placeholder when the tier shuffle descriptor should be unknown 
because the netty
+ * shuffle descriptor is unknown.
+ */
+public class UnknownTierShuffleDescriptor implements TierShuffleDescriptor {
+    private static final long serialVersionUID = 1L;
 
-    public static final ShuffleMaster<?> DEFAULT_SHUFFLE_MASTER =
-            new NettyShuffleMaster(new Configuration());
+    public static final UnknownTierShuffleDescriptor INSTANCE = new 
UnknownTierShuffleDescriptor();
 
-    /** Private default constructor to avoid being instantiated. */
-    private ShuffleTestUtils() {}
+    private UnknownTierShuffleDescriptor() {}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierConsumerAgent.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierConsumerAgent.java
index b48ec280aa9..b97323fe457 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierConsumerAgent.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierConsumerAgent.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyCo
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
 
 import java.io.IOException;
@@ -61,6 +62,11 @@ public class DiskTierConsumerAgent implements 
TierConsumerAgent {
         }
     }
 
+    @Override
+    public void setup(TieredStorageMemoryManager memoryManager) {
+        // noop
+    }
+
     @Override
     public void start() {
         // noop
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
index 9d59a8ca908..786aab43c5f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java
@@ -28,12 +28,14 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.Producer
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.NoOpMasterAgent;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 
 import java.nio.file.Path;
@@ -42,6 +44,7 @@ import java.time.Duration;
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getDiskTierName;
 import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFile.DATA_FILE_SUFFIX;
 import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFile.INDEX_FILE_SUFFIX;
 import static org.apache.flink.util.Preconditions.checkState;
@@ -70,6 +73,21 @@ public class DiskTierFactory implements TierFactory {
         this.bufferSizeBytes = ConfigurationParserUtils.getPageSize(configuration);
     }
 
+    @Override
+    public TieredStorageMemorySpec getMasterAgentMemorySpec() {
+        return new TieredStorageMemorySpec(getDiskTierName(), 0);
+    }
+
+    @Override
+    public TieredStorageMemorySpec getProducerAgentMemorySpec() {
+        return new TieredStorageMemorySpec(getDiskTierName(), DEFAULT_DISK_TIER_EXCLUSIVE_BUFFERS);
+    }
+
+    @Override
+    public TieredStorageMemorySpec getConsumerAgentMemorySpec() {
+        return new TieredStorageMemorySpec(getDiskTierName(), 0);
+    }
+
     @Override
     public TierMasterAgent createMasterAgent(TieredStorageResourceRegistry resourceRegistry) {
         return NoOpMasterAgent.INSTANCE;
@@ -77,6 +95,7 @@ public class DiskTierFactory implements TierFactory {
 
     @Override
     public TierProducerAgent createProducerAgent(
+            int numPartitions,
             int numSubpartitions,
             TieredStoragePartitionId partitionId,
             String dataFileBasePath,
@@ -86,6 +105,7 @@ public class DiskTierFactory implements TierFactory {
             TieredStorageResourceRegistry resourceRegistry,
             BatchShuffleReadBufferPool bufferPool,
             ScheduledExecutorService ioExecutor,
+            List<TierShuffleDescriptor> shuffleDescriptors,
             int maxRequestedBuffers) {
         checkState(bufferSizeBytes > 0);
 
@@ -125,6 +145,7 @@ public class DiskTierFactory implements TierFactory {
     @Override
     public TierConsumerAgent createConsumerAgent(
             List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
+            List<TierShuffleDescriptor> shuffleDescriptors,
             TieredStorageNettyService nettyService) {
         return new DiskTierConsumerAgent(tieredStorageConsumerSpecs, nettyService);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
index 4f88ca71bfa..e8d35ea6e39 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java
@@ -48,6 +48,7 @@ import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getDiskTierName;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** The disk tier implementation of {@link TierProducerAgent}. */
@@ -177,7 +178,7 @@ public class DiskTierProducerAgent implements TierProducerAgent, NettyServicePro
             return false;
         }
         if (finishedBuffer.isBuffer()) {
-            memoryManager.transferBufferOwnership(bufferOwner, this, finishedBuffer);
+            memoryManager.transferBufferOwnership(bufferOwner, getDiskTierName(), finishedBuffer);
         }
         currentSubpartitionWriteBuffers[subpartitionIndex]++;
         emitBuffer(finishedBuffer, subpartitionIndex, numRemainingConsecutiveBuffers == 0);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierConsumerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierConsumerAgent.java
index 261742d6228..138ddaa4326 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierConsumerAgent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierConsumerAgent.java
@@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyCo
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
 
 import java.io.IOException;
@@ -61,6 +62,11 @@ public class MemoryTierConsumerAgent implements TierConsumerAgent {
         }
     }
 
+    @Override
+    public void setup(TieredStorageMemoryManager memoryManager) {
+        // noop
+    }
+
     @Override
     public void start() {
         // noop
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
index dfd762582cb..b2d84acb364 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java
@@ -24,17 +24,20 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.NoOpMasterAgent;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getMemoryTierName;
 import static org.apache.flink.util.Preconditions.checkState;
 
 /** The implementation of {@link TierFactory} for memory tier. */
@@ -53,6 +56,22 @@ public class MemoryTierFactory implements TierFactory {
         this.bufferSizeBytes = ConfigurationParserUtils.getPageSize(configuration);
     }
 
+    @Override
+    public TieredStorageMemorySpec getMasterAgentMemorySpec() {
+        return new TieredStorageMemorySpec(getMemoryTierName(), 0);
+    }
+
+    @Override
+    public TieredStorageMemorySpec getProducerAgentMemorySpec() {
+        return new TieredStorageMemorySpec(
+                getMemoryTierName(), DEFAULT_MEMORY_TIER_EXCLUSIVE_BUFFERS);
+    }
+
+    @Override
+    public TieredStorageMemorySpec getConsumerAgentMemorySpec() {
+        return new TieredStorageMemorySpec(getMemoryTierName(), 0);
+    }
+
     @Override
     public TierMasterAgent createMasterAgent(
             TieredStorageResourceRegistry tieredStorageResourceRegistry) {
@@ -61,6 +80,7 @@ public class MemoryTierFactory implements TierFactory {
 
     @Override
     public TierProducerAgent createProducerAgent(
+            int numPartitions,
             int numSubpartitions,
             TieredStoragePartitionId partitionID,
             String dataFileBasePath,
@@ -70,6 +90,7 @@ public class MemoryTierFactory implements TierFactory {
             TieredStorageResourceRegistry resourceRegistry,
             BatchShuffleReadBufferPool bufferPool,
             ScheduledExecutorService ioExecutor,
+            List<TierShuffleDescriptor> shuffleDescriptors,
             int maxRequestedBuffers) {
         checkState(bufferSizeBytes > 0);
 
@@ -88,6 +109,7 @@ public class MemoryTierFactory implements TierFactory {
     @Override
     public TierConsumerAgent createConsumerAgent(
             List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
+            List<TierShuffleDescriptor> shuffleDescriptors,
             TieredStorageNettyService nettyService) {
         return new MemoryTierConsumerAgent(tieredStorageConsumerSpecs, nettyService);
     }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
index 1dc592c021b..973b8cdf741 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java
@@ -40,6 +40,7 @@ import java.io.IOException;
 import java.util.Arrays;
 
 import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT;
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getMemoryTierName;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** The memory tier implementation of {@link TierProducerAgent}. */
@@ -108,8 +109,9 @@ public class MemoryTierProducerAgent implements TierProducerAgent, NettyServiceP
                         && subpartitionProducerAgents[subpartitionId.getSubpartitionId()]
                                         .numQueuedBuffers()
                                 < subpartitionMaxQueuedBuffers
-                        && (memoryManager.getMaxNonReclaimableBuffers(this)
-                                        - memoryManager.numOwnerRequestedBuffer(this))
+                        && (memoryManager.getMaxNonReclaimableBuffers(getMemoryTierName())
+                                        - memoryManager.numOwnerRequestedBuffer(
+                                                getMemoryTierName()))
                                 > Math.max(numBuffersPerSegment, minNumBuffers)
                         && memoryManager.ensureCapacity(
                                 Math.max(numBuffersPerSegment, minNumBuffers));
@@ -137,7 +139,7 @@ public class MemoryTierProducerAgent implements TierProducerAgent, NettyServiceP
             return false;
         }
         if (finishedBuffer.isBuffer()) {
-            memoryManager.transferBufferOwnership(bufferOwner, this, finishedBuffer);
+            memoryManager.transferBufferOwnership(bufferOwner, getMemoryTierName(), finishedBuffer);
         }
         currentSubpartitionWriteBuffers[subpartitionIndex]++;
         addFinishedBuffer(finishedBuffer, subpartitionIndex);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
index 07b19cf3b4c..07c3d29eae2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
@@ -100,6 +101,11 @@ public class RemoteTierConsumerAgent implements TierConsumerAgent {
         }
     }
 
+    @Override
+    public void setup(TieredStorageMemoryManager memoryManager) {
+        // noop
+    }
+
     @Override
     public void start() {
         remoteStorageScanner.start();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
index 5ab7e89e5b3..5c0f72ecea5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java
@@ -28,16 +28,19 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentP
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
 
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getRemoteTierName;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 import static org.apache.flink.util.Preconditions.checkState;
 
@@ -64,6 +67,22 @@ public class RemoteTierFactory implements TierFactory {
                                         
.NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH));
     }
 
+    @Override
+    public TieredStorageMemorySpec getMasterAgentMemorySpec() {
+        return new TieredStorageMemorySpec(getRemoteTierName(), 0);
+    }
+
+    @Override
+    public TieredStorageMemorySpec getProducerAgentMemorySpec() {
+        return new TieredStorageMemorySpec(
+                getRemoteTierName(), DEFAULT_REMOTE_TIER_EXCLUSIVE_BUFFERS);
+    }
+
+    @Override
+    public TieredStorageMemorySpec getConsumerAgentMemorySpec() {
+        return new TieredStorageMemorySpec(getRemoteTierName(), 0);
+    }
+
     @Override
     public TierMasterAgent createMasterAgent(TieredStorageResourceRegistry resourceRegistry) {
         return new RemoteTierMasterAgent(remoteStoragePath, resourceRegistry);
@@ -71,6 +90,7 @@ public class RemoteTierFactory implements TierFactory {
 
     @Override
     public TierProducerAgent createProducerAgent(
+            int numPartitions,
             int numSubpartitions,
             TieredStoragePartitionId partitionID,
             String dataFileBasePath,
@@ -80,6 +100,7 @@ public class RemoteTierFactory implements TierFactory {
             TieredStorageResourceRegistry resourceRegistry,
             BatchShuffleReadBufferPool bufferPool,
             ScheduledExecutorService ioExecutor,
+            List<TierShuffleDescriptor> shuffleDescriptors,
             int maxRequestedBuffers) {
         checkState(bufferSizeBytes > 0);
         checkNotNull(remoteStoragePath);
@@ -100,6 +121,7 @@ public class RemoteTierFactory implements TierFactory {
     @Override
     public TierConsumerAgent createConsumerAgent(
             List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
+            List<TierShuffleDescriptor> shuffleDescriptors,
             TieredStorageNettyService nettyService) {
         PartitionFileReader partitionFileReader =
                 
SegmentPartitionFile.createPartitionFileReader(remoteStoragePath);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgent.java
index a41fc2695c1..924993b875a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgent.java
@@ -18,12 +18,18 @@
 
 package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
 
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleHandler;
 
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils.convertId;
 import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.deletePathQuietly;
 import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getPartitionPath;
+import static org.apache.flink.util.Preconditions.checkState;
 
 /** The implementation of {@link TierMasterAgent} for the remote tier. */
 public class RemoteTierMasterAgent implements TierMasterAgent {
@@ -39,14 +45,34 @@ public class RemoteTierMasterAgent implements TierMasterAgent {
     }
 
     @Override
-    public void addPartition(TieredStoragePartitionId partitionID) {
+    public void registerJob(JobID jobID, TierShuffleHandler tierShuffleHandler) {
+        // noop
+    }
+
+    @Override
+    public void unregisterJob(JobID jobID) {
+        // noop
+    }
+
+    @Override
+    public TierShuffleDescriptor addPartitionAndGetShuffleDescriptor(
+            JobID jobID, ResultPartitionID resultPartitionID) {
+        TieredStoragePartitionId partitionId = convertId(resultPartitionID);
         resourceRegistry.registerResource(
-                partitionID,
-                () -> deletePathQuietly(getPartitionPath(partitionID, remoteStorageBasePath)));
+                partitionId,
+                () -> deletePathQuietly(getPartitionPath(partitionId, remoteStorageBasePath)));
+        return new RemoteTierShuffleDescriptor(partitionId);
+    }
+
+    @Override
+    public void releasePartition(TierShuffleDescriptor shuffleDescriptor) {
+        checkState(shuffleDescriptor instanceof RemoteTierShuffleDescriptor);
+        resourceRegistry.clearResourceFor(
+                ((RemoteTierShuffleDescriptor) shuffleDescriptor).getPartitionId());
     }
 
     @Override
-    public void releasePartition(TieredStoragePartitionId partitionID) {
-        resourceRegistry.clearResourceFor(partitionID);
+    public void close() {
+        // noop
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
index 2a362721368..8444c29fec9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java
@@ -28,6 +28,7 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd
 
 import java.util.Arrays;
 
+import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getRemoteTierName;
 import static org.apache.flink.util.Preconditions.checkArgument;
 
 /** The implementation of {@link TierProducerAgent} for the remote tier. */
@@ -95,7 +96,7 @@ public class RemoteTierProducerAgent implements TierProducerAgent {
             return false;
         }
         if (buffer.isBuffer()) {
-            memoryManager.transferBufferOwnership(bufferOwner, this, buffer);
+            memoryManager.transferBufferOwnership(bufferOwner, getRemoteTierName(), buffer);
         }
         currentSubpartitionSegmentWriteBuffers[subpartitionIndex]++;
         cacheDataManager.appendBuffer(buffer, subpartitionIndex);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
index df544fdce47..31bed319e3a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredInternalShuffleMaster;
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.LocalExecutionPartitionConnectionInfo;
 import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionConnectionInfo;
 import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.PartitionConnectionInfo;
@@ -34,6 +35,7 @@ import javax.annotation.Nullable;
 import java.time.Duration;
 import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
@@ -65,7 +67,8 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor>
 
     private final Map<JobID, JobShuffleContext> jobShuffleContexts = new HashMap<>();
 
-    public NettyShuffleMaster(Configuration conf) {
+    public NettyShuffleMaster(ShuffleMasterContext shuffleMasterContext) {
+        Configuration conf = shuffleMasterContext.getConfiguration();
         checkNotNull(conf);
         buffersPerInputChannel =
                 
conf.get(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL);
@@ -81,7 +84,7 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor>
         networkBufferSize = ConfigurationParserUtils.getPageSize(conf);
 
         if (isHybridShuffleNewModeEnabled(conf)) {
-            tieredInternalShuffleMaster = new TieredInternalShuffleMaster(conf);
+            tieredInternalShuffleMaster = new TieredInternalShuffleMaster(shuffleMasterContext);
         } else {
             tieredInternalShuffleMaster = null;
         }
@@ -110,23 +113,27 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor>
                         partitionDescriptor.getPartitionId(),
                         producerDescriptor.getProducerExecutionId());
 
+        List<TierShuffleDescriptor> tierShuffleDescriptors = null;
+        if (tieredInternalShuffleMaster != null) {
+            tierShuffleDescriptors =
+                    
tieredInternalShuffleMaster.addPartitionAndGetShuffleDescriptor(
+                            jobID, resultPartitionID);
+        }
+
         NettyShuffleDescriptor shuffleDeploymentDescriptor =
                 new NettyShuffleDescriptor(
                         producerDescriptor.getProducerLocation(),
                         createConnectionInfo(
                                 producerDescriptor, 
partitionDescriptor.getConnectionIndex()),
-                        resultPartitionID);
-
-        if (tieredInternalShuffleMaster != null) {
-            tieredInternalShuffleMaster.addPartition(resultPartitionID);
-        }
+                        resultPartitionID,
+                        tierShuffleDescriptors);
         return CompletableFuture.completedFuture(shuffleDeploymentDescriptor);
     }
 
     @Override
     public void releasePartitionExternally(ShuffleDescriptor 
shuffleDescriptor) {
         if (tieredInternalShuffleMaster != null) {
-            
tieredInternalShuffleMaster.releasePartition(shuffleDescriptor.getResultPartitionID());
+            tieredInternalShuffleMaster.releasePartition(shuffleDescriptor);
         }
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 6530bf8f178..88e5f64dcdf 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -19,12 +19,14 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.disk.NoOpFileChannelManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.util.function.SupplierWithException;
 
 import java.io.IOException;
@@ -44,6 +46,8 @@ public class ResultPartitionBuilder {
 
     private int partitionIndex = 0;
 
+    private int numberOfPartitions = 1;
+
     private int numberOfSubpartitions = 1;
 
     private int numTargetKeyGroups = 1;
@@ -278,10 +282,34 @@ public class ResultPartitionBuilder {
                 partitionIndex,
                 partitionId,
                 partitionType,
+                numberOfPartitions,
                 numberOfSubpartitions,
                 numTargetKeyGroups,
                 isBroadcast,
+                new TestingShuffleDescriptor(partitionId, new ResourceID("test")),
                 factory,
                 false);
     }
+
+    private static class TestingShuffleDescriptor implements ShuffleDescriptor {
+
+        private final ResultPartitionID resultPartitionId;
+
+        private final ResourceID location;
+
+        TestingShuffleDescriptor(ResultPartitionID resultPartitionId, ResourceID location) {
+            this.resultPartitionId = resultPartitionId;
+            this.location = location;
+        }
+
+        @Override
+        public ResultPartitionID getResultPartitionID() {
+            return resultPartitionId;
+        }
+
+        @Override
+        public Optional<ResourceID> storesLocalResourcesOn() {
+            return Optional.of(location);
+        }
+    }
 }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java
index 761d731579b..649346ca8e5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java
@@ -39,6 +39,8 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan
 
     private final Consumer<TaskIOMetricGroup> setMetricGroupConsumer;
 
+    private final Supplier<BufferPool> bufferPoolSupplier;
+
     private final Consumer<Runnable> listenBufferReclaimRequestConsumer;
 
     private final Function<Object, BufferBuilder> requestBufferBlockingFunction;
@@ -58,6 +60,7 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan
     private TestingTieredStorageMemoryManager(
             BiConsumer<BufferPool, List<TieredStorageMemorySpec>> setupConsumer,
             Consumer<TaskIOMetricGroup> setMetricGroupConsumer,
+            Supplier<BufferPool> bufferPoolSupplier,
             Consumer<Runnable> listenBufferReclaimRequestConsumer,
             Function<Object, BufferBuilder> requestBufferBlockingFunction,
             Function<Object, Integer> getMaxNonReclaimableBuffersFunction,
@@ -68,6 +71,7 @@ public class TestingTieredStorageMemoryManager implements 
TieredStorageMemoryMan
             Runnable releaseRunnable) {
         this.setupConsumer = setupConsumer;
         this.setMetricGroupConsumer = setMetricGroupConsumer;
+        this.bufferPoolSupplier = bufferPoolSupplier;
         this.listenBufferReclaimRequestConsumer = 
listenBufferReclaimRequestConsumer;
         this.requestBufferBlockingFunction = requestBufferBlockingFunction;
         this.getMaxNonReclaimableBuffersFunction = 
getMaxNonReclaimableBuffersFunction;
@@ -88,6 +92,11 @@ public class TestingTieredStorageMemoryManager implements 
TieredStorageMemoryMan
         setMetricGroupConsumer.accept(metricGroup);
     }
 
+    @Override
+    public BufferPool getBufferPool() {
+        return bufferPoolSupplier.get(); // test hook; the Builder's default supplier yields null
+    }
+
     @Override
     public void listenBufferReclaimRequest(Runnable onBufferReclaimRequest) {
         listenBufferReclaimRequestConsumer.accept(onBufferReclaimRequest);
@@ -136,6 +145,8 @@ public class TestingTieredStorageMemoryManager implements 
TieredStorageMemoryMan
 
         private Consumer<TaskIOMetricGroup> setMetricGroupConsumer = (ignore) 
-> {};
 
+        private Supplier<BufferPool> bufferPoolSupplier = () -> null;
+
         private Consumer<Runnable> listenBufferReclaimRequestConsumer = 
runnable -> {};
 
         private Function<Object, BufferBuilder> requestBufferFunction = owner 
-> null;
@@ -163,6 +174,12 @@ public class TestingTieredStorageMemoryManager implements 
TieredStorageMemoryMan
             return this;
         }
 
+        public TestingTieredStorageMemoryManager.Builder setBufferPoolSupplier(
+                Supplier<BufferPool> bufferPoolSupplier) { // supplier backing getBufferPool()
+            this.bufferPoolSupplier = bufferPoolSupplier;
+            return this; // fluent: allows chaining further setters
+        }
+
         public TestingTieredStorageMemoryManager.Builder 
setListenBufferReclaimRequestConsumer(
                 Consumer<Runnable> listenBufferReclaimRequestConsumer) {
             this.listenBufferReclaimRequestConsumer = 
listenBufferReclaimRequestConsumer;
@@ -215,6 +232,7 @@ public class TestingTieredStorageMemoryManager implements 
TieredStorageMemoryMan
             return new TestingTieredStorageMemoryManager(
                     setupConsumer,
                     setMetricGroupConsumer,
+                    bufferPoolSupplier,
                     listenBufferReclaimRequestConsumer,
                     requestBufferBlockingFunction,
                     getMaxNonReclaimableBuffersFunction,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java
index f0f2f69168c..da1525be42f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java
@@ -23,16 +23,20 @@ import 
org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
 
 import java.io.IOException;
 import java.util.Optional;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 /** Test implementation for {@link TierConsumerAgent}. */
 public class TestingTierConsumerAgent implements TierConsumerAgent {
 
+    private final Consumer<TieredStorageMemoryManager> memoryManagerConsumer;
+
     private final Runnable startNotifier;
 
     private final Supplier<Buffer> bufferSupplier;
@@ -46,18 +50,25 @@ public class TestingTierConsumerAgent implements 
TierConsumerAgent {
 
     private TestingTierConsumerAgent(
             Runnable startNotifier,
+            Consumer<TieredStorageMemoryManager> memoryManagerConsumer, // invoked by setup()
             Supplier<Buffer> bufferSupplier,
             Runnable availabilityNotifierRegistrationRunnable,
             Runnable closeNotifier,
             BiFunction<TieredStoragePartitionId, ResultSubpartitionIndexSet, 
Integer>
                     peekNextBufferSubpartitionIdFunction) {
         this.startNotifier = startNotifier;
+        this.memoryManagerConsumer = memoryManagerConsumer; // stored as-is, like the others
         this.bufferSupplier = bufferSupplier;
         this.availabilityNotifierRegistrationRunnable = 
availabilityNotifierRegistrationRunnable;
         this.closeNotifier = closeNotifier;
         this.peekNextBufferSubpartitionIdFunction = 
peekNextBufferSubpartitionIdFunction;
     }
 
+    @Override
+    public void setup(TieredStorageMemoryManager memoryManager) {
+        memoryManagerConsumer.accept(memoryManager); // hand the manager to the test's hook
+    }
+
     @Override
     public void start() {
         startNotifier.run();
@@ -94,6 +105,8 @@ public class TestingTierConsumerAgent implements 
TierConsumerAgent {
 
         private Runnable startNotifier = () -> {};
 
+        private Consumer<TieredStorageMemoryManager> memoryManagerConsumer = 
memoryManager -> {};
+
         private Supplier<Buffer> bufferSupplier = () -> null;
 
         private Runnable availabilityNotifierRegistrationRunnable = () -> {};
@@ -110,6 +123,12 @@ public class TestingTierConsumerAgent implements 
TierConsumerAgent {
             return this;
         }
 
+        public Builder setMemoryManagerConsumer(
+                Consumer<TieredStorageMemoryManager> memoryManagerConsumer) { // hook run by setup()
+            this.memoryManagerConsumer = memoryManagerConsumer;
+            return this; // fluent: allows chaining further setters
+        }
+
         public Builder setBufferSupplier(Supplier<Buffer> bufferSupplier) {
             this.bufferSupplier = bufferSupplier;
             return this;
@@ -137,6 +156,7 @@ public class TestingTierConsumerAgent implements 
TierConsumerAgent {
         public TestingTierConsumerAgent build() {
             return new TestingTierConsumerAgent(
                     startNotifier,
+                    memoryManagerConsumer,
                     bufferSupplier,
                     availabilityNotifierRegistrationRunnable,
                     closeNotifier,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageConsumerClientTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageConsumerClientTest.java
index bcece554d42..fca5892c5b3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageConsumerClientTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageConsumerClientTest.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.Testi
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerClient;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote.TestingAvailabilityNotifier;
 
 import org.junit.jupiter.api.Test;
@@ -108,6 +109,10 @@ class TieredStorageConsumerClientTest {
 
     private TieredStorageConsumerClient createTieredStorageConsumerClient(
             TierConsumerAgent tierConsumerAgent) {
+        TierShuffleDescriptor emptyTierShuffleDescriptor =
+                new TierShuffleDescriptor() {
+                    private static final long serialVersionUID = 1L;
+                };
         return new TieredStorageConsumerClient(
                 Collections.singletonList(
                         new TestingTierFactory.Builder()
@@ -117,9 +122,11 @@ class TieredStorageConsumerClientTest {
                                 .build()),
                 Collections.singletonList(
                         new TieredStorageConsumerSpec(
+                                0,
                                 DEFAULT_PARTITION_ID,
                                 DEFAULT_INPUT_CHANNEL_ID,
                                 DEFAULT_SUBPARTITION_ID_SET)),
+                
Collections.singletonList(Collections.singletonList(emptyTierShuffleDescriptor)),
                 new TestingTieredStorageNettyService.Builder().build());
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java
index 8bec167625a..e2d771a5c72 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java
@@ -42,8 +42,10 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyServiceImpl;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageProducerClient;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.util.NoOpTierShuffleDescriptor;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 import org.apache.flink.util.concurrent.IgnoreShutdownRejectedExecutionHandler;
 
@@ -54,7 +56,9 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
 
@@ -277,6 +281,10 @@ class TieredResultPartitionTest {
                         tieredStorageNettyService,
                         tieredStorageResourceRegistry);
 
+        List<TierShuffleDescriptor> tierShuffleDescriptors =
+                Arrays.asList(
+                        NoOpTierShuffleDescriptor.INSTANCE, 
NoOpTierShuffleDescriptor.INSTANCE);
+
         TieredResultPartition resultPartition =
                 tieredResultPartitionFactory.createTieredResultPartition(
                         "TieredStoreResultPartitionTest",
@@ -285,11 +293,13 @@ class TieredResultPartitionTest {
                         ResultPartitionType.HYBRID_SELECTIVE,
                         numSubpartitions,
                         numSubpartitions,
+                        Integer.MAX_VALUE,
                         NETWORK_BUFFER_SIZE,
                         isBroadcastOnly,
                         true,
                         new ResultPartitionManager(),
                         new BufferCompressor(NETWORK_BUFFER_SIZE, 
CompressionCodec.LZ4),
+                        tierShuffleDescriptors,
                         () -> bufferPool,
                         fileChannelManager,
                         readBufferPool,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java
index 593b3698451..041ac33c480 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java
@@ -26,15 +26,25 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierCons
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
 
 import java.util.List;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.BiFunction;
+import java.util.function.Consumer;
 import java.util.function.Supplier;
 
 /** Test implementation for {@link TierFactory}. */
 public class TestingTierFactory implements TierFactory {
 
+    private Consumer<Configuration> setupConsumer = conf -> {}; // defaults serve the no-arg ctor
+
+    private Supplier<TieredStorageMemorySpec> masterAgentMemorySpecSupplier = () -> null;
+
+    private Supplier<TieredStorageMemorySpec> producerAgentMemorySpecSupplier = () -> null;
+
+    private Supplier<TieredStorageMemorySpec> consumerAgentMemorySpecSupplier = () -> null;
+
     private Supplier<TierMasterAgent> tierMasterAgentSupplier;
 
     private Supplier<TierProducerAgent> tierProducerAgentSupplier;
@@ -44,6 +54,10 @@ public class TestingTierFactory implements TierFactory {
             tierConsumerAgentSupplier;
 
     private TestingTierFactory(
+            Consumer<Configuration> setupConsumer,
+            Supplier<TieredStorageMemorySpec> masterAgentMemorySpecSupplier,
+            Supplier<TieredStorageMemorySpec> producerAgentMemorySpecSupplier,
+            Supplier<TieredStorageMemorySpec> consumerAgentMemorySpecSupplier,
             Supplier<TierMasterAgent> tierMasterAgentSupplier,
             Supplier<TierProducerAgent> tierProducerAgentSupplier,
             BiFunction<
@@ -51,6 +65,10 @@ public class TestingTierFactory implements TierFactory {
                             TieredStorageNettyService,
                             TierConsumerAgent>
                     tierConsumerAgentSupplier) {
+        this.setupConsumer = setupConsumer;
+        this.masterAgentMemorySpecSupplier = masterAgentMemorySpecSupplier;
+        this.producerAgentMemorySpecSupplier = producerAgentMemorySpecSupplier;
+        this.consumerAgentMemorySpecSupplier = consumerAgentMemorySpecSupplier;
         this.tierMasterAgentSupplier = tierMasterAgentSupplier;
         this.tierProducerAgentSupplier = tierProducerAgentSupplier;
         this.tierConsumerAgentSupplier = tierConsumerAgentSupplier;
@@ -59,7 +77,24 @@ public class TestingTierFactory implements TierFactory {
     public TestingTierFactory() {}
 
     @Override
-    public void setup(Configuration configuration) {}
+    public void setup(Configuration configuration) {
+        setupConsumer.accept(configuration); // hook set via Builder#setSetupConsumer
+    }
+
+    @Override
+    public TieredStorageMemorySpec getMasterAgentMemorySpec() {
+        return masterAgentMemorySpecSupplier.get(); // Builder's default supplier yields null
+    }
+
+    @Override
+    public TieredStorageMemorySpec getProducerAgentMemorySpec() {
+        return producerAgentMemorySpecSupplier.get(); // Builder's default supplier yields null
+    }
+
+    @Override
+    public TieredStorageMemorySpec getConsumerAgentMemorySpec() {
+        return consumerAgentMemorySpecSupplier.get(); // Builder's default supplier yields null
+    }
 
     @Override
     public TierMasterAgent createMasterAgent(
@@ -69,6 +104,7 @@ public class TestingTierFactory implements TierFactory {
 
     @Override
     public TierProducerAgent createProducerAgent(
+            int numPartitions,
             int numSubpartitions,
             TieredStoragePartitionId partitionID,
             String dataFileBasePath,
@@ -78,6 +114,7 @@ public class TestingTierFactory implements TierFactory {
             TieredStorageResourceRegistry resourceRegistry,
             BatchShuffleReadBufferPool bufferPool,
             ScheduledExecutorService ioExecutor,
+            List<TierShuffleDescriptor> shuffleDescriptors,
             int maxRequestedBuffers) {
         return tierProducerAgentSupplier.get();
     }
@@ -85,6 +122,7 @@ public class TestingTierFactory implements TierFactory {
     @Override
     public TierConsumerAgent createConsumerAgent(
             List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs,
+            List<TierShuffleDescriptor> shuffleDescriptors, // accepted but not forwarded below
             TieredStorageNettyService nettyService) {
         return tierConsumerAgentSupplier.apply(tieredStorageConsumerSpecs, 
nettyService);
     }
@@ -92,6 +130,14 @@ public class TestingTierFactory implements TierFactory {
     /** Builder for {@link TestingTierFactory}. */
     public static class Builder {
 
+        private Consumer<Configuration> setupConsumer = conf -> {};
+
+        private Supplier<TieredStorageMemorySpec> 
masterAgentMemorySpecSupplier = () -> null;
+
+        private Supplier<TieredStorageMemorySpec> 
producerAgentMemorySpecSupplier = () -> null;
+
+        private Supplier<TieredStorageMemorySpec> 
consumerAgentMemorySpecSupplier = () -> null;
+
         private Supplier<TierMasterAgent> tierMasterAgentSupplier = () -> null;
 
         private Supplier<TierProducerAgent> tierProducerAgentSupplier = () -> 
null;
@@ -104,6 +150,29 @@ public class TestingTierFactory implements TierFactory {
 
         public Builder() {}
 
+        public Builder setSetupConsumer(Consumer<Configuration> setupConsumer) 
{ // hook that TestingTierFactory#setup forwards its Configuration to
+            this.setupConsumer = setupConsumer;
+            return this;
+        }
+
+        public Builder setMasterAgentMemorySpecSupplier(
+                Supplier<TieredStorageMemorySpec> 
masterAgentMemorySpecSupplier) { // backs TestingTierFactory#getMasterAgentMemorySpec
+            this.masterAgentMemorySpecSupplier = masterAgentMemorySpecSupplier;
+            return this;
+        }
+
+        public Builder setProducerAgentMemorySpecSupplier(
+                Supplier<TieredStorageMemorySpec> 
producerAgentMemorySpecSupplier) { // backs TestingTierFactory#getProducerAgentMemorySpec
+            this.producerAgentMemorySpecSupplier = 
producerAgentMemorySpecSupplier;
+            return this;
+        }
+
+        public Builder setConsumerAgentMemorySpecSupplier(
+                Supplier<TieredStorageMemorySpec> 
consumerAgentMemorySpecSupplier) { // backs TestingTierFactory#getConsumerAgentMemorySpec
+            this.consumerAgentMemorySpecSupplier = 
consumerAgentMemorySpecSupplier;
+            return this;
+        }
+
         public Builder setTierMasterAgentSupplier(
                 Supplier<TierMasterAgent> tierMasterAgentSupplier) {
             this.tierMasterAgentSupplier = tierMasterAgentSupplier;
@@ -128,7 +197,13 @@ public class TestingTierFactory implements TierFactory {
 
         public TestingTierFactory build() {
             return new TestingTierFactory(
-                    tierMasterAgentSupplier, tierProducerAgentSupplier, 
tierConsumerAgentSupplier);
+                    setupConsumer, // order must match the private constructor's parameters
+                    masterAgentMemorySpecSupplier, // next three share one type: order is unchecked
+                    producerAgentMemorySpecSupplier,
+                    consumerAgentMemorySpecSupplier,
+                    tierMasterAgentSupplier,
+                    tierProducerAgentSupplier,
+                    tierConsumerAgentSupplier);
         }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgentTest.java
index acaa0c0487e..e8e9eb5b7e7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgentTest.java
@@ -60,6 +60,7 @@ class RemoteTierConsumerAgentTest {
                 new RemoteTierConsumerAgent(
                         Collections.singletonList(
                                 new TieredStorageConsumerSpec(
+                                        0,
                                         partitionId,
                                         new TieredStorageInputChannelId(0),
                                         new ResultSubpartitionIndexSet(0))),
@@ -92,6 +93,7 @@ class RemoteTierConsumerAgentTest {
                 new RemoteTierConsumerAgent(
                         Collections.singletonList(
                                 new TieredStorageConsumerSpec(
+                                        0,
                                         partitionId,
                                         new TieredStorageInputChannelId(0),
                                         new ResultSubpartitionIndexSet(0))),
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgentTest.java
index 289daaf8385..73b2d65cd24 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgentTest.java
@@ -18,10 +18,12 @@
 
 package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId;
 import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry;
+import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
@@ -39,8 +41,9 @@ class RemoteTierMasterAgentTest {
 
     @Test
     void testAddAndReleasePartition() throws IOException {
+        ResultPartitionID resultPartitionID = new ResultPartitionID();
         TieredStoragePartitionId partitionId =
-                TieredStorageIdMappingUtils.convertId(new ResultPartitionID());
+                TieredStorageIdMappingUtils.convertId(resultPartitionID);
         File partitionFile = new File(getPartitionPath(partitionId, 
tempFolder.getAbsolutePath()));
         assertThat(partitionFile.createNewFile()).isTrue();
         assertThat(partitionFile.exists()).isTrue();
@@ -48,9 +51,10 @@ class RemoteTierMasterAgentTest {
         TieredStorageResourceRegistry resourceRegistry = new 
TieredStorageResourceRegistry();
         RemoteTierMasterAgent masterAgent =
                 new RemoteTierMasterAgent(tempFolder.getAbsolutePath(), 
resourceRegistry);
-        masterAgent.addPartition(partitionId);
+        TierShuffleDescriptor tierShuffleDescriptor =
+                masterAgent.addPartitionAndGetShuffleDescriptor(new JobID(), 
resultPartitionID);
         assertThat(partitionFile.exists()).isTrue();
-        masterAgent.releasePartition(partitionId);
+        masterAgent.releasePartition(tierShuffleDescriptor);
 
         assertThat(partitionFile.exists()).isFalse();
     }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest.java
index a3faeb1d4f4..0f465166e19 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest.java
@@ -74,6 +74,7 @@ import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.shuffle.PartitionWithMetrics;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.shuffle.ShuffleMasterContextImpl;
 import org.apache.flink.runtime.shuffle.ShuffleMetrics;
 import org.apache.flink.runtime.source.coordinator.SourceCoordinator;
 import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider;
@@ -1042,7 +1043,8 @@ public class BatchJobRecoveryTest {
             throws Exception {
 
         final ShuffleMaster<NettyShuffleDescriptor> shuffleMaster =
-                new NettyShuffleMaster(new Configuration());
+                new NettyShuffleMaster(
+                        new ShuffleMasterContextImpl(new Configuration(), 
throwable -> {}));
         TestingJobMasterGateway jobMasterGateway =
                 new TestingJobMasterGatewayBuilder()
                         .setGetPartitionWithMetricsFunction(
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java
index 0d1f92b583c..d1d0795aa53 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java
@@ -152,7 +152,7 @@ class ShuffleMasterTest {
         private final boolean stopTrackingPartition;
 
         public TestShuffleMaster(Configuration conf) {
-            super(conf);
+            super(new ShuffleMasterContextImpl(conf, throwable -> {})); // lambda ignores throwable
             this.stopTrackingPartition = 
conf.getBoolean(STOP_TRACKING_PARTITION_KEY, false);
             currentInstance.set(this);
         }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java
index 4ada32cef0a..f35b5f51b43 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java
@@ -24,7 +24,8 @@ import org.apache.flink.configuration.Configuration;
 public class ShuffleTestUtils {
 
     public static final ShuffleMaster<?> DEFAULT_SHUFFLE_MASTER =
-            new NettyShuffleMaster(new Configuration());
+            new NettyShuffleMaster(
+                    new ShuffleMasterContextImpl(new Configuration(), 
throwable -> {}));
 
     /** Private default constructor to avoid being instantiated. */
     private ShuffleTestUtils() {}

Reply via email to