This is an automated email from the ASF dual-hosted git repository.
zuston pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new 3590940eb [#1658] improvement: FileSegmentManagedBuffer#nioByteBuffer
read result cacheable (#1659)
3590940eb is described below
commit 3590940ebe8feb0dae30b120116ccfb925e8efbe
Author: xumanbu <[email protected]>
AuthorDate: Mon Jan 27 09:25:33 2025 +0800
[#1658] improvement: FileSegmentManagedBuffer#nioByteBuffer read result
cacheable (#1659)
### What changes were proposed in this pull request?
FileSegmentManagedBuffer#nioByteBuffer add a buffer to hold read file
result to avoid multiple read file
### Why are the changes needed?
Fix: #1658
### Does this PR introduce _any_ user-facing change?
No.
### How was this patch tested?
existed UT
---
.../netty/buffer/FileSegmentManagedBuffer.java | 21 +++++---
.../netty/buffer/FileSegmentManagedBufferTest.java | 58 ++++++++++++++++++++++
2 files changed, 73 insertions(+), 6 deletions(-)
diff --git
a/common/src/main/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBuffer.java
b/common/src/main/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBuffer.java
index c95e91ae7..16db7c792 100644
---
a/common/src/main/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBuffer.java
+++
b/common/src/main/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBuffer.java
@@ -39,6 +39,8 @@ public class FileSegmentManagedBuffer extends ManagedBuffer {
private final File file;
private final long offset;
private final int length;
+ private volatile boolean isFilled;
+ private ByteBuffer cachedBuffer;
public FileSegmentManagedBuffer(File file, long offset, int length) {
this.file = file;
@@ -58,21 +60,25 @@ public class FileSegmentManagedBuffer extends ManagedBuffer
{
@Override
public ByteBuffer nioByteBuffer() {
+ if (isFilled) {
+ return cachedBuffer;
+ }
FileChannel channel = null;
try {
channel = new RandomAccessFile(file, "r").getChannel();
- ByteBuffer buf = ByteBuffer.allocate(length);
+ cachedBuffer = ByteBuffer.allocate(length);
channel.position(offset);
- while (buf.remaining() != 0) {
- if (channel.read(buf) == -1) {
+ while (cachedBuffer.remaining() != 0) {
+ if (channel.read(cachedBuffer) == -1) {
throw new IOException(
String.format(
"Reached EOF before filling
buffer.offset=%s,file=%s,buf.remaining=%s",
- offset, file.getAbsoluteFile(), buf.remaining()));
+ offset, file.getAbsoluteFile(), cachedBuffer.remaining()));
}
}
- buf.flip();
- return buf;
+ cachedBuffer.flip();
+ isFilled = true;
+ return cachedBuffer;
} catch (IOException e) {
String fileName = file.getAbsolutePath();
String errorMessage =
@@ -102,6 +108,9 @@ public class FileSegmentManagedBuffer extends ManagedBuffer
{
@Override
public ManagedBuffer release() {
+ cachedBuffer.clear();
+ cachedBuffer = null;
+ isFilled = false;
return this;
}
diff --git
a/common/src/test/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBufferTest.java
b/common/src/test/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBufferTest.java
new file mode 100644
index 000000000..c8719ef2a
--- /dev/null
+++
b/common/src/test/java/org/apache/uniffle/common/netty/buffer/FileSegmentManagedBufferTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.netty.buffer;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.nio.ByteBuffer;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+public class FileSegmentManagedBufferTest {
+ @Test
+ void testNioByteBuffer(@TempDir File tmpDir) {
+ File dataFile = new File(tmpDir, "data_file_1");
+ String str = "Hello";
+ byte[] strToBytes = str.getBytes();
+ try (FileOutputStream outputStream = new FileOutputStream(dataFile)) {
+ outputStream.write(strToBytes);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ FileSegmentManagedBuffer fileSegmentManagedBuffer =
+ new FileSegmentManagedBuffer(dataFile, 0, strToBytes.length);
+ ByteBuffer byteBuffer1 = fileSegmentManagedBuffer.nioByteBuffer();
+ assertEquals(new String(byteBuffer1.array()), str);
+
+ ByteBuffer byteBuffer2 = fileSegmentManagedBuffer.nioByteBuffer();
+ assertTrue(byteBuffer1 == byteBuffer2);
+ fileSegmentManagedBuffer.release();
+
+ fileSegmentManagedBuffer = new FileSegmentManagedBuffer(dataFile, 0,
strToBytes.length);
+ ByteBuffer byteBuffer3 = fileSegmentManagedBuffer.nioByteBuffer();
+ assertFalse(byteBuffer3 == byteBuffer2);
+ assertFalse(byteBuffer3 == byteBuffer1);
+
+ fileSegmentManagedBuffer.release();
+ }
+}