This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 347423f8464c530efd308c5ea7abb34a5808e0c9
Author: Weijie Guo <res...@163.com>
AuthorDate: Thu Jul 28 21:06:12 2022 +0800

    [FLINK-27908] Let HsMemoryDataManager can register 
HsSubpartitionViewInternalOperations and supports notifyDataAvailable.
---
 .../partition/hybrid/HsMemoryDataManager.java      | 44 ++++++++++++++++++++--
 .../hybrid/HsMemoryDataManagerOperation.java       |  7 ++++
 .../hybrid/HsSubpartitionFileReaderImpl.java       |  4 +-
 .../hybrid/HsSubpartitionMemoryDataManager.java    |  5 +--
 .../HsSubpartitionViewInternalOperations.java      |  4 +-
 .../hybrid/TestingMemoryDataManagerOperation.java  | 21 ++++++++++-
 .../TestingSubpartitionViewInternalOperation.java  |  2 +-
 7 files changed, 74 insertions(+), 13 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
index 7acc9d0bdbb..9bf57b51c4b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java
@@ -26,16 +26,19 @@ import 
org.apache.flink.runtime.io.network.partition.hybrid.HsSpillingStrategy.D
 import org.apache.flink.util.concurrent.FutureUtils;
 import org.apache.flink.util.function.SupplierWithException;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.file.Path;
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.Deque;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReadWriteLock;
@@ -44,6 +47,8 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 /** This class is responsible for managing data in memory. */
 public class HsMemoryDataManager implements HsSpillingInfoProvider, 
HsMemoryDataManagerOperation {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(HsMemoryDataManager.class);
+
     private final int numSubpartitions;
 
     private final HsSubpartitionMemoryDataManager[] 
subpartitionMemoryDataManagers;
@@ -62,6 +67,9 @@ public class HsMemoryDataManager implements 
HsSpillingInfoProvider, HsMemoryData
 
     private final AtomicInteger numUnSpillBuffers = new AtomicInteger(0);
 
+    private final Map<Integer, HsSubpartitionViewInternalOperations> 
subpartitionViewOperationsMap =
+            new ConcurrentHashMap<>();
+
     public HsMemoryDataManager(
             int numSubpartitions,
             int bufferSize,
@@ -108,6 +116,22 @@ public class HsMemoryDataManager implements 
HsSpillingInfoProvider, HsMemoryData
         }
     }
 
+    /**
+     * Register {@link HsSubpartitionViewInternalOperations} to {@link
+     * #subpartitionViewOperationsMap}. It is used to obtain the consumption 
progress of the
+     * subpartition.
+     */
+    public void registerSubpartitionView(
+            int subpartitionId, HsSubpartitionViewInternalOperations 
viewOperations) {
+        HsSubpartitionViewInternalOperations oldView =
+                subpartitionViewOperationsMap.put(subpartitionId, 
viewOperations);
+        if (oldView != null) {
+            LOG.debug(
+                    "subpartition : {} register subpartition view will replace 
old view. ",
+                    subpartitionId);
+        }
+    }
+
     /** Close this {@link HsMemoryDataManager}, it means no data can append to 
memory. */
     public void close() {
         Decision decision = callWithLock(() -> 
spillStrategy.onResultPartitionClosed(this));
@@ -159,8 +183,13 @@ public class HsMemoryDataManager implements 
HsSpillingInfoProvider, HsMemoryData
     // Write lock should be acquired before invoke this method.
     @Override
     public List<Integer> getNextBufferIndexToConsume() {
-        // TODO implements this logical when subpartition view is implemented.
-        return Collections.emptyList();
+        ArrayList<Integer> consumeIndexes = new ArrayList<>(numSubpartitions);
+        for (int channel = 0; channel < numSubpartitions; channel++) {
+            HsSubpartitionViewInternalOperations viewOperation =
+                    subpartitionViewOperationsMap.get(channel);
+            consumeIndexes.add(viewOperation == null ? -1 : 
viewOperation.getConsumingOffset() + 1);
+        }
+        return consumeIndexes;
     }
 
     // ------------------------------------
@@ -196,6 +225,15 @@ public class HsMemoryDataManager implements 
HsSpillingInfoProvider, HsMemoryData
         handleDecision(decision);
     }
 
+    @Override
+    public void onDataAvailable(int subpartitionId) {
+        HsSubpartitionViewInternalOperations 
subpartitionViewInternalOperations =
+                subpartitionViewOperationsMap.get(subpartitionId);
+        if (subpartitionViewInternalOperations != null) {
+            subpartitionViewInternalOperations.notifyDataAvailable();
+        }
+    }
+
     // ------------------------------------
     //           Internal Method
     // ------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java
index d34b251a700..ca86a1ae913 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManagerOperation.java
@@ -49,4 +49,11 @@ public interface HsMemoryDataManagerOperation {
 
     /** This method is called when buffer is finished. */
     void onBufferFinished();
+
+    /**
+     * This method is called when subpartition data become available.
+     *
+     * @param subpartitionId the subpartition need notify data available.
+     */
+    void onDataAvailable(int subpartitionId);
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
index 2f29f2409ab..ecf02260dd1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderImpl.java
@@ -152,7 +152,7 @@ public class HsSubpartitionFileReaderImpl implements 
HsSubpartitionFileReader {
         }
 
         if (loadedBuffers.size() <= numLoaded) {
-            operations.notifyDataAvailableFromDisk();
+            operations.notifyDataAvailable();
         }
     }
 
@@ -171,7 +171,7 @@ public class HsSubpartitionFileReaderImpl implements 
HsSubpartitionFileReader {
         }
 
         loadedBuffers.add(BufferIndexOrError.newError(failureCause));
-        operations.notifyDataAvailableFromDisk();
+        operations.notifyDataAvailable();
     }
 
     /** Refresh downstream consumption progress for another round scheduling 
of reading. */
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
index 8084a8883f3..b5079e7df21 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionMemoryDataManager.java
@@ -351,11 +351,10 @@ public class HsSubpartitionMemoryDataManager {
                                     
bufferContext.getBufferIndexAndChannel().getBufferIndex(),
                                     bufferContext);
                             trimHeadingReleasedBuffers(unConsumedBuffers);
-                            return unConsumedBuffers.isEmpty();
+                            return unConsumedBuffers.size() <= 1;
                         });
         if (needNotify) {
-            // TODO notify data available, the notification mechanism may need 
further
-            // consideration.
+            memoryDataManagerOperation.onDataAvailable(targetChannel);
         }
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java
index 1cd6f97ac83..053e5a291ba 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionViewInternalOperations.java
@@ -24,8 +24,8 @@ package org.apache.flink.runtime.io.network.partition.hybrid;
  */
 public interface HsSubpartitionViewInternalOperations {
 
-    /** Callback for new data become available from disk. */
-    void notifyDataAvailableFromDisk();
+    /** Callback for new data become available. */
+    void notifyDataAvailable();
 
     /** Get the latest consuming offset of the subpartition. */
     int getConsumingOffset();
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java
index f78774ca674..bb441d667af 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingMemoryDataManagerOperation.java
@@ -35,16 +35,20 @@ public class TestingMemoryDataManagerOperation implements 
HsMemoryDataManagerOpe
 
     private final Runnable onBufferFinishedRunnable;
 
+    private final Runnable onDataAvailableRunnable;
+
     private TestingMemoryDataManagerOperation(
             SupplierWithException<BufferBuilder, InterruptedException>
                     requestBufferFromPoolSupplier,
             BiConsumer<Integer, Integer> markBufferReadableConsumer,
             Consumer<BufferIndexAndChannel> onBufferConsumedConsumer,
-            Runnable onBufferFinishedRunnable) {
+            Runnable onBufferFinishedRunnable,
+            Runnable onDataAvailableRunnable) {
         this.requestBufferFromPoolSupplier = requestBufferFromPoolSupplier;
         this.markBufferReadableConsumer = markBufferReadableConsumer;
         this.onBufferConsumedConsumer = onBufferConsumedConsumer;
         this.onBufferFinishedRunnable = onBufferFinishedRunnable;
+        this.onDataAvailableRunnable = onDataAvailableRunnable;
     }
 
     @Override
@@ -67,6 +71,11 @@ public class TestingMemoryDataManagerOperation implements 
HsMemoryDataManagerOpe
         onBufferFinishedRunnable.run();
     }
 
+    @Override
+    public void onDataAvailable(int subpartitionId) {
+        onDataAvailableRunnable.run();
+    }
+
     public static Builder builder() {
         return new Builder();
     }
@@ -82,6 +91,8 @@ public class TestingMemoryDataManagerOperation implements 
HsMemoryDataManagerOpe
 
         private Runnable onBufferFinishedRunnable = () -> {};
 
+        private Runnable onDataAvailableRunnable = () -> {};
+
         public Builder setRequestBufferFromPoolSupplier(
                 SupplierWithException<BufferBuilder, InterruptedException>
                         requestBufferFromPoolSupplier) {
@@ -106,6 +117,11 @@ public class TestingMemoryDataManagerOperation implements 
HsMemoryDataManagerOpe
             return this;
         }
 
+        public Builder setOnDataAvailableRunnable(Runnable 
onDataAvailableRunnable) {
+            this.onDataAvailableRunnable = onDataAvailableRunnable;
+            return this;
+        }
+
         private Builder() {}
 
         public TestingMemoryDataManagerOperation build() {
@@ -113,7 +129,8 @@ public class TestingMemoryDataManagerOperation implements 
HsMemoryDataManagerOpe
                     requestBufferFromPoolSupplier,
                     markBufferReadableConsumer,
                     onBufferConsumedConsumer,
-                    onBufferFinishedRunnable);
+                    onBufferFinishedRunnable,
+                    onDataAvailableRunnable);
         }
     }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java
index 9042d9d377d..7ac32be04f9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/TestingSubpartitionViewInternalOperation.java
@@ -27,7 +27,7 @@ public class TestingSubpartitionViewInternalOperation
     private Runnable notifyDataAvailableRunnable = () -> {};
 
     @Override
-    public void notifyDataAvailableFromDisk() {
+    public void notifyDataAvailable() {
         notifyDataAvailableRunnable.run();
     }
 

Reply via email to