reswqa commented on code in PR #19960:
URL: https://github.com/apache/flink/pull/19960#discussion_r915677727


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/hybrid/HsSubpartitionFileReaderTest.java:
##########
@@ -0,0 +1,418 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition.hybrid;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.io.network.buffer.Buffer;
+import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
+import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
+import org.apache.flink.runtime.io.network.partition.BufferReaderWriterUtil;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsFileDataIndex.SpilledBuffer;
+import org.apache.flink.runtime.io.network.partition.hybrid.HsSubpartitionFileReader.BufferIndexOrError;
+import org.apache.flink.util.IOUtils;
+import org.apache.flink.util.TestLoggerExtension;
+
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.nio.channels.FileChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.List;
+import java.util.Queue;
+import java.util.Random;
+import java.util.UUID;
+import java.util.concurrent.BlockingDeque;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/** Tests for {@link HsSubpartitionFileReader}. */
+@ExtendWith(TestLoggerExtension.class)
+class HsSubpartitionFileReaderTest {
+    private static final int bufferSize = Integer.BYTES;
+
+    private static final int targetChannel = 0;
+
+    private static final int MAX_BUFFERS_READ_AHEAD = 5;
+
+    private Random random;
+
+    private HsFileDataIndex diskIndex;
+
+    private TestingSubpartitionViewInternalOperation subpartitionOperation;
+
+    private FileChannel dataFileChannel;
+
+    private long currentFileOffset;
+
+    @BeforeEach
+    void before(@TempDir Path tempPath) throws Exception {
+        random = new Random();
+        Path dataFilePath = Files.createFile(tempPath.resolve(UUID.randomUUID().toString()));
+        dataFileChannel = openFileChannel(dataFilePath);
+        diskIndex = new HsFileDataIndexImpl(1);
+        subpartitionOperation = new TestingSubpartitionViewInternalOperation();
+        currentFileOffset = 0L;
+    }
+
+    @AfterEach
+    void after() {
+        IOUtils.closeQuietly(dataFileChannel);
+    }
+
+    @Test
+    void testReadBuffer() throws Exception {
+        diskIndex = new HsFileDataIndexImpl(2);
+        TestingSubpartitionViewInternalOperation viewNotifier1 =
+                new TestingSubpartitionViewInternalOperation();
+        TestingSubpartitionViewInternalOperation viewNotifier2 =
+                new TestingSubpartitionViewInternalOperation();
+        HsSubpartitionFileReader fileReader1 = createSubpartitionFileReader(0, viewNotifier1);
+        HsSubpartitionFileReader fileReader2 = createSubpartitionFileReader(1, viewNotifier2);
+
+        writeDataToFile(0, 0, 10, 2);
+        writeDataToFile(1, 0, 20, 2);
+
+        writeDataToFile(0, 2, 15, 1);
+        writeDataToFile(1, 2, 25, 1);
+
+        Queue<MemorySegment> memorySegments = createsMemorySegments(6);
+
+        fileReader1.readBuffers(memorySegments, FreeingBufferRecycler.INSTANCE);
+        assertThat(memorySegments).hasSize(4);
+        checkData(fileReader1, 10, 11);
+
+        fileReader2.readBuffers(memorySegments, FreeingBufferRecycler.INSTANCE);
+        assertThat(memorySegments).hasSize(2);
+        checkData(fileReader2, 20, 21);
+
+        fileReader1.readBuffers(memorySegments, FreeingBufferRecycler.INSTANCE);
+        assertThat(memorySegments).hasSize(1);
+        checkData(fileReader1, 15);
+
+        fileReader2.readBuffers(memorySegments, FreeingBufferRecycler.INSTANCE);
+        assertThat(memorySegments).isEmpty();
+        checkData(fileReader2, 25);
+    }
+
+    @Test
+    void testReadEmptyRegion() throws Exception {
+        HsSubpartitionFileReader subpartitionFileReader = createSubpartitionFileReader();
+        Deque<BufferIndexOrError> loadedBuffers = subpartitionFileReader.getLoadedBuffers();
+        Queue<MemorySegment> memorySegments = createsMemorySegments(2);
+        subpartitionFileReader.readBuffers(memorySegments, FreeingBufferRecycler.INSTANCE);
+
+        assertThat(memorySegments).hasSize(2);
+        assertThat(loadedBuffers).isEmpty();
+    }
+
+    /**
+     * If the target buffer is not the first buffer in the region, the file reader will skip the
+     * buffers that are not needed.
+     */
+    @Test
+    void testReadBufferSkip() throws Exception {
+        HsSubpartitionFileReader subpartitionFileReader = createSubpartitionFileReader();
+        Deque<BufferIndexOrError> loadedBuffers = subpartitionFileReader.getLoadedBuffers();
+        // write buffers with indexes: 0, 1, 2, 3, 4, 5
+        writeDataToFile(targetChannel, 0, 6);
+
+        subpartitionOperation.advanceConsumptionProgress();
+        subpartitionOperation.advanceConsumptionProgress();
+        assertThat(subpartitionOperation.getConsumingOffset()).isEqualTo(1);
+        // update consumptionProgress
+        subpartitionFileReader.prepareForScheduling();
+        // read one buffer; expect the buffer with index 2
+        Queue<MemorySegment> segments = createsMemorySegments(1);
+        subpartitionFileReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
+
+        assertThat(segments).isEmpty();
+        assertThat(loadedBuffers).hasSize(1);
+
+        BufferIndexOrError bufferIndexOrError = loadedBuffers.poll();
+        assertThat(bufferIndexOrError.getBuffer()).isPresent();
+        assertThat(bufferIndexOrError.getThrowable()).isNotPresent();
+        assertThat(bufferIndexOrError.getIndex()).isEqualTo(2);
+
+        subpartitionOperation.advanceConsumptionProgress();
+        subpartitionOperation.advanceConsumptionProgress();
+        subpartitionFileReader.prepareForScheduling();
+        segments = createsMemorySegments(1);
+        // trigger the next round of reading; the cached region will not be updated, but numSkip,
+        // numReadable, and currentBufferIndex should be.
+        subpartitionFileReader.readBuffers(segments, FreeingBufferRecycler.INSTANCE);
+        assertThat(segments).isEmpty();
+        assertThat(loadedBuffers).hasSize(1);
+
+        bufferIndexOrError = loadedBuffers.poll();
+        assertThat(bufferIndexOrError.getBuffer()).isPresent();
+        assertThat(bufferIndexOrError.getThrowable()).isNotPresent();
+        assertThat(bufferIndexOrError.getIndex()).isEqualTo(4);
+    }
+
+    @Test
+    void testReadBufferNotExceedThreshold() throws Exception {
+        HsSubpartitionFileReader subpartitionFileReader = createSubpartitionFileReader();
+        Deque<BufferIndexOrError> loadedBuffers = subpartitionFileReader.getLoadedBuffers();
+
+        writeDataToFile(targetChannel, 0, MAX_BUFFERS_READ_AHEAD + 1);
+
+        subpartitionFileReader.prepareForScheduling();
+        // allocate MAX_BUFFERS_READ_AHEAD + 1 read buffers for reading
+        Queue<MemorySegment> memorySegments = createsMemorySegments(MAX_BUFFERS_READ_AHEAD + 1);
+        // trigger reading
+        subpartitionFileReader.readBuffers(memorySegments, FreeingBufferRecycler.INSTANCE);
+
+        // buffers preloaded by the file reader will not exceed the threshold
+        assertThat(loadedBuffers).hasSize(MAX_BUFFERS_READ_AHEAD);
+        assertThat(memorySegments).hasSize(1);
+    }
+
+    @Test
+    void testReadBufferNotifyDataAvailable() throws Exception {
+        HsSubpartitionFileReader subpartitionFileReader = createSubpartitionFileReader();
+        BlockingDeque<BufferIndexOrError> loadedBuffers =
+                (BlockingDeque<BufferIndexOrError>) subpartitionFileReader.getLoadedBuffers();
+
+        writeDataToFile(targetChannel, 0, 1);
+        subpartitionFileReader.prepareForScheduling();
+        Queue<MemorySegment> memorySegments = createsMemorySegments(1);
+        subpartitionFileReader.readBuffers(memorySegments, FreeingBufferRecycler.INSTANCE);
+        assertThat(subpartitionOperation.getNotifyNum()).isEqualTo(1);
+        assertThat(loadedBuffers).hasSize(1);
+
+        // trigger the next round of reading.
+        final int numBuffers = MAX_BUFFERS_READ_AHEAD;
+        subpartitionOperation.advanceConsumptionProgress();
+        writeDataToFile(targetChannel, 1, numBuffers);
+        subpartitionFileReader.prepareForScheduling();
+
+        OneShotLatch consumerThreadLatch = new OneShotLatch();
+        AtomicReference<Throwable> cause = new AtomicReference<>(null);
+        new Thread(
+                        () -> {
+                            consumerThreadLatch.trigger();
+                            for (int i = 0; i < numBuffers; i++) {
+                                try {
+                                    loadedBuffers.take();
+                                } catch (InterruptedException e) {
+                                    cause.set(e);
+                                }
+                            }
+                        })
+                .start();
+        consumerThreadLatch.await();

Review Comment:
   This part is refactored to test that the consumer thread can consume all buffers in time, which also shows that the notification logic is correct.
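
   For reference, here is a minimal, self-contained sketch of the producer/consumer hand-off this test exercises. Everything in it (the `ConsumerDrainSketch` class name, the `Integer` stand-ins for buffers, the 10-second bound) is illustrative only, not the PR's actual code: the consumer thread blocks on `take()` until the producer fills the deque, and a bounded `join()` proves it drained every buffer in time.

   ```java
   import java.util.concurrent.BlockingDeque;
   import java.util.concurrent.LinkedBlockingDeque;
   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.atomic.AtomicReference;

   public class ConsumerDrainSketch {
       public static void main(String[] args) throws Exception {
           final int numBuffers = 5; // stands in for MAX_BUFFERS_READ_AHEAD
           BlockingDeque<Integer> loadedBuffers = new LinkedBlockingDeque<>();
           AtomicReference<Throwable> cause = new AtomicReference<>(null);

           // Consumer: drains exactly numBuffers elements, blocking on take()
           // until each one is produced (mirrors the thread in the test above).
           Thread consumer =
                   new Thread(
                           () -> {
                               for (int i = 0; i < numBuffers; i++) {
                                   try {
                                       loadedBuffers.take();
                                   } catch (InterruptedException e) {
                                       cause.set(e);
                                   }
                               }
                           });
           consumer.start();

           // Producer: stands in for readBuffers() loading buffers into the deque.
           for (int i = 0; i < numBuffers; i++) {
               loadedBuffers.add(i);
           }

           // If the hand-off works, the consumer finishes well within the bound.
           consumer.join(TimeUnit.SECONDS.toMillis(10));
           if (consumer.isAlive() || cause.get() != null) {
               throw new AssertionError("consumer did not drain all buffers in time");
           }
       }
   }
   ```

   The bounded join is the important design choice here: an unbounded `join()` would hang the build on a regression, while the timeout turns a missed hand-off into a clear test failure.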



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
