This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 32b563dd29c5b08e4574773815c014c1c0cca12e Author: Yuxin Tan <tanyuxinw...@gmail.com> AuthorDate: Thu Jun 6 11:48:17 2024 +0800 [FLINK-35533][runtime] Refactor the APIs and implementations for supporting remote tier plugin --- .../io/network/NettyShuffleServiceFactory.java | 2 +- .../network/partition/ResultPartitionFactory.java | 35 ++++++++-- .../partition/consumer/SingleInputGate.java | 3 + .../partition/consumer/SingleInputGateFactory.java | 29 ++++++++ .../tiered/common/TieredStorageConfiguration.java | 48 +------------ .../hybrid/tiered/common/TieredStorageUtils.java | 15 ++++ .../shuffle/TieredInternalShuffleMaster.java | 69 +++++++++++++++++-- .../shuffle/TieredResultPartitionFactory.java | 27 ++++---- .../storage/TieredStorageConsumerClient.java | 65 ++++++++++++++++-- .../tiered/storage/TieredStorageConsumerSpec.java | 8 +++ .../tiered/storage/TieredStorageMasterClient.java | 44 ++++++++++-- .../tiered/storage/TieredStorageMemoryManager.java | 7 ++ .../storage/TieredStorageMemoryManagerImpl.java | 5 ++ .../hybrid/tiered/tier/NoOpMasterAgent.java | 24 +++++-- .../tiered/tier/NoOpTierShuffleDescriptor.java} | 14 ++-- .../hybrid/tiered/tier/TierConsumerAgent.java | 12 +++- .../partition/hybrid/tiered/tier/TierFactory.java | 15 +++- .../hybrid/tiered/tier/TierMasterAgent.java | 25 ++++--- ...oOpMasterAgent.java => TierShuffleHandler.java} | 28 ++++---- .../tiered/tier/UnknownTierShuffleDescriptor.java} | 18 ++--- .../tiered/tier/disk/DiskTierConsumerAgent.java | 6 ++ .../hybrid/tiered/tier/disk/DiskTierFactory.java | 21 ++++++ .../tiered/tier/disk/DiskTierProducerAgent.java | 3 +- .../tier/memory/MemoryTierConsumerAgent.java | 6 ++ .../tiered/tier/memory/MemoryTierFactory.java | 22 ++++++ .../tier/memory/MemoryTierProducerAgent.java | 8 ++- .../tier/remote/RemoteTierConsumerAgent.java | 6 ++ .../tiered/tier/remote/RemoteTierFactory.java | 22 ++++++ .../tiered/tier/remote/RemoteTierMasterAgent.java | 36 ++++++++-- 
.../tier/remote/RemoteTierProducerAgent.java | 3 +- .../flink/runtime/shuffle/NettyShuffleMaster.java | 23 ++++--- .../network/partition/ResultPartitionBuilder.java | 28 ++++++++ .../tiered/TestingTieredStorageMemoryManager.java | 18 +++++ .../tiered/netty/TestingTierConsumerAgent.java | 20 ++++++ .../netty/TieredStorageConsumerClientTest.java | 7 ++ .../tiered/shuffle/TieredResultPartitionTest.java | 10 +++ .../hybrid/tiered/storage/TestingTierFactory.java | 79 +++++++++++++++++++++- .../tier/remote/RemoteTierConsumerAgentTest.java | 2 + .../tier/remote/RemoteTierMasterAgentTest.java | 10 ++- .../adaptivebatch/BatchJobRecoveryTest.java | 4 +- .../flink/runtime/shuffle/ShuffleMasterTest.java | 2 +- .../flink/runtime/shuffle/ShuffleTestUtils.java | 3 +- 42 files changed, 681 insertions(+), 151 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java index 7a42b3487d7..f2e65d7317c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NettyShuffleServiceFactory.java @@ -68,7 +68,7 @@ public class NettyShuffleServiceFactory @Override public NettyShuffleMaster createShuffleMaster(ShuffleMasterContext shuffleMasterContext) { - return new NettyShuffleMaster(shuffleMasterContext.getConfiguration()); + return new NettyShuffleMaster(shuffleMasterContext); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java index 20f33f311ba..fa100455708 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java @@ -32,7 +32,11 @@ import org.apache.flink.runtime.io.network.partition.hybrid.HsResultPartition; import org.apache.flink.runtime.io.network.partition.hybrid.HybridShuffleConfiguration; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredResultPartitionFactory; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory; +import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor; import org.apache.flink.runtime.shuffle.NettyShuffleUtils; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.ProcessorArchitecture; @@ -46,6 +50,9 @@ import java.io.IOException; import java.util.Optional; import java.util.concurrent.ScheduledExecutorService; +import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.apache.flink.util.Preconditions.checkState; + /** Factory for {@link ResultPartition} to use in {@link NettyShuffleEnvironment}. 
*/ public class ResultPartitionFactory { @@ -145,9 +152,11 @@ public class ResultPartitionFactory { partitionIndex, desc.getShuffleDescriptor().getResultPartitionID(), desc.getPartitionType(), + desc.getTotalNumberOfPartitions(), desc.getNumberOfSubpartitions(), desc.getMaxParallelism(), desc.isBroadcast(), + desc.getShuffleDescriptor(), createBufferPoolFactory(desc.getNumberOfSubpartitions(), desc.getPartitionType()), desc.isNumberOfPartitionConsumerUndefined()); } @@ -158,15 +167,22 @@ public class ResultPartitionFactory { int partitionIndex, ResultPartitionID id, ResultPartitionType type, + int numberOfPartitions, int numberOfSubpartitions, int maxParallelism, boolean isBroadcast, + ShuffleDescriptor shuffleDescriptor, SupplierWithException<BufferPool, IOException> bufferPoolFactory, boolean isNumberOfPartitionConsumerUndefined) { BufferCompressor bufferCompressor = null; if (type.supportCompression() && batchShuffleCompressionEnabled) { bufferCompressor = new BufferCompressor(networkBufferSize, compressionCodec); } + if (tieredStorage.isPresent() && type == ResultPartitionType.BLOCKING) { + LOG.warn( + "When enabling tiered storage, the BLOCKING result partition will be replaced as HYBRID_FULL."); + type = ResultPartitionType.HYBRID_FULL; + } ResultSubpartition[] subpartitions = new ResultSubpartition[numberOfSubpartitions]; @@ -241,6 +257,7 @@ public class ResultPartitionFactory { } } else if (type == ResultPartitionType.HYBRID_FULL || type == ResultPartitionType.HYBRID_SELECTIVE) { + checkState(shuffleDescriptor instanceof NettyShuffleDescriptor); if (tieredStorage.isPresent()) { partition = tieredStorage @@ -250,6 +267,7 @@ public class ResultPartitionFactory { partitionIndex, id, type, + numberOfPartitions, subpartitions.length, maxParallelism, networkBufferSize, @@ -257,6 +275,9 @@ public class ResultPartitionFactory { memoryDecouplingEnabled, partitionManager, bufferCompressor, + checkNotNull( + ((NettyShuffleDescriptor) shuffleDescriptor) + 
.getTierShuffleDescriptors()), bufferPoolFactory, channelManager, batchShuffleReadBufferPool, @@ -378,10 +399,7 @@ public class ResultPartitionFactory { tieredStorage.isPresent(), memoryDecouplingEnabled, tieredStorage - .map( - storage -> - storage.getTieredStorageConfiguration() - .getTotalExclusiveBufferNum()) + .map(ResultPartitionFactory::getNumTotalGuaranteedBuffers) .orElse(0), TieredStorageUtils.getMinBuffersPerResultPartition(), type); @@ -407,4 +425,13 @@ public class ResultPartitionFactory { return BoundedBlockingSubpartitionType.FILE; } } + + private static int getNumTotalGuaranteedBuffers( + TieredResultPartitionFactory resultPartitionFactory) { + return resultPartitionFactory.getTieredStorageConfiguration().getTierFactories().stream() + .map(TierFactory::getProducerAgentMemorySpec) + .map(TieredStorageMemorySpec::getNumGuaranteedBuffers) + .mapToInt(Integer::intValue) + .sum(); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java index a4346d62eb1..53c9d309cba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java @@ -312,6 +312,9 @@ public class SingleInputGate extends IndexedInputGate { BufferPool bufferPool = bufferPoolFactory.get(); setBufferPool(bufferPool); + if (tieredStorageConsumerClient != null) { + tieredStorageConsumerClient.setup(bufferPool); + } setupChannels(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java index 9ab3f367557..72b49b0a6e3 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateFactory.java @@ -42,6 +42,8 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyServiceImpl; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerClient; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.UnknownTierShuffleDescriptor; import org.apache.flink.runtime.metrics.MetricNames; import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor; import org.apache.flink.runtime.shuffle.NettyShuffleUtils; @@ -68,6 +70,7 @@ import java.util.Optional; import static org.apache.flink.runtime.io.network.partition.consumer.InputGateSpecUtils.createGateBuffersSpec; import static org.apache.flink.runtime.shuffle.ShuffleUtils.applyWithShuffleTypeCheck; +import static org.apache.flink.util.Preconditions.checkNotNull; /** Factory for {@link SingleInputGate} to use in {@link NettyShuffleEnvironment}. 
*/ public class SingleInputGateFactory { @@ -259,6 +262,7 @@ public class SingleInputGateFactory { int channelIdx = 0; final List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs = new ArrayList<>(); + List<List<TierShuffleDescriptor>> tierShuffleDescriptors = new ArrayList<>(); for (ShuffleDescriptor descriptor : shuffleDescriptors) { TieredStoragePartitionId partitionId = TieredStorageIdMappingUtils.convertId(descriptor.getResultPartitionID()); @@ -273,8 +277,11 @@ public class SingleInputGateFactory { channelStatistics, metrics); if (tieredStorageConfiguration != null) { + addTierShuffleDescriptors(tierShuffleDescriptors, descriptor); + tieredStorageConsumerSpecs.add( new TieredStorageConsumerSpec( + inputGate.getInputGateIndex(), partitionId, new TieredStorageInputChannelId(channelIdx), subpartitionIndexSet)); @@ -292,8 +299,11 @@ public class SingleInputGateFactory { channelStatistics, metrics); if (tieredStorageConfiguration != null) { + addTierShuffleDescriptors(tierShuffleDescriptors, descriptor); + tieredStorageConsumerSpecs.add( new TieredStorageConsumerSpec( + inputGate.getInputGateIndex(), partitionId, new TieredStorageInputChannelId(channelIdx), new ResultSubpartitionIndexSet(subpartitionIndex))); @@ -310,6 +320,7 @@ public class SingleInputGateFactory { new TieredStorageConsumerClient( tieredStorageConfiguration.getTierFactories(), tieredStorageConsumerSpecs, + tierShuffleDescriptors, tieredStorageNettyService); inputGate.setTieredStorageService( tieredStorageConsumerSpecs, @@ -439,6 +450,24 @@ public class SingleInputGateFactory { return inputChannelDescriptor.isLocalTo(taskExecutorResourceId); } + private void addTierShuffleDescriptors( + List<List<TierShuffleDescriptor>> tierShuffleDescriptors, + ShuffleDescriptor descriptor) { + if (descriptor instanceof NettyShuffleDescriptor) { + tierShuffleDescriptors.add( + ((NettyShuffleDescriptor) descriptor).getTierShuffleDescriptors()); + } else if (descriptor.isUnknown()) { + 
List<TierShuffleDescriptor> unknownDescriptors = new ArrayList<>(); + int numTiers = checkNotNull(tieredStorageConfiguration).getTierFactories().size(); + for (int i = 0; i < numTiers; i++) { + unknownDescriptors.add(UnknownTierShuffleDescriptor.INSTANCE); + } + tierShuffleDescriptors.add(unknownDescriptors); + } else { + throw new IllegalArgumentException("Unsupported shuffle descriptor type " + descriptor); + } + } + @VisibleForTesting static SupplierWithException<BufferPool, IOException> createBufferPoolFactory( BufferPoolFactory bufferPoolFactory, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java index cbbe1582177..cb475ae31ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageConfiguration.java @@ -19,70 +19,26 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.common; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TierFactoryInitializer; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory; -import java.util.ArrayList; import java.util.List; /** Configurations for the Tiered Storage. 
*/ public class TieredStorageConfiguration { - private static final int DEFAULT_MEMORY_TIER_EXCLUSIVE_BUFFERS = 100; - - private static final int DEFAULT_DISK_TIER_EXCLUSIVE_BUFFERS = 1; - - private static final int DEFAULT_REMOTE_TIER_EXCLUSIVE_BUFFERS = 1; - private final List<TierFactory> tierFactories; - private final List<Integer> tierExclusiveBuffers; - private TieredStorageConfiguration(List<TierFactory> tierFactories) { this.tierFactories = tierFactories; - this.tierExclusiveBuffers = new ArrayList<>(); - } - - /** - * Get the total exclusive buffer number. - * - * @return the total exclusive buffer number. - */ - public int getTotalExclusiveBufferNum() { - return tierExclusiveBuffers.stream().mapToInt(Integer::intValue).sum(); - } - - /** - * Get exclusive buffer number of each tier. - * - * @return buffer number of each tier. - */ - public List<Integer> getEachTierExclusiveBufferNum() { - return tierExclusiveBuffers; } public List<TierFactory> getTierFactories() { return tierFactories; } - private void setupTierFactoriesAndExclusiveBuffers(Configuration configuration) { - String remoteStorageBasePath = - configuration.get( - NettyShuffleEnvironmentOptions - .NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH); - tierExclusiveBuffers.add(DEFAULT_MEMORY_TIER_EXCLUSIVE_BUFFERS); - tierExclusiveBuffers.add(DEFAULT_DISK_TIER_EXCLUSIVE_BUFFERS); - if (remoteStorageBasePath != null) { - tierExclusiveBuffers.add(DEFAULT_REMOTE_TIER_EXCLUSIVE_BUFFERS); - } - } - public static TieredStorageConfiguration fromConfiguration(Configuration configuration) { - TieredStorageConfiguration tieredStorageConfiguration = - new TieredStorageConfiguration( - TierFactoryInitializer.initializeTierFactories(configuration)); - tieredStorageConfiguration.setupTierFactoriesAndExclusiveBuffers(configuration); - return tieredStorageConfiguration; + return new TieredStorageConfiguration( + TierFactoryInitializer.initializeTierFactories(configuration)); } } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java index 9391a1be6a8..8565daf14d2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/common/TieredStorageUtils.java @@ -25,6 +25,9 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.Buffe import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.HashBufferAccumulator; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.SortBufferAccumulator; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.disk.DiskTierFactory; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory.MemoryTierFactory; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote.RemoteTierFactory; import java.nio.ByteBuffer; import java.util.List; @@ -94,6 +97,18 @@ public class TieredStorageUtils { return DEFAULT_MIN_BUFFERS_PER_RESULT_PARTITION; } + public static String getMemoryTierName() { + return MemoryTierFactory.class.getSimpleName(); + } + + public static String getDiskTierName() { + return DiskTierFactory.class.getSimpleName(); + } + + public static String getRemoteTierName() { + return RemoteTierFactory.class.getSimpleName(); + } + public static ByteBuffer[] generateBufferWithHeaders( List<Tuple2<Buffer, Integer>> bufferWithIndexes) { ByteBuffer[] bufferWithHeaders = new ByteBuffer[2 * bufferWithIndexes.size()]; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java index 37b13b95f30..01cc5468692 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredInternalShuffleMaster.java @@ -18,18 +18,26 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageConfiguration; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMasterClient; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleHandler; +import org.apache.flink.runtime.shuffle.JobShuffleContext; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleMasterContext; +import java.util.Collection; import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; -import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils.convertId; - /** * A wrapper internal shuffle master class for tiered 
storage. All the tiered storage operations * with the shuffle master should be wrapped in this class. @@ -38,7 +46,11 @@ public class TieredInternalShuffleMaster { private final TieredStorageMasterClient tieredStorageMasterClient; - public TieredInternalShuffleMaster(Configuration conf) { + private final ShuffleMasterContext shuffleMasterContext; + + public TieredInternalShuffleMaster(ShuffleMasterContext shuffleMasterContext) { + this.shuffleMasterContext = shuffleMasterContext; + Configuration conf = shuffleMasterContext.getConfiguration(); TieredStorageConfiguration tieredStorageConfiguration = TieredStorageConfiguration.fromConfiguration(conf); TieredStorageResourceRegistry resourceRegistry = new TieredStorageResourceRegistry(); @@ -49,11 +61,54 @@ public class TieredInternalShuffleMaster { this.tieredStorageMasterClient = new TieredStorageMasterClient(tierFactories); } - public void addPartition(ResultPartitionID resultPartitionID) { - tieredStorageMasterClient.addPartition(convertId(resultPartitionID)); + /** + * Registers the target job together with the corresponding {@link JobShuffleContext} to this + * shuffle master. + */ + public void registerJob(JobShuffleContext context) { + tieredStorageMasterClient.registerJob(context.getJobId(), getTierShuffleHandler(context)); + } + + /** + * Unregisters the target job from this shuffle master, which means the corresponding job has + * reached a global termination state and all the allocated resources except for the cluster + * partitions can be cleared. + * + * @param jobID ID of the target job to be unregistered. 
+ */ + public void unregisterJob(JobID jobID) { + tieredStorageMasterClient.unregisterJob(jobID); + } + + public List<TierShuffleDescriptor> addPartitionAndGetShuffleDescriptor( + JobID jobID, ResultPartitionID resultPartitionID) { + return tieredStorageMasterClient.addPartitionAndGetShuffleDescriptor( + jobID, resultPartitionID); } - public void releasePartition(ResultPartitionID resultPartitionID) { - tieredStorageMasterClient.releasePartition(convertId(resultPartitionID)); + public void releasePartition(ShuffleDescriptor shuffleDescriptor) { + tieredStorageMasterClient.releasePartition(shuffleDescriptor); + } + + public void close() { + tieredStorageMasterClient.close(); + } + + private TierShuffleHandler getTierShuffleHandler(JobShuffleContext context) { + return new TierShuffleHandler() { + @Override + public CompletableFuture<?> onReleasePartitions( + Collection<TieredStoragePartitionId> partitionIds) { + return context.stopTrackingAndReleasePartitions( + partitionIds.stream() + .map(TieredStorageIdMappingUtils::convertId) + .collect(Collectors.toList())); + } + + @Override + public void onFatalError(Throwable throwable) { + shuffleMasterContext.onFatalError(throwable); + } + }; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java index 7f0ebf125b9..8a788fe9258 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionFactory.java @@ -41,6 +41,7 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.Tiere import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry; 
import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.memory.MemoryTierFactory; import org.apache.flink.util.function.SupplierWithException; @@ -48,9 +49,12 @@ import javax.annotation.Nullable; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.ScheduledExecutorService; +import static org.apache.flink.util.Preconditions.checkState; + /** {@link TieredResultPartitionFactory} contains the components to set up tiered storage. */ public class TieredResultPartitionFactory { @@ -84,6 +88,7 @@ public class TieredResultPartitionFactory { int partitionIndex, ResultPartitionID partitionId, ResultPartitionType partitionType, + int numPartitions, int numSubpartitions, int maxParallelism, int bufferSizeBytes, @@ -91,6 +96,7 @@ public class TieredResultPartitionFactory { boolean memoryDecouplingEnabled, ResultPartitionManager partitionManager, @Nullable BufferCompressor bufferCompressor, + List<TierShuffleDescriptor> tierShuffleDescriptors, SupplierWithException<BufferPool, IOException> bufferPoolFactory, FileChannelManager fileChannelManager, BatchShuffleReadBufferPool batchShuffleReadBufferPool, @@ -117,12 +123,14 @@ public class TieredResultPartitionFactory { Tuple2<List<TierProducerAgent>, List<TieredStorageMemorySpec>> producerAgentsAndMemorySpecs = createTierProducerAgentsAndMemorySpecs( + numPartitions, numSubpartitions, isBroadCastOnly, TieredStorageIdMappingUtils.convertId(partitionId), memoryManager, bufferAccumulator, partitionType == ResultPartitionType.HYBRID_SELECTIVE, + tierShuffleDescriptors, fileChannelManager, batchShuffleReadBufferPool, batchShuffleReadIOExecutor); @@ -186,12 +194,14 @@ 
public class TieredResultPartitionFactory { private Tuple2<List<TierProducerAgent>, List<TieredStorageMemorySpec>> createTierProducerAgentsAndMemorySpecs( + int numberOfPartitions, int numberOfSubpartitions, boolean isBroadcastOnly, TieredStoragePartitionId partitionID, TieredStorageMemoryManager memoryManager, BufferAccumulator bufferAccumulator, boolean isHybridSelective, + List<TierShuffleDescriptor> tierShuffleDescriptors, FileChannelManager fileChannelManager, BatchShuffleReadBufferPool batchShuffleReadBufferPool, ScheduledExecutorService batchShuffleReadIOExecutor) { @@ -209,10 +219,10 @@ public class TieredResultPartitionFactory { numberOfSubpartitions + 1, TieredStorageUtils.getAccumulatorExclusiveBuffers()), true)); - List<Integer> tierExclusiveBuffers = - tieredStorageConfiguration.getEachTierExclusiveBufferNum(); List<TierFactory> tierFactories = tieredStorageConfiguration.getTierFactories(); + checkState(tierFactories.size() == tierShuffleDescriptors.size()); + for (int index = 0; index < tierFactories.size(); ++index) { TierFactory tierFactory = tierFactories.get(index); if (!isHybridSelective && tierFactory.getClass() == MemoryTierFactory.class) { @@ -220,6 +230,7 @@ public class TieredResultPartitionFactory { } TierProducerAgent producerAgent = tierFactory.createProducerAgent( + numberOfPartitions, numberOfSubpartitions, partitionID, fileChannelManager.createChannel().getPath(), @@ -229,20 +240,12 @@ public class TieredResultPartitionFactory { tieredStorageResourceRegistry, batchShuffleReadBufferPool, batchShuffleReadIOExecutor, + Collections.singletonList(tierShuffleDescriptors.get(index)), Math.max( 2 * batchShuffleReadBufferPool.getNumBuffersPerRequest(), numberOfSubpartitions)); tierProducerAgents.add(producerAgent); - - if (tierFactory.getClass() == MemoryTierFactory.class) { - tieredStorageMemorySpecs.add( - new TieredStorageMemorySpec( - producerAgent, tierExclusiveBuffers.get(index), false)); - } else { - tieredStorageMemorySpecs.add( - new 
TieredStorageMemorySpec( - producerAgent, tierExclusiveBuffers.get(index), true)); - } + tieredStorageMemorySpecs.add(tierFactory.getProducerAgentMemorySpec()); } return Tuple2.of(tierProducerAgents, tieredStorageMemorySpecs); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java index 4df026347a2..6d9cfff4ebc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerClient.java @@ -20,20 +20,25 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.io.network.buffer.Buffer; +import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; +import static org.apache.flink.util.Preconditions.checkState; + /** {@link 
TieredStorageConsumerClient} is used to read buffer from tiered store. */ public class TieredStorageConsumerClient { @@ -56,10 +61,18 @@ public class TieredStorageConsumerClient { public TieredStorageConsumerClient( List<TierFactory> tierFactories, List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, + List<List<TierShuffleDescriptor>> tierShuffleDescriptors, TieredStorageNettyService nettyService) { this.tierFactories = tierFactories; this.nettyService = nettyService; - this.tierConsumerAgents = createTierConsumerAgents(tieredStorageConsumerSpecs); + this.tierConsumerAgents = + createTierConsumerAgents(tieredStorageConsumerSpecs, tierShuffleDescriptors); + } + + public void setup(BufferPool bufferPool) { + TieredStorageMemoryManagerImpl memoryManager = new TieredStorageMemoryManagerImpl(0, false); + memoryManager.setup(bufferPool, Collections.emptyList()); + tierConsumerAgents.forEach(tierConsumerAgent -> tierConsumerAgent.setup(memoryManager)); } public void start() { @@ -90,7 +103,8 @@ public class TieredStorageConsumerClient { } public Optional<Buffer> getNextBuffer( - TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId) { + TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId) + throws IOException { Tuple2<TierConsumerAgent, Integer> currentConsumerAgentAndSegmentId = currentConsumerAgentAndSegmentIds .computeIfAbsent(partitionId, ignore -> new HashMap<>()) @@ -148,12 +162,53 @@ public class TieredStorageConsumerClient { // -------------------------------------------------------------------------------------------- private List<TierConsumerAgent> createTierConsumerAgents( - List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs) { + List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, + List<List<TierShuffleDescriptor>> shuffleDescriptors) { ArrayList<TierConsumerAgent> tierConsumerAgents = new ArrayList<>(); - for (TierFactory tierFactory : tierFactories) { + + 
List<List<TierShuffleDescriptor>> transformedTierShuffleDescriptors = + transformTierShuffleDescriptors(shuffleDescriptors); + // Each tier only requires one inner list of transformedTierShuffleDescriptors, so the size + // of transformedTierShuffleDescriptors and the size of tierFactories are the same. + checkState(transformedTierShuffleDescriptors.size() == tierFactories.size()); + for (int i = 0; i < tierFactories.size(); i++) { tierConsumerAgents.add( - tierFactory.createConsumerAgent(tieredStorageConsumerSpecs, nettyService)); + tierFactories + .get(i) + .createConsumerAgent( + tieredStorageConsumerSpecs, + transformedTierShuffleDescriptors.get(i), + nettyService)); } return tierConsumerAgents; } + + /** + * Before transforming the shuffle descriptors, the number of tier shuffle descriptors is + * numPartitions * numTiers (That means shuffleDescriptors.size() is numPartitions, while the + * shuffleDescriptors.get(0).size() is numTiers). After transforming, the number of tier shuffle + * descriptors is numTiers * numPartitions (That means transformedList.size() is numTiers, while + * transformedList.get(0).size() is numPartitions). 
+ */ + private static List<List<TierShuffleDescriptor>> transformTierShuffleDescriptors( + List<List<TierShuffleDescriptor>> shuffleDescriptors) { + int numTiers = 0; + int numPartitions = shuffleDescriptors.size(); + for (List<TierShuffleDescriptor> tierShuffleDescriptors : shuffleDescriptors) { + if (numTiers == 0) { + numTiers = tierShuffleDescriptors.size(); + } + checkState(numTiers == tierShuffleDescriptors.size()); + } + + List<List<TierShuffleDescriptor>> transformedList = new ArrayList<>(); + for (int i = 0; i < numTiers; i++) { + List<TierShuffleDescriptor> innerList = new ArrayList<>(); + for (int j = 0; j < numPartitions; j++) { + innerList.add(shuffleDescriptors.get(j).get(i)); + } + transformedList.add(innerList); + } + return transformedList; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerSpec.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerSpec.java index 42b143caaf1..91aaa8bc0c1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerSpec.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageConsumerSpec.java @@ -25,6 +25,8 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered /** Describe the different data sources in {@link TieredStorageConsumerClient}. 
*/ public class TieredStorageConsumerSpec { + private final int gateIndex; + private final TieredStoragePartitionId tieredStoragePartitionId; private final TieredStorageInputChannelId tieredStorageInputChannelId; @@ -32,14 +34,20 @@ public class TieredStorageConsumerSpec { private final ResultSubpartitionIndexSet tieredStorageSubpartitionIds; public TieredStorageConsumerSpec( + int gateIndex, TieredStoragePartitionId tieredStoragePartitionId, TieredStorageInputChannelId tieredStorageInputChannelId, ResultSubpartitionIndexSet tieredStorageSubpartitionIds) { + this.gateIndex = gateIndex; this.tieredStoragePartitionId = tieredStoragePartitionId; this.tieredStorageInputChannelId = tieredStorageInputChannelId; this.tieredStorageSubpartitionIds = tieredStorageSubpartitionIds; } + public int getGateIndex() { + return gateIndex; + } + public TieredStoragePartitionId getPartitionId() { return tieredStoragePartitionId; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java index 60f2c779f30..457dcffef25 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMasterClient.java @@ -18,10 +18,18 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage; -import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; 
+import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleHandler; +import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import java.util.List; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkState; /** Client of the Tiered Storage used by the master. */ public class TieredStorageMasterClient { @@ -32,11 +40,37 @@ public class TieredStorageMasterClient { this.tiers = tiers; } - public void addPartition(TieredStoragePartitionId partitionId) { - tiers.forEach(tierMasterAgent -> tierMasterAgent.addPartition(partitionId)); + public void registerJob(JobID jobID, TierShuffleHandler shuffleHandler) { + tiers.forEach(tierMasterAgent -> tierMasterAgent.registerJob(jobID, shuffleHandler)); + } + + public void unregisterJob(JobID jobID) { + tiers.forEach(tierMasterAgent -> tierMasterAgent.unregisterJob(jobID)); + } + + public List<TierShuffleDescriptor> addPartitionAndGetShuffleDescriptor( + JobID jobID, ResultPartitionID resultPartitionID) { + return tiers.stream() + .map( + tierMasterAgent -> + tierMasterAgent.addPartitionAndGetShuffleDescriptor( + jobID, resultPartitionID)) + .collect(Collectors.toList()); + } + + public void releasePartition(ShuffleDescriptor shuffleDescriptor) { + checkState(shuffleDescriptor instanceof NettyShuffleDescriptor); + List<TierShuffleDescriptor> tierShuffleDescriptors = + ((NettyShuffleDescriptor) shuffleDescriptor).getTierShuffleDescriptors(); + if (tierShuffleDescriptors != null && !tierShuffleDescriptors.isEmpty()) { + checkState(tierShuffleDescriptors.size() == tiers.size()); + for (int i = 0; i < tierShuffleDescriptors.size(); i++) { + tiers.get(i).releasePartition(tierShuffleDescriptors.get(i)); + } + } } - public void releasePartition(TieredStoragePartitionId partitionId) { - tiers.forEach(tierMasterAgent -> tierMasterAgent.releasePartition(partitionId)); + public void close() { + 
tiers.forEach(TierMasterAgent::close); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java index 4ac9c400b27..c2793b72bbb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManager.java @@ -78,6 +78,13 @@ public interface TieredStorageMemoryManager { */ void listenBufferReclaimRequest(Runnable onBufferReclaimRequest); + /** + * Expose and get the internal {@link BufferPool}. Please note that this method is a temporary + * workaround for the remote tier plugin and may be removed at any time in the future. We + * strongly advise that users do not rely on this method. + */ + BufferPool getBufferPool(); + /** * Request a {@link BufferBuilder} instance for a specific owner. The {@link * TieredStorageMemoryManagerImpl} will not check whether a buffer can be requested. 
The manager diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java index c7365bc9c41..0ef453ec126 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TieredStorageMemoryManagerImpl.java @@ -198,6 +198,11 @@ public class TieredStorageMemoryManagerImpl implements TieredStorageMemoryManage bufferReclaimRequestListeners.add(onBufferReclaimRequest); } + @Override + public BufferPool getBufferPool() { + return bufferPool; + } + @Override public BufferBuilder requestBufferBlocking(Object owner) { checkIsInitialized(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java index bf259dee224..bd833d69e45 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java @@ -18,22 +18,38 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier; -import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; /** The empty implementation for {@link TierMasterAgent}. 
*/ public class NoOpMasterAgent implements TierMasterAgent { public static final NoOpMasterAgent INSTANCE = new NoOpMasterAgent(); - private NoOpMasterAgent() {} + @Override + public void registerJob(JobID jobID, TierShuffleHandler tierShuffleHandler) { + // noop + } + + @Override + public void unregisterJob(JobID jobID) { + // noop + } + + @Override + public TierShuffleDescriptor addPartitionAndGetShuffleDescriptor( + JobID jobID, ResultPartitionID resultPartitionID) { + // noop + return NoOpTierShuffleDescriptor.INSTANCE; + } @Override - public void addPartition(TieredStoragePartitionId partitionId) { + public void releasePartition(TierShuffleDescriptor shuffleDescriptor) { // noop } @Override - public void releasePartition(TieredStoragePartitionId partitionId) { + public void close() { // noop } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpTierShuffleDescriptor.java similarity index 66% copy from flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpTierShuffleDescriptor.java index 4ada32cef0a..1c1cb67f78e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpTierShuffleDescriptor.java @@ -16,16 +16,14 @@ * limitations under the License. */ -package org.apache.flink.runtime.shuffle; +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier; -import org.apache.flink.configuration.Configuration; +/** The empty implementation for {@link TierShuffleDescriptor}. */ +public class NoOpTierShuffleDescriptor implements TierShuffleDescriptor { -/** Utils for shuffle related tests. 
*/ -public class ShuffleTestUtils { + private static final long serialVersionUID = 1L; - public static final ShuffleMaster<?> DEFAULT_SHUFFLE_MASTER = - new NettyShuffleMaster(new Configuration()); + public static final NoOpTierShuffleDescriptor INSTANCE = new NoOpTierShuffleDescriptor(); - /** Private default constructor to avoid being instantiated. */ - private ShuffleTestUtils() {} + private NoOpTierShuffleDescriptor() {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierConsumerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierConsumerAgent.java index fecf9baa9e1..9a845c7ddfb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierConsumerAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierConsumerAgent.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager; import java.io.IOException; import java.util.Optional; @@ -33,6 +34,14 @@ import java.util.Optional; */ public interface TierConsumerAgent { + /** + * The consumer agent may request buffers from the memory manager. Therefore, the {@link + * TieredStorageMemoryManager} should be integrated into the tier consumer agent. Since the + * buffer pool is initialized after the creation of the client, the memory manager needs to be + * assigned after the buffer pool becomes available.
+ */ + void setup(TieredStorageMemoryManager memoryManager); + /** Start the consumer agent. */ void start(); @@ -59,7 +68,8 @@ public interface TierConsumerAgent { Optional<Buffer> getNextBuffer( TieredStoragePartitionId partitionId, TieredStorageSubpartitionId subpartitionId, - int segmentId); + int segmentId) + throws IOException; /** * Register the notifier to notify the availability of a subpartition. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java index 9f1ad6e33c7..fbfce81f2ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierFactory.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry; import java.util.List; @@ -35,11 +36,21 @@ public interface TierFactory { /** Sets up the tier factory based on the {@link Configuration}. */ void setup(Configuration configuration); + /** Get the {@link TieredStorageMemorySpec} of the master-side agent. */ + TieredStorageMemorySpec getMasterAgentMemorySpec(); + + /** Get the {@link TieredStorageMemorySpec} of the producer-side agent. 
*/ + TieredStorageMemorySpec getProducerAgentMemorySpec(); + + /** Get the {@link TieredStorageMemorySpec} of the consumer-side agent. */ + TieredStorageMemorySpec getConsumerAgentMemorySpec(); + /** Creates the master-side agent of a Tier. */ TierMasterAgent createMasterAgent(TieredStorageResourceRegistry tieredStorageResourceRegistry); /** Creates the producer-side agent of a Tier. */ TierProducerAgent createProducerAgent( + int numPartitions, int numSubpartitions, TieredStoragePartitionId partitionID, String dataFileBasePath, @@ -49,10 +60,12 @@ public interface TierFactory { TieredStorageResourceRegistry resourceRegistry, BatchShuffleReadBufferPool bufferPool, ScheduledExecutorService ioExecutor, - int maxRequestedBuffers); + List<TierShuffleDescriptor> shuffleDescriptors, + int maxRequestedBuffer); /** Creates the consumer-side agent of a Tier. */ TierConsumerAgent createConsumerAgent( List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, + List<TierShuffleDescriptor> shuffleDescriptors, TieredStorageNettyService nettyService); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierMasterAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierMasterAgent.java index 2c34cdd38c6..e0d20d24082 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierMasterAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierMasterAgent.java @@ -18,22 +18,29 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier; -import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; /** The master-side agent of a Tier. 
*/ public interface TierMasterAgent { - /** - * Add a new tiered storage partition. - * - * @param partitionId the identifier of the new partition - */ - void addPartition(TieredStoragePartitionId partitionId); + /** Register a job id with a {@link TierShuffleHandler}. */ + void registerJob(JobID jobID, TierShuffleHandler tierShuffleHandler); + + /** Unregister a job id. */ + void unregisterJob(JobID jobID); + + /** Add a new tiered storage partition and get the {@link TierShuffleDescriptor}. */ + TierShuffleDescriptor addPartitionAndGetShuffleDescriptor( + JobID jobID, ResultPartitionID resultPartitionID); /** * Release a tiered storage partition. * - * @param partitionId the identifier of partition to be released + * @param shuffleDescriptor the partition shuffle descriptor to be released */ - void releasePartition(TieredStoragePartitionId partitionId); + void releasePartition(TierShuffleDescriptor shuffleDescriptor); + + /** Close this tier master agent. */ + void close(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierShuffleHandler.java similarity index 55% copy from flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierShuffleHandler.java index bf259dee224..93d283959c1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/NoOpMasterAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/TierShuffleHandler.java @@ -20,20 +20,22 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; -/** The empty 
implementation for {@link TierMasterAgent}. */ -public class NoOpMasterAgent implements TierMasterAgent { +import java.util.Collection; +import java.util.concurrent.CompletableFuture; - public static final NoOpMasterAgent INSTANCE = new NoOpMasterAgent(); - - private NoOpMasterAgent() {} +/** + * A handler for processing callbacks from each tier. A callback may report an event, an error, an + * exception, etc. To process a new kind of callback, add a corresponding method to this handler + * interface. + * + * <p>When an event occurs in a tier, the tier invokes these methods so that the framework can + * process the event. + */ +public interface TierShuffleHandler { - @Override - public void addPartition(TieredStoragePartitionId partitionId) { - // noop - } + /** A callback to process the event of releasing a collection of tiered result partitions. */ + CompletableFuture<?> onReleasePartitions(Collection<TieredStoragePartitionId> partitionIds); - @Override - public void releasePartition(TieredStoragePartitionId partitionId) { - // noop - } + /** A callback to process a fatal error. */ + void onFatalError(Throwable throwable); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/UnknownTierShuffleDescriptor.java similarity index 62% copy from flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/UnknownTierShuffleDescriptor.java index 4ada32cef0a..b09c71855d5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/UnknownTierShuffleDescriptor.java @@ -16,16 +16,16 @@ * limitations under the License.
*/ -package org.apache.flink.runtime.shuffle; +package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier; -import org.apache.flink.configuration.Configuration; - -/** Utils for shuffle related tests. */ -public class ShuffleTestUtils { +/** + * A placeholder used when the tier shuffle descriptor is unknown because the netty shuffle + * descriptor is unknown. + */ +public class UnknownTierShuffleDescriptor implements TierShuffleDescriptor { + private static final long serialVersionUID = 1L; - public static final ShuffleMaster<?> DEFAULT_SHUFFLE_MASTER = - new NettyShuffleMaster(new Configuration()); + public static final UnknownTierShuffleDescriptor INSTANCE = new UnknownTierShuffleDescriptor(); - /** Private default constructor to avoid being instantiated. */ - private ShuffleTestUtils() {} + private UnknownTierShuffleDescriptor() {} } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierConsumerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierConsumerAgent.java index b48ec280aa9..b97323fe457 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierConsumerAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierConsumerAgent.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager; import
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent; import java.io.IOException; @@ -61,6 +62,11 @@ public class DiskTierConsumerAgent implements TierConsumerAgent { } } + @Override + public void setup(TieredStorageMemoryManager memoryManager) { + // noop + } + @Override public void start() { // noop diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java index 9d59a8ca908..786aab43c5f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierFactory.java @@ -28,12 +28,14 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.Producer import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.NoOpMasterAgent; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; +import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; import org.apache.flink.runtime.util.ConfigurationParserUtils; import java.nio.file.Path; @@ -42,6 +44,7 @@ import java.time.Duration; import java.util.List; import java.util.concurrent.ScheduledExecutorService; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getDiskTierName; import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFile.DATA_FILE_SUFFIX; import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.ProducerMergedPartitionFile.INDEX_FILE_SUFFIX; import static org.apache.flink.util.Preconditions.checkState; @@ -70,6 +73,21 @@ public class DiskTierFactory implements TierFactory { this.bufferSizeBytes = ConfigurationParserUtils.getPageSize(configuration); } + @Override + public TieredStorageMemorySpec getMasterAgentMemorySpec() { + return new TieredStorageMemorySpec(getDiskTierName(), 0); + } + + @Override + public TieredStorageMemorySpec getProducerAgentMemorySpec() { + return new TieredStorageMemorySpec(getDiskTierName(), DEFAULT_DISK_TIER_EXCLUSIVE_BUFFERS); + } + + @Override + public TieredStorageMemorySpec getConsumerAgentMemorySpec() { + return new TieredStorageMemorySpec(getDiskTierName(), 0); + } + @Override public TierMasterAgent createMasterAgent(TieredStorageResourceRegistry resourceRegistry) { return NoOpMasterAgent.INSTANCE; @@ -77,6 +95,7 @@ public class DiskTierFactory implements TierFactory { @Override public TierProducerAgent createProducerAgent( + int numPartitions, int numSubpartitions, TieredStoragePartitionId partitionId, String dataFileBasePath, @@ -86,6 +105,7 @@ public class DiskTierFactory implements TierFactory { TieredStorageResourceRegistry resourceRegistry, BatchShuffleReadBufferPool bufferPool, ScheduledExecutorService ioExecutor, + List<TierShuffleDescriptor> shuffleDescriptors, int maxRequestedBuffers) { 
checkState(bufferSizeBytes > 0); @@ -125,6 +145,7 @@ public class DiskTierFactory implements TierFactory { @Override public TierConsumerAgent createConsumerAgent( List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, + List<TierShuffleDescriptor> shuffleDescriptors, TieredStorageNettyService nettyService) { return new DiskTierConsumerAgent(tieredStorageConsumerSpecs, nettyService); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java index 4f88ca71bfa..e8d35ea6e39 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/disk/DiskTierProducerAgent.java @@ -48,6 +48,7 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ScheduledExecutorService; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getDiskTierName; import static org.apache.flink.util.Preconditions.checkArgument; /** The disk tier implementation of {@link TierProducerAgent}. 
*/ @@ -177,7 +178,7 @@ public class DiskTierProducerAgent implements TierProducerAgent, NettyServicePro return false; } if (finishedBuffer.isBuffer()) { - memoryManager.transferBufferOwnership(bufferOwner, this, finishedBuffer); + memoryManager.transferBufferOwnership(bufferOwner, getDiskTierName(), finishedBuffer); } currentSubpartitionWriteBuffers[subpartitionIndex]++; emitBuffer(finishedBuffer, subpartitionIndex, numRemainingConsecutiveBuffers == 0); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierConsumerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierConsumerAgent.java index 261742d6228..138ddaa4326 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierConsumerAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierConsumerAgent.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.NettyCo import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent; import java.io.IOException; @@ -61,6 +62,11 @@ public class MemoryTierConsumerAgent implements TierConsumerAgent { } } + @Override + public void setup(TieredStorageMemoryManager memoryManager) { + // noop + } + @Override public void start() { // noop diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java index dfd762582cb..b2d84acb364 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierFactory.java @@ -24,17 +24,20 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.NoOpMasterAgent; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; import org.apache.flink.runtime.util.ConfigurationParserUtils; import java.util.List; import java.util.concurrent.ScheduledExecutorService; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getMemoryTierName; import static 
org.apache.flink.util.Preconditions.checkState; /** The implementation of {@link TierFactory} for memory tier. */ @@ -53,6 +56,22 @@ public class MemoryTierFactory implements TierFactory { this.bufferSizeBytes = ConfigurationParserUtils.getPageSize(configuration); } + @Override + public TieredStorageMemorySpec getMasterAgentMemorySpec() { + return new TieredStorageMemorySpec(getMemoryTierName(), 0); + } + + @Override + public TieredStorageMemorySpec getProducerAgentMemorySpec() { + return new TieredStorageMemorySpec( + getMemoryTierName(), DEFAULT_MEMORY_TIER_EXCLUSIVE_BUFFERS); + } + + @Override + public TieredStorageMemorySpec getConsumerAgentMemorySpec() { + return new TieredStorageMemorySpec(getMemoryTierName(), 0); + } + @Override public TierMasterAgent createMasterAgent( TieredStorageResourceRegistry tieredStorageResourceRegistry) { @@ -61,6 +80,7 @@ public class MemoryTierFactory implements TierFactory { @Override public TierProducerAgent createProducerAgent( + int numPartitions, int numSubpartitions, TieredStoragePartitionId partitionID, String dataFileBasePath, @@ -70,6 +90,7 @@ public class MemoryTierFactory implements TierFactory { TieredStorageResourceRegistry resourceRegistry, BatchShuffleReadBufferPool bufferPool, ScheduledExecutorService ioExecutor, + List<TierShuffleDescriptor> shuffleDescriptors, int maxRequestedBuffers) { checkState(bufferSizeBytes > 0); @@ -88,6 +109,7 @@ public class MemoryTierFactory implements TierFactory { @Override public TierConsumerAgent createConsumerAgent( List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, + List<TierShuffleDescriptor> shuffleDescriptors, TieredStorageNettyService nettyService) { return new MemoryTierConsumerAgent(tieredStorageConsumerSpecs, nettyService); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java index 1dc592c021b..973b8cdf741 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/memory/MemoryTierProducerAgent.java @@ -40,6 +40,7 @@ import java.io.IOException; import java.util.Arrays; import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.END_OF_SEGMENT; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getMemoryTierName; import static org.apache.flink.util.Preconditions.checkArgument; /** The memory tier implementation of {@link TierProducerAgent}. */ @@ -108,8 +109,9 @@ public class MemoryTierProducerAgent implements TierProducerAgent, NettyServiceP && subpartitionProducerAgents[subpartitionId.getSubpartitionId()] .numQueuedBuffers() < subpartitionMaxQueuedBuffers - && (memoryManager.getMaxNonReclaimableBuffers(this) - - memoryManager.numOwnerRequestedBuffer(this)) + && (memoryManager.getMaxNonReclaimableBuffers(getMemoryTierName()) + - memoryManager.numOwnerRequestedBuffer( + getMemoryTierName())) > Math.max(numBuffersPerSegment, minNumBuffers) && memoryManager.ensureCapacity( Math.max(numBuffersPerSegment, minNumBuffers)); @@ -137,7 +139,7 @@ public class MemoryTierProducerAgent implements TierProducerAgent, NettyServiceP return false; } if (finishedBuffer.isBuffer()) { - memoryManager.transferBufferOwnership(bufferOwner, this, finishedBuffer); + memoryManager.transferBufferOwnership(bufferOwner, getMemoryTierName(), finishedBuffer); } currentSubpartitionWriteBuffers[subpartitionIndex]++; addFinishedBuffer(finishedBuffer, subpartitionIndex); diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java index 07b19cf3b4c..07c3d29eae2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgent.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.PartitionFileReader; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; @@ -100,6 +101,11 @@ public class RemoteTierConsumerAgent implements TierConsumerAgent { } } + @Override + public void setup(TieredStorageMemoryManager memoryManager) { + // noop + } + @Override public void start() { remoteStorageScanner.start(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java index 5ab7e89e5b3..5c0f72ecea5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierFactory.java @@ -28,16 +28,19 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentP import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyService; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemorySpec; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; import org.apache.flink.runtime.util.ConfigurationParserUtils; import java.util.List; import java.util.concurrent.ScheduledExecutorService; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getRemoteTierName; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -64,6 +67,22 @@ public class RemoteTierFactory implements TierFactory { .NETWORK_HYBRID_SHUFFLE_REMOTE_STORAGE_BASE_PATH)); } + @Override + public TieredStorageMemorySpec getMasterAgentMemorySpec() { + return new TieredStorageMemorySpec(getRemoteTierName(), 0); + } + + @Override + public TieredStorageMemorySpec getProducerAgentMemorySpec() { + return new TieredStorageMemorySpec( + getRemoteTierName(), 
DEFAULT_REMOTE_TIER_EXCLUSIVE_BUFFERS); + } + + @Override + public TieredStorageMemorySpec getConsumerAgentMemorySpec() { + return new TieredStorageMemorySpec(getRemoteTierName(), 0); + } + @Override public TierMasterAgent createMasterAgent(TieredStorageResourceRegistry resourceRegistry) { return new RemoteTierMasterAgent(remoteStoragePath, resourceRegistry); @@ -71,6 +90,7 @@ public class RemoteTierFactory implements TierFactory { @Override public TierProducerAgent createProducerAgent( + int numPartitions, int numSubpartitions, TieredStoragePartitionId partitionID, String dataFileBasePath, @@ -80,6 +100,7 @@ public class RemoteTierFactory implements TierFactory { TieredStorageResourceRegistry resourceRegistry, BatchShuffleReadBufferPool bufferPool, ScheduledExecutorService ioExecutor, + List<TierShuffleDescriptor> shuffleDescriptors, int maxRequestedBuffers) { checkState(bufferSizeBytes > 0); checkNotNull(remoteStoragePath); @@ -100,6 +121,7 @@ public class RemoteTierFactory implements TierFactory { @Override public TierConsumerAgent createConsumerAgent( List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, + List<TierShuffleDescriptor> shuffleDescriptors, TieredStorageNettyService nettyService) { PartitionFileReader partitionFileReader = SegmentPartitionFile.createPartitionFileReader(remoteStoragePath); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgent.java index a41fc2695c1..924993b875a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgent.java @@ -18,12 +18,18 @@ package 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote; +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleHandler; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils.convertId; import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.deletePathQuietly; import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.file.SegmentPartitionFile.getPartitionPath; +import static org.apache.flink.util.Preconditions.checkState; /** The implementation of {@link TierMasterAgent} for the remote tier. 
*/ public class RemoteTierMasterAgent implements TierMasterAgent { @@ -39,14 +45,34 @@ public class RemoteTierMasterAgent implements TierMasterAgent { } @Override - public void addPartition(TieredStoragePartitionId partitionID) { + public void registerJob(JobID jobID, TierShuffleHandler tierShuffleHandler) { + // noop + } + + @Override + public void unregisterJob(JobID jobID) { + // noop + } + + @Override + public TierShuffleDescriptor addPartitionAndGetShuffleDescriptor( + JobID jobID, ResultPartitionID resultPartitionID) { + TieredStoragePartitionId partitionId = convertId(resultPartitionID); resourceRegistry.registerResource( - partitionID, - () -> deletePathQuietly(getPartitionPath(partitionID, remoteStorageBasePath))); + partitionId, + () -> deletePathQuietly(getPartitionPath(partitionId, remoteStorageBasePath))); + return new RemoteTierShuffleDescriptor(partitionId); + } + + @Override + public void releasePartition(TierShuffleDescriptor shuffleDescriptor) { + checkState(shuffleDescriptor instanceof RemoteTierShuffleDescriptor); + resourceRegistry.clearResourceFor( + ((RemoteTierShuffleDescriptor) shuffleDescriptor).getPartitionId()); } @Override - public void releasePartition(TieredStoragePartitionId partitionID) { - resourceRegistry.clearResourceFor(partitionID); + public void close() { + // noop } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java index 2a362721368..8444c29fec9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierProducerAgent.java @@ -28,6 +28,7 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProd import java.util.Arrays; +import static org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageUtils.getRemoteTierName; import static org.apache.flink.util.Preconditions.checkArgument; /** The implementation of {@link TierProducerAgent} for the remote tier. */ @@ -95,7 +96,7 @@ public class RemoteTierProducerAgent implements TierProducerAgent { return false; } if (buffer.isBuffer()) { - memoryManager.transferBufferOwnership(bufferOwner, this, buffer); + memoryManager.transferBufferOwnership(bufferOwner, getRemoteTierName(), buffer); } currentSubpartitionSegmentWriteBuffers[subpartitionIndex]++; cacheDataManager.appendBuffer(buffer, subpartitionIndex); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java index df544fdce47..31bed319e3a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.MemorySize; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.shuffle.TieredInternalShuffleMaster; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.LocalExecutionPartitionConnectionInfo; import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionConnectionInfo; import org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.PartitionConnectionInfo; @@ -34,6 +35,7 @@ import javax.annotation.Nullable; import java.time.Duration; import java.util.Collection; import java.util.HashMap; 
+import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; @@ -65,7 +67,8 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor> private final Map<JobID, JobShuffleContext> jobShuffleContexts = new HashMap<>(); - public NettyShuffleMaster(Configuration conf) { + public NettyShuffleMaster(ShuffleMasterContext shuffleMasterContext) { + Configuration conf = shuffleMasterContext.getConfiguration(); checkNotNull(conf); buffersPerInputChannel = conf.get(NettyShuffleEnvironmentOptions.NETWORK_BUFFERS_PER_CHANNEL); @@ -81,7 +84,7 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor> networkBufferSize = ConfigurationParserUtils.getPageSize(conf); if (isHybridShuffleNewModeEnabled(conf)) { - tieredInternalShuffleMaster = new TieredInternalShuffleMaster(conf); + tieredInternalShuffleMaster = new TieredInternalShuffleMaster(shuffleMasterContext); } else { tieredInternalShuffleMaster = null; } @@ -110,23 +113,27 @@ public class NettyShuffleMaster implements ShuffleMaster<NettyShuffleDescriptor> partitionDescriptor.getPartitionId(), producerDescriptor.getProducerExecutionId()); + List<TierShuffleDescriptor> tierShuffleDescriptors = null; + if (tieredInternalShuffleMaster != null) { + tierShuffleDescriptors = + tieredInternalShuffleMaster.addPartitionAndGetShuffleDescriptor( + jobID, resultPartitionID); + } + NettyShuffleDescriptor shuffleDeploymentDescriptor = new NettyShuffleDescriptor( producerDescriptor.getProducerLocation(), createConnectionInfo( producerDescriptor, partitionDescriptor.getConnectionIndex()), - resultPartitionID); - - if (tieredInternalShuffleMaster != null) { - tieredInternalShuffleMaster.addPartition(resultPartitionID); - } + resultPartitionID, + tierShuffleDescriptors); return CompletableFuture.completedFuture(shuffleDeploymentDescriptor); } @Override public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) { if (tieredInternalShuffleMaster != 
null) { - tieredInternalShuffleMaster.releasePartition(shuffleDescriptor.getResultPartitionID()); + tieredInternalShuffleMaster.releasePartition(shuffleDescriptor); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java index 6530bf8f178..88e5f64dcdf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java @@ -19,12 +19,14 @@ package org.apache.flink.runtime.io.network.partition; import org.apache.flink.configuration.NettyShuffleEnvironmentOptions.CompressionCodec; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool; import org.apache.flink.runtime.io.disk.FileChannelManager; import org.apache.flink.runtime.io.disk.NoOpFileChannelManager; import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.util.function.SupplierWithException; import java.io.IOException; @@ -44,6 +46,8 @@ public class ResultPartitionBuilder { private int partitionIndex = 0; + private int numberOfPartitions = 1; + private int numberOfSubpartitions = 1; private int numTargetKeyGroups = 1; @@ -278,10 +282,34 @@ public class ResultPartitionBuilder { partitionIndex, partitionId, partitionType, + numberOfPartitions, numberOfSubpartitions, numTargetKeyGroups, isBroadcast, + new TestingShuffleDescriptor(partitionId, new ResourceID("test")), factory, false); } + + private static class TestingShuffleDescriptor implements ShuffleDescriptor { + + private final 
ResultPartitionID resultPartitionId; + + private final ResourceID location; + + TestingShuffleDescriptor(ResultPartitionID resultPartitionId, ResourceID location) { + this.resultPartitionId = resultPartitionId; + this.location = location; + } + + @Override + public ResultPartitionID getResultPartitionID() { + return resultPartitionId; + } + + @Override + public Optional<ResourceID> storesLocalResourcesOn() { + return Optional.of(location); + } + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java index 761d731579b..649346ca8e5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/TestingTieredStorageMemoryManager.java @@ -39,6 +39,8 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan private final Consumer<TaskIOMetricGroup> setMetricGroupConsumer; + private final Supplier<BufferPool> bufferPoolSupplier; + private final Consumer<Runnable> listenBufferReclaimRequestConsumer; private final Function<Object, BufferBuilder> requestBufferBlockingFunction; @@ -58,6 +60,7 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan private TestingTieredStorageMemoryManager( BiConsumer<BufferPool, List<TieredStorageMemorySpec>> setupConsumer, Consumer<TaskIOMetricGroup> setMetricGroupConsumer, + Supplier<BufferPool> bufferPoolSupplier, Consumer<Runnable> listenBufferReclaimRequestConsumer, Function<Object, BufferBuilder> requestBufferBlockingFunction, Function<Object, Integer> getMaxNonReclaimableBuffersFunction, @@ -68,6 +71,7 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan Runnable 
releaseRunnable) { this.setupConsumer = setupConsumer; this.setMetricGroupConsumer = setMetricGroupConsumer; + this.bufferPoolSupplier = bufferPoolSupplier; this.listenBufferReclaimRequestConsumer = listenBufferReclaimRequestConsumer; this.requestBufferBlockingFunction = requestBufferBlockingFunction; this.getMaxNonReclaimableBuffersFunction = getMaxNonReclaimableBuffersFunction; @@ -88,6 +92,11 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan setMetricGroupConsumer.accept(metricGroup); } + @Override + public BufferPool getBufferPool() { + return bufferPoolSupplier.get(); + } + @Override public void listenBufferReclaimRequest(Runnable onBufferReclaimRequest) { listenBufferReclaimRequestConsumer.accept(onBufferReclaimRequest); @@ -136,6 +145,8 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan private Consumer<TaskIOMetricGroup> setMetricGroupConsumer = (ignore) -> {}; + private Supplier<BufferPool> bufferPoolSupplier = () -> null; + private Consumer<Runnable> listenBufferReclaimRequestConsumer = runnable -> {}; private Function<Object, BufferBuilder> requestBufferFunction = owner -> null; @@ -163,6 +174,12 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan return this; } + public TestingTieredStorageMemoryManager.Builder setBufferPoolSupplier( + Supplier<BufferPool> bufferPoolSupplier) { + this.bufferPoolSupplier = bufferPoolSupplier; + return this; + } + public TestingTieredStorageMemoryManager.Builder setListenBufferReclaimRequestConsumer( Consumer<Runnable> listenBufferReclaimRequestConsumer) { this.listenBufferReclaimRequestConsumer = listenBufferReclaimRequestConsumer; @@ -215,6 +232,7 @@ public class TestingTieredStorageMemoryManager implements TieredStorageMemoryMan return new TestingTieredStorageMemoryManager( setupConsumer, setMetricGroupConsumer, + bufferPoolSupplier, listenBufferReclaimRequestConsumer, requestBufferBlockingFunction, 
getMaxNonReclaimableBuffersFunction, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java index f0f2f69168c..da1525be42f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TestingTierConsumerAgent.java @@ -23,16 +23,20 @@ import org.apache.flink.runtime.io.network.partition.ResultSubpartitionIndexSet; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageSubpartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.AvailabilityNotifier; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageMemoryManager; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent; import java.io.IOException; import java.util.Optional; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Supplier; /** Test implementation for {@link TierConsumerAgent}. 
*/ public class TestingTierConsumerAgent implements TierConsumerAgent { + private final Consumer<TieredStorageMemoryManager> memoryManagerConsumer; + private final Runnable startNotifier; private final Supplier<Buffer> bufferSupplier; @@ -46,18 +50,25 @@ public class TestingTierConsumerAgent implements TierConsumerAgent { private TestingTierConsumerAgent( Runnable startNotifier, + Consumer<TieredStorageMemoryManager> memoryManagerConsumer, Supplier<Buffer> bufferSupplier, Runnable availabilityNotifierRegistrationRunnable, Runnable closeNotifier, BiFunction<TieredStoragePartitionId, ResultSubpartitionIndexSet, Integer> peekNextBufferSubpartitionIdFunction) { this.startNotifier = startNotifier; + this.memoryManagerConsumer = memoryManagerConsumer; this.bufferSupplier = bufferSupplier; this.availabilityNotifierRegistrationRunnable = availabilityNotifierRegistrationRunnable; this.closeNotifier = closeNotifier; this.peekNextBufferSubpartitionIdFunction = peekNextBufferSubpartitionIdFunction; } + @Override + public void setup(TieredStorageMemoryManager memoryManager) { + memoryManagerConsumer.accept(memoryManager); + } + @Override public void start() { startNotifier.run(); @@ -94,6 +105,8 @@ public class TestingTierConsumerAgent implements TierConsumerAgent { private Runnable startNotifier = () -> {}; + private Consumer<TieredStorageMemoryManager> memoryManagerConsumer = memoryManager -> {}; + private Supplier<Buffer> bufferSupplier = () -> null; private Runnable availabilityNotifierRegistrationRunnable = () -> {}; @@ -110,6 +123,12 @@ public class TestingTierConsumerAgent implements TierConsumerAgent { return this; } + public Builder setMemoryManagerConsumer( + Consumer<TieredStorageMemoryManager> memoryManagerConsumer) { + this.memoryManagerConsumer = memoryManagerConsumer; + return this; + } + public Builder setBufferSupplier(Supplier<Buffer> bufferSupplier) { this.bufferSupplier = bufferSupplier; return this; @@ -137,6 +156,7 @@ public class TestingTierConsumerAgent 
implements TierConsumerAgent { public TestingTierConsumerAgent build() { return new TestingTierConsumerAgent( startNotifier, + memoryManagerConsumer, bufferSupplier, availabilityNotifierRegistrationRunnable, closeNotifier, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageConsumerClientTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageConsumerClientTest.java index bcece554d42..fca5892c5b3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageConsumerClientTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/netty/TieredStorageConsumerClientTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.Testi import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerClient; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageConsumerSpec; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierConsumerAgent; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote.TestingAvailabilityNotifier; import org.junit.jupiter.api.Test; @@ -108,6 +109,10 @@ class TieredStorageConsumerClientTest { private TieredStorageConsumerClient createTieredStorageConsumerClient( TierConsumerAgent tierConsumerAgent) { + TierShuffleDescriptor emptyTierShuffleDescriptor = + new TierShuffleDescriptor() { + private static final long serialVersionUID = 1L; + }; return new TieredStorageConsumerClient( Collections.singletonList( new TestingTierFactory.Builder() @@ -117,9 +122,11 @@ class TieredStorageConsumerClientTest { .build()), Collections.singletonList( new TieredStorageConsumerSpec( + 0, 
DEFAULT_PARTITION_ID, DEFAULT_INPUT_CHANNEL_ID, DEFAULT_SUBPARTITION_ID_SET)), + Collections.singletonList(Collections.singletonList(emptyTierShuffleDescriptor)), new TestingTieredStorageNettyService.Builder().build()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java index 8bec167625a..e2d771a5c72 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/shuffle/TieredResultPartitionTest.java @@ -42,8 +42,10 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.Tiered import org.apache.flink.runtime.io.network.partition.hybrid.tiered.netty.TieredStorageNettyServiceImpl; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageProducerClient; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.util.NoOpTierShuffleDescriptor; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.apache.flink.util.concurrent.IgnoreShutdownRejectedExecutionHandler; @@ -54,7 +56,9 @@ import org.junit.jupiter.api.io.TempDir; import java.io.IOException; import java.nio.ByteBuffer; +import java.util.Arrays; import java.util.Collections; +import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; @@ -277,6 +281,10 @@ class 
TieredResultPartitionTest { tieredStorageNettyService, tieredStorageResourceRegistry); + List<TierShuffleDescriptor> tierShuffleDescriptors = + Arrays.asList( + NoOpTierShuffleDescriptor.INSTANCE, NoOpTierShuffleDescriptor.INSTANCE); + TieredResultPartition resultPartition = tieredResultPartitionFactory.createTieredResultPartition( "TieredStoreResultPartitionTest", @@ -285,11 +293,13 @@ class TieredResultPartitionTest { ResultPartitionType.HYBRID_SELECTIVE, numSubpartitions, numSubpartitions, + Integer.MAX_VALUE, NETWORK_BUFFER_SIZE, isBroadcastOnly, true, new ResultPartitionManager(), new BufferCompressor(NETWORK_BUFFER_SIZE, CompressionCodec.LZ4), + tierShuffleDescriptors, () -> bufferPool, fileChannelManager, readBufferPool, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java index 593b3698451..041ac33c480 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/storage/TestingTierFactory.java @@ -26,15 +26,25 @@ import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierCons import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierFactory; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierMasterAgent; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierProducerAgent; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; import java.util.List; import java.util.concurrent.ScheduledExecutorService; import java.util.function.BiFunction; +import java.util.function.Consumer; import java.util.function.Supplier; /** Test implementation for {@link TierFactory}. 
*/ public class TestingTierFactory implements TierFactory { + private Consumer<Configuration> setupConsumer; + + private Supplier<TieredStorageMemorySpec> masterAgentMemorySpecSupplier; + + private Supplier<TieredStorageMemorySpec> producerAgentMemorySpecSupplier; + + private Supplier<TieredStorageMemorySpec> consumerAgentMemorySpecSupplier; + private Supplier<TierMasterAgent> tierMasterAgentSupplier; private Supplier<TierProducerAgent> tierProducerAgentSupplier; @@ -44,6 +54,10 @@ public class TestingTierFactory implements TierFactory { tierConsumerAgentSupplier; private TestingTierFactory( + Consumer<Configuration> setupConsumer, + Supplier<TieredStorageMemorySpec> masterAgentMemorySpecSupplier, + Supplier<TieredStorageMemorySpec> producerAgentMemorySpecSupplier, + Supplier<TieredStorageMemorySpec> consumerAgentMemorySpecSupplier, Supplier<TierMasterAgent> tierMasterAgentSupplier, Supplier<TierProducerAgent> tierProducerAgentSupplier, BiFunction< @@ -51,6 +65,10 @@ public class TestingTierFactory implements TierFactory { TieredStorageNettyService, TierConsumerAgent> tierConsumerAgentSupplier) { + this.setupConsumer = setupConsumer; + this.masterAgentMemorySpecSupplier = masterAgentMemorySpecSupplier; + this.producerAgentMemorySpecSupplier = producerAgentMemorySpecSupplier; + this.consumerAgentMemorySpecSupplier = consumerAgentMemorySpecSupplier; this.tierMasterAgentSupplier = tierMasterAgentSupplier; this.tierProducerAgentSupplier = tierProducerAgentSupplier; this.tierConsumerAgentSupplier = tierConsumerAgentSupplier; @@ -59,7 +77,24 @@ public class TestingTierFactory implements TierFactory { public TestingTierFactory() {} @Override - public void setup(Configuration configuration) {} + public void setup(Configuration configuration) { + setupConsumer.accept(configuration); + } + + @Override + public TieredStorageMemorySpec getMasterAgentMemorySpec() { + return masterAgentMemorySpecSupplier.get(); + } + + @Override + public TieredStorageMemorySpec 
getProducerAgentMemorySpec() { + return producerAgentMemorySpecSupplier.get(); + } + + @Override + public TieredStorageMemorySpec getConsumerAgentMemorySpec() { + return consumerAgentMemorySpecSupplier.get(); + } @Override public TierMasterAgent createMasterAgent( @@ -69,6 +104,7 @@ public class TestingTierFactory implements TierFactory { @Override public TierProducerAgent createProducerAgent( + int numPartitions, int numSubpartitions, TieredStoragePartitionId partitionID, String dataFileBasePath, @@ -78,6 +114,7 @@ public class TestingTierFactory implements TierFactory { TieredStorageResourceRegistry resourceRegistry, BatchShuffleReadBufferPool bufferPool, ScheduledExecutorService ioExecutor, + List<TierShuffleDescriptor> shuffleDescriptors, int maxRequestedBuffers) { return tierProducerAgentSupplier.get(); } @@ -85,6 +122,7 @@ public class TestingTierFactory implements TierFactory { @Override public TierConsumerAgent createConsumerAgent( List<TieredStorageConsumerSpec> tieredStorageConsumerSpecs, + List<TierShuffleDescriptor> shuffleDescriptors, TieredStorageNettyService nettyService) { return tierConsumerAgentSupplier.apply(tieredStorageConsumerSpecs, nettyService); } @@ -92,6 +130,14 @@ public class TestingTierFactory implements TierFactory { /** Builder for {@link TestingTierFactory}. 
*/ public static class Builder { + private Consumer<Configuration> setupConsumer = conf -> {}; + + private Supplier<TieredStorageMemorySpec> masterAgentMemorySpecSupplier = () -> null; + + private Supplier<TieredStorageMemorySpec> producerAgentMemorySpecSupplier = () -> null; + + private Supplier<TieredStorageMemorySpec> consumerAgentMemorySpecSupplier = () -> null; + private Supplier<TierMasterAgent> tierMasterAgentSupplier = () -> null; private Supplier<TierProducerAgent> tierProducerAgentSupplier = () -> null; @@ -104,6 +150,29 @@ public class TestingTierFactory implements TierFactory { public Builder() {} + public Builder setSetupConsumer(Consumer<Configuration> setupConsumer) { + this.setupConsumer = setupConsumer; + return this; + } + + public Builder setMasterAgentMemorySpecSupplier( + Supplier<TieredStorageMemorySpec> masterAgentMemorySpecSupplier) { + this.masterAgentMemorySpecSupplier = masterAgentMemorySpecSupplier; + return this; + } + + public Builder setProducerAgentMemorySpecSupplier( + Supplier<TieredStorageMemorySpec> producerAgentMemorySpecSupplier) { + this.producerAgentMemorySpecSupplier = producerAgentMemorySpecSupplier; + return this; + } + + public Builder setConsumerAgentMemorySpecSupplier( + Supplier<TieredStorageMemorySpec> consumerAgentMemorySpecSupplier) { + this.consumerAgentMemorySpecSupplier = consumerAgentMemorySpecSupplier; + return this; + } + public Builder setTierMasterAgentSupplier( Supplier<TierMasterAgent> tierMasterAgentSupplier) { this.tierMasterAgentSupplier = tierMasterAgentSupplier; @@ -128,7 +197,13 @@ public class TestingTierFactory implements TierFactory { public TestingTierFactory build() { return new TestingTierFactory( - tierMasterAgentSupplier, tierProducerAgentSupplier, tierConsumerAgentSupplier); + setupConsumer, + masterAgentMemorySpecSupplier, + producerAgentMemorySpecSupplier, + consumerAgentMemorySpecSupplier, + tierMasterAgentSupplier, + tierProducerAgentSupplier, + tierConsumerAgentSupplier); } } } diff 
--git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgentTest.java index acaa0c0487e..e8e9eb5b7e7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierConsumerAgentTest.java @@ -60,6 +60,7 @@ class RemoteTierConsumerAgentTest { new RemoteTierConsumerAgent( Collections.singletonList( new TieredStorageConsumerSpec( + 0, partitionId, new TieredStorageInputChannelId(0), new ResultSubpartitionIndexSet(0))), @@ -92,6 +93,7 @@ class RemoteTierConsumerAgentTest { new RemoteTierConsumerAgent( Collections.singletonList( new TieredStorageConsumerSpec( + 0, partitionId, new TieredStorageInputChannelId(0), new ResultSubpartitionIndexSet(0))), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgentTest.java index 289daaf8385..73b2d65cd24 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/tiered/tier/remote/RemoteTierMasterAgentTest.java @@ -18,10 +18,12 @@ package org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.remote; +import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStorageIdMappingUtils; import 
org.apache.flink.runtime.io.network.partition.hybrid.tiered.common.TieredStoragePartitionId; import org.apache.flink.runtime.io.network.partition.hybrid.tiered.storage.TieredStorageResourceRegistry; +import org.apache.flink.runtime.io.network.partition.hybrid.tiered.tier.TierShuffleDescriptor; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.io.TempDir; @@ -39,8 +41,9 @@ class RemoteTierMasterAgentTest { @Test void testAddAndReleasePartition() throws IOException { + ResultPartitionID resultPartitionID = new ResultPartitionID(); TieredStoragePartitionId partitionId = - TieredStorageIdMappingUtils.convertId(new ResultPartitionID()); + TieredStorageIdMappingUtils.convertId(resultPartitionID); File partitionFile = new File(getPartitionPath(partitionId, tempFolder.getAbsolutePath())); assertThat(partitionFile.createNewFile()).isTrue(); assertThat(partitionFile.exists()).isTrue(); @@ -48,9 +51,10 @@ class RemoteTierMasterAgentTest { TieredStorageResourceRegistry resourceRegistry = new TieredStorageResourceRegistry(); RemoteTierMasterAgent masterAgent = new RemoteTierMasterAgent(tempFolder.getAbsolutePath(), resourceRegistry); - masterAgent.addPartition(partitionId); + TierShuffleDescriptor tierShuffleDescriptor = + masterAgent.addPartitionAndGetShuffleDescriptor(new JobID(), resultPartitionID); assertThat(partitionFile.exists()).isTrue(); - masterAgent.releasePartition(partitionId); + masterAgent.releasePartition(tierShuffleDescriptor); assertThat(partitionFile.exists()).isFalse(); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest.java index a3faeb1d4f4..0f465166e19 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/BatchJobRecoveryTest.java @@ 
-74,6 +74,7 @@ import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.shuffle.PartitionWithMetrics; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.runtime.shuffle.ShuffleMasterContextImpl; import org.apache.flink.runtime.shuffle.ShuffleMetrics; import org.apache.flink.runtime.source.coordinator.SourceCoordinator; import org.apache.flink.runtime.source.coordinator.SourceCoordinatorProvider; @@ -1042,7 +1043,8 @@ public class BatchJobRecoveryTest { throws Exception { final ShuffleMaster<NettyShuffleDescriptor> shuffleMaster = - new NettyShuffleMaster(new Configuration()); + new NettyShuffleMaster( + new ShuffleMasterContextImpl(new Configuration(), throwable -> {})); TestingJobMasterGateway jobMasterGateway = new TestingJobMasterGatewayBuilder() .setGetPartitionWithMetricsFunction( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java index 0d1f92b583c..d1d0795aa53 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java @@ -152,7 +152,7 @@ class ShuffleMasterTest { private final boolean stopTrackingPartition; public TestShuffleMaster(Configuration conf) { - super(conf); + super(new ShuffleMasterContextImpl(conf, throwable -> {})); this.stopTrackingPartition = conf.getBoolean(STOP_TRACKING_PARTITION_KEY, false); currentInstance.set(this); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java index 4ada32cef0a..f35b5f51b43 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleTestUtils.java @@ -24,7 +24,8 @@ import org.apache.flink.configuration.Configuration; public class ShuffleTestUtils { public static final ShuffleMaster<?> DEFAULT_SHUFFLE_MASTER = - new NettyShuffleMaster(new Configuration()); + new NettyShuffleMaster( + new ShuffleMasterContextImpl(new Configuration(), throwable -> {})); /** Private default constructor to avoid being instantiated. */ private ShuffleTestUtils() {}