This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 347423f8464c530efd308c5ea7abb34a5808e0c9 Author: Weijie Guo <res...@163.com> AuthorDate: Thu Jul 28 21:06:12 2022 +0800 [FLINK-27908] Let HsMemoryDataManager can register HsSubpartitionViewInternalOperations and supports notifyDataAvailable. --- .../partition/hybrid/HsMemoryDataManager.java | 44 ++++++++++++++++++++-- .../hybrid/HsMemoryDataManagerOperation.java | 7 ++++ .../hybrid/HsSubpartitionFileReaderImpl.java | 4 +- .../hybrid/HsSubpartitionMemoryDataManager.java | 5 +-- .../HsSubpartitionViewInternalOperations.java | 4 +- .../hybrid/TestingMemoryDataManagerOperation.java | 21 ++++++++++- .../TestingSubpartitionViewInternalOperation.java | 2 +- 7 files changed, 74 insertions(+), 13 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java index 7acc9d0bdbb..9bf57b51c4b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java @@ -26,16 +26,19 @@ import org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy.D import org.apache.flink.util.concurrent.FutureUtils; import org.apache.flink.util.function.SupplierWithException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.IOException; import java.nio.ByteBuffer; import java.nio.file.Path; import java.util.ArrayList; -import java.util.Collections; import java.util.Deque; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReadWriteLock; @@ -44,6 +47,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock; /** This class is responsible for managing data in memory. */ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryDataManagerOperation { + private static final Logger LOG = LoggerFactory.getLogger(HsMemoryDataManager.class); + private final int numSubpartitions; private final HsSubpartitionMemoryDataManager[] subpartitionMemoryDataManagers; @@ -62,6 +67,9 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0); + private final Map<Integer, HsSubpartitionViewInternalOperations> subpartitionViewOperationsMap = + new ConcurrentHashMap<>(); + public HsMemoryDataManager( int numSubpartitions, int bufferSize, @@ -108,6 +116,22 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData } } + /** + * Register {@link HsSubpartitionViewInternalOperations} to {@link + * #subpartitionViewOperationsMap}. It is used to obtain the consumption progress of the + * subpartition. + */ + public void registerSubpartitionView( + int subpartitionId, HsSubpartitionViewInternalOperations viewOperations) { + HsSubpartitionViewInternalOperations oldView = + subpartitionViewOperationsMap.put(subpartitionId, viewOperations); + if (oldView != null) { + LOG.debug( + "subpartition : {} register subpartition view will replace old view. ", + subpartitionId); + } + } + /** Close this {@link HsMemoryDataManager}, it means no data can append to memory. */ public void close() { Decision decision = callWithLock(() -> spillStrategy.onResultPartitionClosed(this)); @@ -159,8 +183,13 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData // Write lock should be acquired before invoke this method. @Override public List<Integer> getNextBufferIndexToConsume() { - // TODO implements this logical when subpartition view is implemented. - return Collections.emptyList(); + ArrayList<Integer> consumeIndexes = new ArrayList<>(numSubpartitions); + for (int channel = 0; channel < numSubpartitions; channel++) { + HsSubpartitionViewInternalOperations viewOperation = + subpartitionViewOperationsMap.get(channel); + consumeIndexes.add(viewOperation == null ? -1 : viewOperation.getConsumingOffset() + 1); + } + return consumeIndexes; } // ------------------------------------ @@ -196,6 +225,15 @@ public class HsMemoryDataManager implements HsSpillingInfoProvider, HsMemoryData handleDecision(decision); } + @Override + public void onDataAvailable(int subpartitionId) { + HsSubpartitionViewInternalOperations subpartitionViewInternalOperations = + subpartitionViewOperationsMap.get(subpartitionId); + if (subpartitionViewInternalOperations != null) { + subpartitionViewInternalOperations.notifyDataAvailable(); + } + } + // ------------------------------------ // Internal Method // ------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java index d34b251a700..ca86a1ae913 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java @@ -49,4 +49,11 @@ public interface HsMemoryDataManagerOperation { /** This method is called when buffer is finished. */ void onBufferFinished(); + + /** + * This method is called when subpartition data become available. + * + * @param subpartitionId the subpartition need notify data available. + */ + void onDataAvailable(int subpartitionId); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java index 2f29f2409ab..ecf02260dd1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java @@ -152,7 +152,7 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { } if (loadedBuffers.size() <= numLoaded) { - operations.notifyDataAvailableFromDisk(); + operations.notifyDataAvailable(); } } @@ -171,7 +171,7 @@ public class HsSubpartitionFileReaderImpl implements HsSubpartitionFileReader { } loadedBuffers.add(BufferIndexOrError.newError(failureCause)); - operations.notifyDataAvailableFromDisk(); + operations.notifyDataAvailable(); } /** Refresh downstream consumption progress for another round scheduling of reading. */ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java index 8084a8883f3..b5079e7df21 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java @@ -351,11 +351,10 @@ public class HsSubpartitionMemoryDataManager { bufferContext.getBufferIndexAndChannel().getBufferIndex(), bufferContext); trimHeadingReleasedBuffers(unConsumedBuffers); - return unConsumedBuffers.isEmpty(); + return unConsumedBuffers.size() <= 1; }); if (needNotify) { - // TODO notify data available, the notification mechanism may need further - // consideration. + memoryDataManagerOperation.onDataAvailable(targetChannel); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java index 1cd6f97ac83..053e5a291ba 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java @@ -24,8 +24,8 @@ package org.apache.flink.runtime.io.network.partition.hybrid; */ public interface HsSubpartitionViewInternalOperations { - /** Callback for new data become available from disk. */ - void notifyDataAvailableFromDisk(); + /** Callback for new data become available. */ + void notifyDataAvailable(); /** Get the latest consuming offset of the subpartition. */ int getConsumingOffset(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java index f78774ca674..bb441d667af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java @@ -35,16 +35,20 @@ public class TestingMemoryDataManagerOperation implements HsMemoryDataManagerOpe private final Runnable onBufferFinishedRunnable; + private final Runnable onDataAvailableRunnable; + private TestingMemoryDataManagerOperation( SupplierWithException<BufferBuilder, InterruptedException> requestBufferFromPoolSupplier, BiConsumer<Integer, Integer> markBufferReadableConsumer, Consumer<BufferIndexAndChannel> onBufferConsumedConsumer, - Runnable onBufferFinishedRunnable) { + Runnable onBufferFinishedRunnable, + Runnable onDataAvailableRunnable) { this.requestBufferFromPoolSupplier = requestBufferFromPoolSupplier; this.markBufferReadableConsumer = markBufferReadableConsumer; this.onBufferConsumedConsumer = onBufferConsumedConsumer; this.onBufferFinishedRunnable = onBufferFinishedRunnable; + this.onDataAvailableRunnable = onDataAvailableRunnable; } @Override @@ -67,6 +71,11 @@ public class TestingMemoryDataManagerOperation implements HsMemoryDataManagerOpe onBufferFinishedRunnable.run(); } + @Override + public void onDataAvailable(int subpartitionId) { + onDataAvailableRunnable.run(); + } + public static Builder builder() { return new Builder(); } @@ -82,6 +91,8 @@ public class TestingMemoryDataManagerOperation implements HsMemoryDataManagerOpe private Runnable onBufferFinishedRunnable = () -> {}; + private Runnable onDataAvailableRunnable = () -> {}; + public Builder setRequestBufferFromPoolSupplier( SupplierWithException<BufferBuilder, InterruptedException> requestBufferFromPoolSupplier) { @@ -106,6 +117,11 @@ public class TestingMemoryDataManagerOperation implements HsMemoryDataManagerOpe return this; } + public Builder setOnDataAvailableRunnable(Runnable onDataAvailableRunnable) { + this.onDataAvailableRunnable = onDataAvailableRunnable; + return this; + } + private Builder() {} public TestingMemoryDataManagerOperation build() { @@ -113,7 +129,8 @@ public class TestingMemoryDataManagerOperation implements HsMemoryDataManagerOpe requestBufferFromPoolSupplier, markBufferReadableConsumer, onBufferConsumedConsumer, - onBufferFinishedRunnable); + onBufferFinishedRunnable, + onDataAvailableRunnable); } } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java index 9042d9d377d..7ac32be04f9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java @@ -27,7 +27,7 @@ public class TestingSubpartitionViewInternalOperation private Runnable notifyDataAvailableRunnable = () -> {}; @Override - public void notifyDataAvailableFromDisk() { + public void notifyDataAvailable() { notifyDataAvailableRunnable.run(); }