reswqa commented on code in PR #19960:
URL: https://github.com/apache/flink/pull/19960#discussion_r915527214


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link HsResultPartitionReadScheduler}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsResultPartitionReadSchedulerTest {
+    private static final int BUFFER_SIZE = 1024;
+
+    private static final int NUM_SUBPARTITIONS = 10;
+
+    private final byte[] dataBytes = new byte[BUFFER_SIZE];
+
+    private ManuallyTriggeredScheduledExecutor ioExecutor;
+
+    private BatchShuffleReadBufferPool bufferPool;
+
+    private FileChannel dataFileChannel;
+
+    private Path dataFilePath;
+
+    private HsResultPartitionReadScheduler readScheduler;
+
+    private TestingSubpartitionViewInternalOperation subpartitionViewOperation;
+
+    @BeforeEach
+    void before(@TempDir Path tempDir) throws IOException {
+        Random random = new Random();
+        random.nextBytes(dataBytes);
+        bufferPool = new BatchShuffleReadBufferPool(2 * BUFFER_SIZE, 
BUFFER_SIZE);
+        ioExecutor = new ManuallyTriggeredScheduledExecutor();
+        dataFilePath = Files.createFile(tempDir.resolve(".data"));
+        dataFileChannel = openFileChannel(dataFilePath);
+        readScheduler =
+                new HsResultPartitionReadScheduler(
+                        bufferPool,
+                        ioExecutor,
+                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        dataFilePath,
+                        HybridShuffleConfiguration.createConfiguration(
+                                bufferPool.getNumBuffersPerRequest(), 
NUM_SUBPARTITIONS));
+        subpartitionViewOperation = new 
TestingSubpartitionViewInternalOperation();
+    }
+
+    @AfterEach
+    void after() throws Exception {
+        bufferPool.destroy();
+        if (dataFileChannel != null) {
+            dataFileChannel.close();
+        }
+    }
+
+    // ----------------------- test run and register 
---------------------------------------
+
+    @Test
+    void testRegisterReaderTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+
+        assertThat(ioExecutor.numQueuedRunnables()).isZero();
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, 
factory);
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+    }
+
+    @Test
+    void testBufferReleasedTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        Set<MemorySegment> memorySegmentSets = new HashSet<>();
+        AtomicReference<BufferRecycler> recycleRef = new 
AtomicReference<>(null);
+        factory.setReadBuffersConsumer(
+                (requestedBuffer, recycle) -> {
+                    while (!requestedBuffer.isEmpty()) {
+                        memorySegmentSets.add(requestedBuffer.poll());
+                    }
+                    recycleRef.set(recycle);
+                });
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, 
factory);
+
+        ioExecutor.trigger();
+
+        assertThat(recycleRef).isNotNull();
+        assertThat(memorySegmentSets).hasSize(2);
+        assertThat(bufferPool.getAvailableBuffers()).isZero();
+
+        recycleRef.get().recycle(memorySegmentSets.iterator().next());
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+    }
+
+    /** Test all not used buffers will be released after run method finish. */
+    @Test
+    void testRunReleaseAllBuffers() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicBoolean prepareForSchedulingFinished = new AtomicBoolean(false);
+        factory.setPrepareForSchedulingRunnable(() -> 
prepareForSchedulingFinished.set(true));
+        factory.setReadBuffersConsumer(
+                (buffers, recycle) -> {
+                    assertThat(prepareForSchedulingFinished).isTrue();
+                    assertThat(buffers).hasSize(2);
+                    // poll one buffer, return another buffer to scheduler.
+                    buffers.poll();
+                    assertThat(bufferPool.getAvailableBuffers()).isEqualTo(0);
+                });
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, 
factory);
+
+        ioExecutor.trigger();
+
+        // not used buffer should be recycled.
+        assertThat(bufferPool.getAvailableBuffers()).isEqualTo(1);
+    }
+
+    /** Test scheduler will schedule readers in order. */
+    @Test
+    void testScheduleReadersOrdered() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory1 =
+                new TestingHsSubpartitionFileReader.Factory();
+        TestingHsSubpartitionFileReader.Factory factory2 =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicBoolean readBuffersFinished1 = new AtomicBoolean(false);
+        AtomicBoolean readBuffersFinished2 = new AtomicBoolean(false);
+        factory1.setReadBuffersConsumer(
+                (buffers, recycle) -> {
+                    assertThat(readBuffersFinished2).isFalse();
+                    readBuffersFinished1.set(true);
+                });
+        factory2.setReadBuffersConsumer(
+                (buffers, recycle) -> {
+                    assertThat(readBuffersFinished1).isTrue();
+                    readBuffersFinished2.set(true);
+                });
+
+        factory1.setPrioritySupplier(() -> 1);
+        factory2.setPrioritySupplier(() -> 2);
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, 
factory1);
+        readScheduler.registerNewSubpartition(1, subpartitionViewOperation, 
factory2);
+
+        // trigger run.
+        ioExecutor.trigger();
+
+        assertThat(readBuffersFinished2).isTrue();
+    }
+
+    @Test
+    void testRunRequestBufferTimeout() throws Exception {
+        Duration bufferRequestTimeout = Duration.ofSeconds(3);
+
+        // request all buffer first.
+        bufferPool.requestBuffers();
+        assertThat(bufferPool.getAvailableBuffers()).isZero();
+
+        readScheduler =
+                new HsResultPartitionReadScheduler(
+                        bufferPool,
+                        ioExecutor,
+                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        dataFilePath,
+                        HybridShuffleConfiguration.createConfiguration(
+                                bufferPool.getNumBuffersPerRequest(),
+                                NUM_SUBPARTITIONS,
+                                bufferRequestTimeout));
+
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicBoolean prepareForSchedulingFinished = new AtomicBoolean(false);
+        AtomicReference<Throwable> cause = new AtomicReference<>();
+        factory.setPrepareForSchedulingRunnable(() -> 
prepareForSchedulingFinished.set(true));
+        factory.setFailConsumer((cause::set));
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, 
factory);
+
+        ioExecutor.trigger();
+
+        assertThat(prepareForSchedulingFinished).isTrue();
+        assertThat(cause)
+                .hasValueSatisfying(
+                        throwable ->
+                                assertThat(throwable)
+                                        .isInstanceOf(TimeoutException.class)
+                                        .hasMessageContaining("Buffer request 
timeout"));
+    }
+
+    /**
+     * When {@link SubpartitionFileReader#readBuffers(Queue, BufferRecycler)} 
throw IOException,
+     * subpartition reader should fail.
+     */
+    @Test
+    void testRunReadBuffersThrowException() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicReference<Throwable> cause = new AtomicReference<>();
+        factory.setFailConsumer((cause::set));
+        factory.setReadBuffersConsumer(
+                (buffers, recycle) -> {
+                    throw new IOException("expected exception.");
+                });
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, 
factory);
+
+        ioExecutor.trigger();
+
+        assertThat(cause)
+                .hasValueSatisfying(
+                        throwable ->
+                                assertThat(throwable)
+                                        .isInstanceOf(IOException.class)
+                                        .hasMessageContaining("expected 
exception."));
+    }
+
+    // ----------------------- test release 
---------------------------------------
+
+    /** Test scheduler release when reader is reading buffers. */
+    @Test
+    void testReleasedWhenReading() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+
+        AtomicReference<Throwable> cause = new AtomicReference<>(null);
+        factory.setFailConsumer((cause::set));
+        factory.setReadBuffersConsumer((buffers, recycle) -> 
readScheduler.release());
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, 
factory);
+
+        ioExecutor.trigger();
+
+        assertThat(cause)
+                .hasValueSatisfying(
+                        throwable ->
+                                assertThat(throwable)
+                                        
.isInstanceOf(IllegalStateException.class)
+                                        .hasMessageContaining(
+                                                "Result partition has been 
already released."));
+
+        assertThat(ioExecutor.numQueuedRunnables()).isZero();
+    }
+
+    /** Test scheduler was released, but receive new subpartition reader 
registration. */
+    @Test
+    void testRegisterSubpartitionReaderAfterSchedulerReleased() {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        readScheduler.release();
+        assertThatThrownBy(
+                        () -> {
+                            readScheduler.registerNewSubpartition(
+                                    0, subpartitionViewOperation, factory);
+                            ioExecutor.trigger();
+                        })
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("HsResultPartitionReadScheduler is 
already released.");
+    }
+
+    private static FileChannel openFileChannel(Path path) throws IOException {
+        return FileChannel.open(path, StandardOpenOption.READ);
+    }
+
+    private static class TestingHsSubpartitionFileReader implements 
SubpartitionFileReader {
+        private final Runnable prepareForSchedulingRunnable;
+
+        private final BiConsumerWithException<Queue<MemorySegment>, 
BufferRecycler, IOException>
+                readBuffersConsumer;
+
+        private final Consumer<Throwable> failConsumer;
+
+        private final Supplier<Integer> prioritySupplier;
+
+        public TestingHsSubpartitionFileReader(
+                Runnable prepareForSchedulingRunnable,
+                BiConsumerWithException<Queue<MemorySegment>, BufferRecycler, 
IOException>
+                        readBuffersConsumer,
+                Consumer<Throwable> failConsumer,
+                Supplier<Integer> prioritySupplier) {
+            this.prepareForSchedulingRunnable = prepareForSchedulingRunnable;
+            this.readBuffersConsumer = readBuffersConsumer;
+            this.failConsumer = failConsumer;
+            this.prioritySupplier = prioritySupplier;
+        }
+
+        @Override
+        public void prepareForScheduling() {
+            prepareForSchedulingRunnable.run();
+        }
+
+        @Override
+        public void readBuffers(Queue<MemorySegment> buffers, BufferRecycler 
recycler)
+                throws IOException {
+            readBuffersConsumer.accept(buffers, recycler);
+        }
+
+        @Override
+        public void fail(Throwable failureCause) {
+            failConsumer.accept(failureCause);
+        }
+
+        @Override
+        public int compareTo(SubpartitionFileReader that) {
+            checkArgument(that instanceof TestingHsSubpartitionFileReader);
+            return Integer.compare(
+                    prioritySupplier.get(),
+                    ((TestingHsSubpartitionFileReader) 
that).prioritySupplier.get());
+        }
+
+        public static class Factory implements SubpartitionFileReader.Factory {

Review Comment:
   good suggestion.



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsResultPartitionReadSchedulerTest.java:
##########
@@ -0,0 +1,403 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.disk.BatchShuffleReadBufferPool;
+import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
+import org.apache.flink.util.TestLoggerExtension;
+import org.apache.flink.util.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.util.function.BiConsumerWithException;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.time.Duration;
+import java.util.HashSet;
+import java.util.Queue;
+import java.util.Random;
+import java.util.Set;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link HsResultPartitionReadScheduler}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsResultPartitionReadSchedulerTest {
+    private static final int BUFFER_SIZE = 1024;
+
+    private static final int NUM_SUBPARTITIONS = 10;
+
+    private final byte[] dataBytes = new byte[BUFFER_SIZE];
+
+    private ManuallyTriggeredScheduledExecutor ioExecutor;
+
+    private BatchShuffleReadBufferPool bufferPool;
+
+    private FileChannel dataFileChannel;
+
+    private Path dataFilePath;
+
+    private HsResultPartitionReadScheduler readScheduler;
+
+    private TestingSubpartitionViewInternalOperation subpartitionViewOperation;
+
+    @BeforeEach
+    void before(@TempDir Path tempDir) throws IOException {
+        Random random = new Random();
+        random.nextBytes(dataBytes);
+        bufferPool = new BatchShuffleReadBufferPool(2 * BUFFER_SIZE, 
BUFFER_SIZE);
+        ioExecutor = new ManuallyTriggeredScheduledExecutor();
+        dataFilePath = Files.createFile(tempDir.resolve(".data"));
+        dataFileChannel = openFileChannel(dataFilePath);
+        readScheduler =
+                new HsResultPartitionReadScheduler(
+                        bufferPool,
+                        ioExecutor,
+                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        dataFilePath,
+                        HybridShuffleConfiguration.createConfiguration(
+                                bufferPool.getNumBuffersPerRequest(), 
NUM_SUBPARTITIONS));
+        subpartitionViewOperation = new 
TestingSubpartitionViewInternalOperation();
+    }
+
+    @AfterEach
+    void after() throws Exception {
+        bufferPool.destroy();
+        if (dataFileChannel != null) {
+            dataFileChannel.close();
+        }
+    }
+
+    // ----------------------- test run and register 
---------------------------------------
+
+    @Test
+    void testRegisterReaderTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+
+        assertThat(ioExecutor.numQueuedRunnables()).isZero();
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, 
factory);
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+    }
+
+    @Test
+    void testBufferReleasedTriggerRun() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        Set<MemorySegment> memorySegmentSets = new HashSet<>();
+        AtomicReference<BufferRecycler> recycleRef = new 
AtomicReference<>(null);
+        factory.setReadBuffersConsumer(
+                (requestedBuffer, recycle) -> {
+                    while (!requestedBuffer.isEmpty()) {
+                        memorySegmentSets.add(requestedBuffer.poll());
+                    }
+                    recycleRef.set(recycle);
+                });
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, 
factory);
+
+        ioExecutor.trigger();
+
+        assertThat(recycleRef).isNotNull();
+        assertThat(memorySegmentSets).hasSize(2);
+        assertThat(bufferPool.getAvailableBuffers()).isZero();
+
+        recycleRef.get().recycle(memorySegmentSets.iterator().next());
+
+        assertThat(ioExecutor.numQueuedRunnables()).isEqualTo(1);
+    }
+
+    /** Test all not used buffers will be released after run method finish. */
+    @Test
+    void testRunReleaseAllBuffers() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicBoolean prepareForSchedulingFinished = new AtomicBoolean(false);
+        factory.setPrepareForSchedulingRunnable(() -> 
prepareForSchedulingFinished.set(true));
+        factory.setReadBuffersConsumer(
+                (buffers, recycle) -> {
+                    assertThat(prepareForSchedulingFinished).isTrue();
+                    assertThat(buffers).hasSize(2);
+                    // poll one buffer, return another buffer to scheduler.
+                    buffers.poll();
+                    assertThat(bufferPool.getAvailableBuffers()).isEqualTo(0);
+                });
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, 
factory);
+
+        ioExecutor.trigger();
+
+        // not used buffer should be recycled.
+        assertThat(bufferPool.getAvailableBuffers()).isEqualTo(1);
+    }
+
+    /** Test scheduler will schedule readers in order. */
+    @Test
+    void testScheduleReadersOrdered() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory1 =
+                new TestingHsSubpartitionFileReader.Factory();
+        TestingHsSubpartitionFileReader.Factory factory2 =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicBoolean readBuffersFinished1 = new AtomicBoolean(false);
+        AtomicBoolean readBuffersFinished2 = new AtomicBoolean(false);
+        factory1.setReadBuffersConsumer(
+                (buffers, recycle) -> {
+                    assertThat(readBuffersFinished2).isFalse();
+                    readBuffersFinished1.set(true);
+                });
+        factory2.setReadBuffersConsumer(
+                (buffers, recycle) -> {
+                    assertThat(readBuffersFinished1).isTrue();
+                    readBuffersFinished2.set(true);
+                });
+
+        factory1.setPrioritySupplier(() -> 1);
+        factory2.setPrioritySupplier(() -> 2);
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, 
factory1);
+        readScheduler.registerNewSubpartition(1, subpartitionViewOperation, 
factory2);
+
+        // trigger run.
+        ioExecutor.trigger();
+
+        assertThat(readBuffersFinished2).isTrue();
+    }
+
+    @Test
+    void testRunRequestBufferTimeout() throws Exception {
+        Duration bufferRequestTimeout = Duration.ofSeconds(3);
+
+        // request all buffer first.
+        bufferPool.requestBuffers();
+        assertThat(bufferPool.getAvailableBuffers()).isZero();
+
+        readScheduler =
+                new HsResultPartitionReadScheduler(
+                        bufferPool,
+                        ioExecutor,
+                        new HsFileDataIndexImpl(NUM_SUBPARTITIONS),
+                        dataFilePath,
+                        HybridShuffleConfiguration.createConfiguration(
+                                bufferPool.getNumBuffersPerRequest(),
+                                NUM_SUBPARTITIONS,
+                                bufferRequestTimeout));
+
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicBoolean prepareForSchedulingFinished = new AtomicBoolean(false);
+        AtomicReference<Throwable> cause = new AtomicReference<>();
+        factory.setPrepareForSchedulingRunnable(() -> 
prepareForSchedulingFinished.set(true));
+        factory.setFailConsumer((cause::set));
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, 
factory);
+
+        ioExecutor.trigger();
+
+        assertThat(prepareForSchedulingFinished).isTrue();
+        assertThat(cause)
+                .hasValueSatisfying(
+                        throwable ->
+                                assertThat(throwable)
+                                        .isInstanceOf(TimeoutException.class)
+                                        .hasMessageContaining("Buffer request 
timeout"));
+    }
+
+    /**
+     * When {@link SubpartitionFileReader#readBuffers(Queue, BufferRecycler)} 
throw IOException,
+     * subpartition reader should fail.
+     */
+    @Test
+    void testRunReadBuffersThrowException() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        AtomicReference<Throwable> cause = new AtomicReference<>();
+        factory.setFailConsumer((cause::set));
+        factory.setReadBuffersConsumer(
+                (buffers, recycle) -> {
+                    throw new IOException("expected exception.");
+                });
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, 
factory);
+
+        ioExecutor.trigger();
+
+        assertThat(cause)
+                .hasValueSatisfying(
+                        throwable ->
+                                assertThat(throwable)
+                                        .isInstanceOf(IOException.class)
+                                        .hasMessageContaining("expected 
exception."));
+    }
+
+    // ----------------------- test release 
---------------------------------------
+
+    /** Test scheduler release when reader is reading buffers. */
+    @Test
+    void testReleasedWhenReading() throws Exception {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+
+        AtomicReference<Throwable> cause = new AtomicReference<>(null);
+        factory.setFailConsumer((cause::set));
+        factory.setReadBuffersConsumer((buffers, recycle) -> 
readScheduler.release());
+
+        readScheduler.registerNewSubpartition(0, subpartitionViewOperation, 
factory);
+
+        ioExecutor.trigger();
+
+        assertThat(cause)
+                .hasValueSatisfying(
+                        throwable ->
+                                assertThat(throwable)
+                                        
.isInstanceOf(IllegalStateException.class)
+                                        .hasMessageContaining(
+                                                "Result partition has been 
already released."));
+
+        assertThat(ioExecutor.numQueuedRunnables()).isZero();
+    }
+
+    /** Test scheduler was released, but receive new subpartition reader 
registration. */
+    @Test
+    void testRegisterSubpartitionReaderAfterSchedulerReleased() {
+        TestingHsSubpartitionFileReader.Factory factory =
+                new TestingHsSubpartitionFileReader.Factory();
+        readScheduler.release();
+        assertThatThrownBy(
+                        () -> {
+                            readScheduler.registerNewSubpartition(
+                                    0, subpartitionViewOperation, factory);
+                            ioExecutor.trigger();
+                        })
+                .isInstanceOf(IllegalStateException.class)
+                .hasMessageContaining("HsResultPartitionReadScheduler is 
already released.");
+    }
+
+    private static FileChannel openFileChannel(Path path) throws IOException {
+        return FileChannel.open(path, StandardOpenOption.READ);
+    }
+
+    private static class TestingHsSubpartitionFileReader implements 
SubpartitionFileReader {
+        private final Runnable prepareForSchedulingRunnable;
+
+        private final BiConsumerWithException<Queue<MemorySegment>, 
BufferRecycler, IOException>
+                readBuffersConsumer;
+
+        private final Consumer<Throwable> failConsumer;
+
+        private final Supplier<Integer> prioritySupplier;
+
+        public TestingHsSubpartitionFileReader(
+                Runnable prepareForSchedulingRunnable,
+                BiConsumerWithException<Queue<MemorySegment>, BufferRecycler, 
IOException>
+                        readBuffersConsumer,
+                Consumer<Throwable> failConsumer,
+                Supplier<Integer> prioritySupplier) {
+            this.prepareForSchedulingRunnable = prepareForSchedulingRunnable;
+            this.readBuffersConsumer = readBuffersConsumer;
+            this.failConsumer = failConsumer;
+            this.prioritySupplier = prioritySupplier;
+        }
+
+        @Override
+        public void prepareForScheduling() {
+            prepareForSchedulingRunnable.run();
+        }
+
+        @Override
+        public void readBuffers(Queue<MemorySegment> buffers, BufferRecycler 
recycler)
+                throws IOException {
+            readBuffersConsumer.accept(buffers, recycler);
+        }
+
+        @Override
+        public void fail(Throwable failureCause) {
+            failConsumer.accept(failureCause);
+        }
+
+        @Override
+        public int compareTo(SubpartitionFileReader that) {
+            checkArgument(that instanceof TestingHsSubpartitionFileReader);
+            return Integer.compare(
+                    prioritySupplier.get(),
+                    ((TestingHsSubpartitionFileReader) 
that).prioritySupplier.get());
+        }
+
+        public static class Factory implements SubpartitionFileReader.Factory {

Review Comment:
   good suggestion.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to