xintongsong commented on code in PR #21419:
URL: https://github.com/apache/flink/pull/21419#discussion_r1040349304


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java:
##########
@@ -181,18 +188,33 @@ public HsDataView registerNewConsumer(
 
     /** Close this {@link HsMemoryDataManager}, it means no data can append to memory. */
     public void close() {
-        Decision decision = callWithLock(() -> spillStrategy.onResultPartitionClosed(this));
-        handleDecision(Optional.of(decision));
-        spiller.close();
-        poolSizeChecker.shutdown();
+        synchronized (releaseAndCloseLock) {
+            if (!isAllDataReleased) {
+                spillAndReleaseAllData();
+            }
+            spiller.close();
+            poolSizeChecker.shutdown();
+        }
     }
 
     /**
      * Release this {@link HsMemoryDataManager}, it means all memory taken by this class will
      * recycle.
      */
     public void release() {
-        spiller.release();
+        synchronized (releaseAndCloseLock) {
+            if (!isAllDataReleased) {
+                spillAndReleaseAllData();
+            }
+            spiller.release();
+        }
+    }

Review Comment:
   Not sure about introducing a dedicated lock for this.
   
   I checked the contracts in `ResultPartitionWriter`: `close` means releasing all allocated resources, and `release` means releasing all the data. For the memory data manager, these are probably the same thing. Once closed, data can only be consumed from the file data manager.
   
   So if `close` is always called, maybe we can do everything in `close` and nothing in `release`. I assume it doesn't really matter if the buffers are not released immediately when `release` is called first? That way, we won't need the lock between the two.
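   To make the suggestion concrete, here is a minimal sketch of folding all cleanup into `close()` so that `release()` becomes a no-op and no dedicated lock is needed. The surrounding class is a simplified stand-in, not the actual `HsMemoryDataManager`; the helper method names mirror the PR but their bodies are placeholders.

   ```java
   import java.util.concurrent.atomic.AtomicBoolean;

   // Hypothetical sketch of the reviewer's suggestion: close() releases both
   // resources and data, release() defers to it, and idempotence comes from a
   // single atomic flag instead of releaseAndCloseLock.
   class CloseOnlyCleanupSketch {
       private final AtomicBoolean closed = new AtomicBoolean(false);

       /** Releases all data and resources exactly once, however often it is called. */
       public void close() {
           if (closed.compareAndSet(false, true)) {
               spillAndReleaseAllData();
               closeSpiller();
               shutdownPoolSizeChecker();
           }
       }

       /** Intentionally empty: buffers are reclaimed when close() runs. */
       public void release() {
           // Assumes close() is always called eventually, so deferring the
           // actual cleanup there removes the need for a second lock.
       }

       // Placeholder bodies standing in for the real spill/shutdown logic.
       private void spillAndReleaseAllData() { /* spill in-memory buffers to disk */ }

       private void closeSpiller() { /* stop accepting new spill requests */ }

       private void shutdownPoolSizeChecker() { /* stop the periodic pool checker */ }

       public boolean isClosed() {
           return closed.get();
       }
   }
   ```

   Calling `release()` before `close()`, or `close()` twice, is then harmless by construction, which is exactly why the lock becomes unnecessary.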



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java:
##########
@@ -234,13 +239,15 @@ private void checkAllExchangesBlocking(final JobGraph jobGraph) {
         for (JobVertex jobVertex : jobGraph.getVertices()) {
             for (IntermediateDataSet dataSet : jobVertex.getProducedDataSets()) {
                 checkState(
-                        dataSet.getResultType().isBlockingOrBlockingPersistentResultPartition(),
+                        dataSet.getResultType().isBlockingOrBlockingPersistentResultPartition()
+                                || dataSet.getResultType() == ResultPartitionType.HYBRID_FULL,
                         String.format(
                                 "At the moment, adaptive batch scheduler requires batch workloads "
-                                        + "to be executed with types of all edges being BLOCKING. "
-                                        + "To do that, you need to configure '%s' to '%s'.",
+                                        + "to be executed with types of all edges being BLOCKING or HYBRID_FULL. "
+                                        + "To do that, you need to configure '%s' to '%s' or '%s'.",
                                 ExecutionOptions.BATCH_SHUFFLE_MODE.key(),
-                                BatchShuffleMode.ALL_EXCHANGES_BLOCKING));
+                                BatchShuffleMode.ALL_EXCHANGES_BLOCKING,
+                                BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL));

Review Comment:
   Are we deciding the shuffle mode based on the scheduler anywhere?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java:
##########
@@ -295,6 +296,14 @@ void cachePartitionInfo(PartitionInfo partitionInfo) {
                 "Method is not supported in SpeculativeExecutionVertex.");
     }
 
+    @Override
+    protected boolean needMarkPartitionFinished(ResultPartitionType resultPartitionType) {
+        // for speculative execution, only blocking or hybrid full result partition need mark
+        // finished.
+        return resultPartitionType.isBlockingOrBlockingPersistentResultPartition()
+                || resultPartitionType == ResultPartitionType.HYBRID_FULL;
+    }

Review Comment:
   I think this is not only for the speculative execution vertex. By default, in speculative execution we want the downstream to consume only the finished upstream partitions, and in non-speculative execution we want the downstream to also consume the unfinished partitions. However, users should also be able to choose other behaviors: consuming unfinished partitions even in speculative mode, or consuming only finished partitions in non-speculative mode.
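   One way to express this is to pull the decision out of the vertex class into a configurable policy. The sketch below is illustrative only: the policy interface, the enum, and both policy names are invented here and are not actual Flink API; the partition-type check mirrors the predicate in the PR.

   ```java
   // Hypothetical sketch: make "which partitions must be marked finished
   // before consumption" a pluggable policy rather than a property of
   // SpeculativeExecutionVertex.
   class PartitionFinishPolicySketch {
       /** Simplified stand-in for Flink's ResultPartitionType. */
       enum PartitionType {
           BLOCKING, BLOCKING_PERSISTENT, HYBRID_FULL, HYBRID_SELECTIVE, PIPELINED
       }

       /** Policy chosen from configuration, not hard-wired to the vertex class. */
       interface MarkFinishedPolicy {
           boolean needMarkPartitionFinished(PartitionType type);
       }

       /** Suggested default for speculative execution: downstream consumes only finished partitions. */
       static final MarkFinishedPolicy ONLY_FINISHED =
               type -> type == PartitionType.BLOCKING
                       || type == PartitionType.BLOCKING_PERSISTENT
                       || type == PartitionType.HYBRID_FULL;

       /** Suggested default for non-speculative execution: unfinished partitions may also be consumed. */
       static final MarkFinishedPolicy ALLOW_UNFINISHED = type -> false;
   }
   ```

   With such a policy, a user could opt into either behavior independently of whether speculative execution is enabled, which is the flexibility the comment asks for.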



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
