xintongsong commented on code in PR #21419:
URL: https://github.com/apache/flink/pull/21419#discussion_r1040349304
##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/hybrid/HsMemoryDataManager.java:
##########

@@ -181,18 +188,33 @@ public HsDataView registerNewConsumer(

     /** Close this {@link HsMemoryDataManager}, it means no data can append to memory. */
     public void close() {
-        Decision decision = callWithLock(() -> spillStrategy.onResultPartitionClosed(this));
-        handleDecision(Optional.of(decision));
-        spiller.close();
-        poolSizeChecker.shutdown();
+        synchronized (releaseAndCloseLock) {
+            if (!isAllDataReleased) {
+                spillAndReleaseAllData();
+            }
+            spiller.close();
+            poolSizeChecker.shutdown();
+        }
     }

     /**
      * Release this {@link HsMemoryDataManager}, it means all memory taken by this class will
      * recycle.
      */
     public void release() {
-        spiller.release();
+        synchronized (releaseAndCloseLock) {
+            if (!isAllDataReleased) {
+                spillAndReleaseAllData();
+            }
+            spiller.release();
+        }
+    }

Review Comment:
   Not sure about introducing a dedicated lock for this. I checked the contracts in `ResultPartitionWriter`: `close` means releasing all allocated resources, and `release` means releasing all the data. For the memory data manager, these are probably the same thing. Once closed, data can only be consumed from the file data manager. So if `close` is always called, maybe we can do everything in `close` and nothing in `release`. I assume it doesn't really matter if the buffers are not released immediately when `release` is called first. In this way, we won't need the lock between these two.
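The suggestion above (do all the work in `close`, nothing in `release`, and drop the dedicated lock) can be sketched with a single atomic flag. This is a hypothetical, standalone sketch; the class, field, and counter below are illustrative and not the actual Flink code:

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the reviewer's suggestion: close() is made idempotent
// via compareAndSet and does all the spill-and-release work, so release() can
// simply delegate to it and no dedicated lock between the two is needed.
class MemoryDataManagerSketch {

    private final AtomicBoolean closed = new AtomicBoolean(false);

    // Visible for the usage example: counts how often the spill path ran.
    int spillAndReleaseCalls = 0;

    /** Releases all allocated resources, including all in-memory data. */
    public void close() {
        // compareAndSet ensures the body runs at most once, from one thread.
        if (closed.compareAndSet(false, true)) {
            spillAndReleaseAllData();
            // spiller.close(); poolSizeChecker.shutdown(); ...
        }
    }

    /** With close() doing everything, release() needs no work of its own. */
    public void release() {
        close();
    }

    private void spillAndReleaseAllData() {
        spillAndReleaseCalls++;
    }
}
```

Whichever of `release()` and `close()` is called first (or if both are called, in any order), the spill path runs exactly once, which matches the assumption that buffers need not be released immediately when `release` happens to come first.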
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java:
##########

@@ -234,13 +239,15 @@ private void checkAllExchangesBlocking(final JobGraph jobGraph) {
         for (JobVertex jobVertex : jobGraph.getVertices()) {
             for (IntermediateDataSet dataSet : jobVertex.getProducedDataSets()) {
                 checkState(
-                        dataSet.getResultType().isBlockingOrBlockingPersistentResultPartition(),
+                        dataSet.getResultType().isBlockingOrBlockingPersistentResultPartition()
+                                || dataSet.getResultType() == ResultPartitionType.HYBRID_FULL,
                         String.format(
                                 "At the moment, adaptive batch scheduler requires batch workloads "
-                                        + "to be executed with types of all edges being BLOCKING. "
-                                        + "To do that, you need to configure '%s' to '%s'.",
+                                        + "to be executed with types of all edges being BLOCKING or HYBRID_FULL. "
+                                        + "To do that, you need to configure '%s' to '%s' or '%s'.",
                                 ExecutionOptions.BATCH_SHUFFLE_MODE.key(),
-                                BatchShuffleMode.ALL_EXCHANGES_BLOCKING));
+                                BatchShuffleMode.ALL_EXCHANGES_BLOCKING,
+                                BatchShuffleMode.ALL_EXCHANGES_HYBRID_FULL));

Review Comment:
   Are we deciding the shuffle mode based on the scheduler anywhere?

##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/SpeculativeExecutionVertex.java:
##########

@@ -295,6 +296,14 @@ void cachePartitionInfo(PartitionInfo partitionInfo) {
                 "Method is not supported in SpeculativeExecutionVertex.");
     }

+    @Override
+    protected boolean needMarkPartitionFinished(ResultPartitionType resultPartitionType) {
+        // for speculative execution, only blocking or hybrid full result partition need mark
+        // finished.
+        return resultPartitionType.isBlockingOrBlockingPersistentResultPartition()
+                || resultPartitionType == ResultPartitionType.HYBRID_FULL;
+    }

Review Comment:
   I think this is not only for the speculative execution vertex.
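The relaxed edge-type check in `checkAllExchangesBlocking` above boils down to a predicate over the result partition type. A standalone sketch of that predicate, assuming an illustrative enum that only mirrors the relevant `ResultPartitionType` values (it is not the real Flink class):

```java
// Standalone sketch of the relaxed edge-type check for the adaptive batch
// scheduler: BLOCKING-family partitions were already accepted, and the PR
// additionally accepts HYBRID_FULL. The enum here is illustrative only.
class AdaptiveBatchEdgeCheckSketch {

    enum PartitionType {
        PIPELINED,
        BLOCKING,
        BLOCKING_PERSISTENT,
        HYBRID_FULL,
        HYBRID_SELECTIVE;

        // Mirrors isBlockingOrBlockingPersistentResultPartition() in the diff.
        boolean isBlockingOrBlockingPersistent() {
            return this == BLOCKING || this == BLOCKING_PERSISTENT;
        }
    }

    /** Mirrors the condition passed to checkState in the hunk above. */
    static boolean isSupportedEdgeType(PartitionType type) {
        return type.isBlockingOrBlockingPersistent() || type == PartitionType.HYBRID_FULL;
    }
}
```

Note that `HYBRID_SELECTIVE` still fails the check: only the fully-spilling hybrid mode is admitted by this PR.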
By default, in speculative execution we want the downstream to only consume the finished upstream partitions, and in non-speculative execution we want the downstream to also consume the unfinished partitions. However, the user should also be able to choose other behaviors: consuming unfinished partitions even in speculative mode, or consuming only finished partitions in non-speculative mode.
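The default-plus-override behavior described above can be sketched as a small decision function. The constraint enum and method below are hypothetical, introduced only to illustrate the policy; they are not Flink API:

```java
// Hypothetical sketch of the consumption policy described above: by default,
// speculative execution consumes only finished upstream partitions while
// non-speculative execution also consumes unfinished ones, and a user-chosen
// constraint can override the default in either direction.
class PartitionConsumePolicySketch {

    enum ConsumeConstraint {
        DEFAULT,        // follow the scheduling mode's default behavior
        FINISHED_ONLY,  // always wait for finished upstream partitions
        UNFINISHED_OK   // always allow consuming unfinished partitions
    }

    static boolean canConsume(
            boolean speculativeExecution,
            ConsumeConstraint constraint,
            boolean partitionFinished) {
        // Finished partitions are always consumable; otherwise it depends on
        // whether the effective policy restricts consumption to finished ones.
        boolean finishedOnly =
                constraint == ConsumeConstraint.FINISHED_ONLY
                        || (constraint == ConsumeConstraint.DEFAULT && speculativeExecution);
        return partitionFinished || !finishedOnly;
    }
}
```

With `DEFAULT`, the scheduling mode decides; `FINISHED_ONLY` and `UNFINISHED_OK` cover the two override cases mentioned in the reply.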