This is an automated email from the ASF dual-hosted git repository.
szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new d9eb6b110e HDDS-7117. Consider reading chunk files using MappedByteBuffer. (#3674)
d9eb6b110e is described below
commit d9eb6b110e2c645309c8900367f44dfeeacb2d55
Author: Tsz-Wo Nicholas Sze <[email protected]>
AuthorDate: Wed Dec 6 07:36:49 2023 -0800
HDDS-7117. Consider reading chunk files using MappedByteBuffer. (#3674)
---
.../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 6 +-
.../apache/hadoop/ozone/common/ChunkBuffer.java | 9 +-
.../common/ChunkBufferImplWithByteBufferList.java | 4 +-
.../common/src/main/resources/ozone-default.xml | 10 +-
.../TestChunkBufferImplWithByteBufferList.java | 2 +-
.../hadoop/hdds/conf/ConfigurationSource.java | 11 ++
.../container/keyvalue/helpers/ChunkUtils.java | 68 +++++++++----
.../container/keyvalue/impl/BlockManagerImpl.java | 23 ++---
.../keyvalue/impl/FilePerBlockStrategy.java | 8 +-
.../keyvalue/impl/FilePerChunkStrategy.java | 14 +--
.../keyvalue/interfaces/BlockManager.java | 3 +
.../container/keyvalue/helpers/TestChunkUtils.java | 112 ++++++++++++++++++---
12 files changed, 204 insertions(+), 66 deletions(-)
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
index 1eb15b2848..7e01afd559 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/ScmConfigKeys.java
@@ -139,7 +139,11 @@ public final class ScmConfigKeys {
public static final String OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY =
"ozone.chunk.read.buffer.default.size";
public static final String OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT =
- "64KB";
+ "1MB";
+ public static final String OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_KEY =
+ "ozone.chunk.read.mapped.buffer.threshold";
+ public static final String OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_DEFAULT =
+ "32KB";
public static final String OZONE_SCM_CONTAINER_LAYOUT_KEY =
"ozone.scm.container.layout";
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
index 7d069cddc6..3948b5f04f 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBuffer.java
@@ -21,6 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.List;
+import java.util.Objects;
import java.util.function.Function;
import java.util.function.Supplier;
@@ -58,6 +59,10 @@ public interface ChunkBuffer {
/** Wrap the given list of {@link ByteBuffer}s as a {@link ChunkBuffer}. */
static ChunkBuffer wrap(List<ByteBuffer> buffers) {
+ Objects.requireNonNull(buffers, "buffers == null");
+ if (buffers.size() == 1) {
+ return wrap(buffers.get(0));
+ }
return new ChunkBufferImplWithByteBufferList(buffers);
}
@@ -91,8 +96,7 @@ public interface ChunkBuffer {
/** Similar to {@link ByteBuffer#put(byte[])}. */
default ChunkBuffer put(byte b) {
- byte[] buf = new byte[1];
- buf[0] = (byte) b;
+ final byte[] buf = {b};
return put(buf, 0, 1);
}
@@ -116,7 +120,6 @@ public interface ChunkBuffer {
/**
* Iterate the buffer from the current position to the current limit.
- *
* Upon the iteration complete,
* the buffer's position will be equal to its limit.
*
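For context, ChunkBuffer.wrap now rejects null lists with a NullPointerException and unwraps one-element lists to the single-buffer implementation. A minimal sketch of the new behavior (the surrounding main method is illustrative):

    import java.nio.ByteBuffer;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import org.apache.hadoop.ozone.common.ChunkBuffer;

    public class ChunkBufferWrapSketch {
      public static void main(String[] args) {
        // a one-element list no longer goes through ChunkBufferImplWithByteBufferList
        final ChunkBuffer single = ChunkBuffer.wrap(
            Collections.singletonList(ByteBuffer.allocate(16)));
        // multi-element lists still use the list-backed implementation
        final ChunkBuffer multi = ChunkBuffer.wrap(
            Arrays.asList(ByteBuffer.allocate(8), ByteBuffer.allocate(8)));
        // null now fails fast with NullPointerException (was IllegalArgumentException)
        try {
          ChunkBuffer.wrap((List<ByteBuffer>) null);
        } catch (NullPointerException expected) {
          System.out.println("rejected: " + expected.getMessage());
        }
        System.out.println(single.remaining() + " " + multi.remaining());
      }
    }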
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
index f6a7f60b0a..7c3a0c7d2d 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/common/ChunkBufferImplWithByteBufferList.java
@@ -31,6 +31,7 @@ import java.nio.ByteBuffer;
import java.nio.channels.GatheringByteChannel;
import java.util.ArrayList;
import java.util.List;
+import java.util.Objects;
import java.util.function.Function;
/**
@@ -50,8 +51,7 @@ public class ChunkBufferImplWithByteBufferList implements
ChunkBuffer {
private int currentIndex;
ChunkBufferImplWithByteBufferList(List<ByteBuffer> buffers) {
- Preconditions.checkArgument(buffers != null, "buffer == null");
-
+ Objects.requireNonNull(buffers, "buffers == null");
this.buffers = !buffers.isEmpty() ? ImmutableList.copyOf(buffers) :
EMPTY_BUFFER;
this.limit = buffers.stream().mapToInt(ByteBuffer::limit).sum();
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index bf9a2f511b..db16381634 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -836,7 +836,7 @@
</property>
<property>
<name>ozone.chunk.read.buffer.default.size</name>
- <value>64KB</value>
+ <value>1MB</value>
<tag>OZONE, SCM, CONTAINER, PERFORMANCE</tag>
<description>
The default read buffer size during read chunk operations when checksum
@@ -847,6 +847,14 @@
(ozone.client.bytes.per.checksum) corresponding to the chunk.
</description>
</property>
+ <property>
+ <name>ozone.chunk.read.mapped.buffer.threshold</name>
+ <value>32KB</value>
+ <tag>OZONE, SCM, CONTAINER, PERFORMANCE</tag>
+ <description>
+ The read threshold above which chunk data is read using memory-mapped buffers.
+ </description>
+ </property>
<property>
<name>ozone.scm.container.layout</name>
<value>FILE_PER_BLOCK</value>
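The new threshold is tunable like any other size key; for illustration, a deployment could raise it in ozone-site.xml (the 64KB value here is arbitrary):

    <property>
      <name>ozone.chunk.read.mapped.buffer.threshold</name>
      <value>64KB</value>
    </property>

Reads longer than the threshold use FileChannel.map; reads at or below it keep the existing heap-buffer path.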
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
index 3da43166e7..072c07be64 100644
--- a/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/ozone/common/TestChunkBufferImplWithByteBufferList.java
@@ -36,7 +36,7 @@ public class TestChunkBufferImplWithByteBufferList {
@Test
public void rejectsNullList() {
List<ByteBuffer> list = null;
- assertThrows(IllegalArgumentException.class, () -> ChunkBuffer.wrap(list));
+ assertThrows(NullPointerException.class, () -> ChunkBuffer.wrap(list));
}
@Test
diff --git a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java
index dae095a193..b1a20c9aec 100644
--- a/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java
+++ b/hadoop-hdds/config/src/main/java/org/apache/hadoop/hdds/conf/ConfigurationSource.java
@@ -285,6 +285,17 @@ public interface ConfigurationSource {
}
}
+ default int getBufferSize(String name, String defaultValue) {
+ final double size = getStorageSize(name, defaultValue, StorageUnit.BYTES);
+ if (size <= 0) {
+ throw new IllegalArgumentException(name + " <= 0");
+ } else if (size > Integer.MAX_VALUE) {
+ throw new IllegalArgumentException(
+ name + " > Integer.MAX_VALUE = " + Integer.MAX_VALUE);
+ }
+ return (int) size;
+ }
+
default double getStorageSize(String name, String defaultValue,
StorageUnit targetUnit) {
String vString = get(name);
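A sketch of how callers are expected to use the new helper (OzoneConfiguration here stands in for any ConfigurationSource; key names from ScmConfigKeys above):

    import org.apache.hadoop.hdds.conf.ConfigurationSource;
    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.scm.ScmConfigKeys;

    public class GetBufferSizeSketch {
      public static void main(String[] args) {
        final ConfigurationSource conf = new OzoneConfiguration();
        // parses size strings such as "1MB"/"32KB" and rejects values
        // outside (0, Integer.MAX_VALUE]
        final int capacity = conf.getBufferSize(
            ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY,
            ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT);
        final int threshold = conf.getBufferSize(
            ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_KEY,
            ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_DEFAULT);
        System.out.println(capacity + " " + threshold); // 1048576 32768 with the defaults
      }
    }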
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
index 85f3e21422..7266904139 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/helpers/ChunkUtils.java
@@ -24,6 +24,7 @@ import java.io.IOException;
import java.io.InterruptedIOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.NoSuchFileException;
@@ -32,8 +33,11 @@ import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.EnumSet;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;
@@ -61,7 +65,7 @@ import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Res
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.Result.UNSUPPORTED_REQUEST;
import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure;
-import org.apache.ratis.util.function.CheckedConsumer;
+import org.apache.ratis.util.function.CheckedFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -184,26 +188,24 @@ public final class ChunkUtils {
}
public static ChunkBuffer readData(long len, int bufferCapacity,
- CheckedConsumer<ByteBuffer[], StorageContainerException> readMethod)
+ File file, long off, HddsVolume volume, int readMappedBufferThreshold)
throws StorageContainerException {
+ if (len > readMappedBufferThreshold) {
+ return readData(file, bufferCapacity, off, len, volume);
+ } else if (len == 0) {
+ return ChunkBuffer.wrap(Collections.emptyList());
+ }
+
final ByteBuffer[] buffers = BufferUtils.assignByteBuffers(len,
bufferCapacity);
- readMethod.accept(buffers);
+ readData(file, off, len, c -> c.position(off).read(buffers), volume);
+ Arrays.stream(buffers).forEach(ByteBuffer::flip);
return ChunkBuffer.wrap(Arrays.asList(buffers));
}
- /**
- * Reads data from an existing chunk file into a list of ByteBuffers.
- *
- * @param file file where data lives
- * @param buffers
- * @param offset
- * @param len
- * @param volume for statistics and checker
- */
- public static void readData(File file, ByteBuffer[] buffers,
- long offset, long len, HddsVolume volume)
- throws StorageContainerException {
+ private static void readData(File file, long offset, long len,
+ CheckedFunction<FileChannel, Long, IOException> readMethod,
+ HddsVolume volume) throws StorageContainerException {
final Path path = file.toPath();
final long startTime = Time.monotonicNow();
@@ -213,8 +215,7 @@ public final class ChunkUtils {
bytesRead = processFileExclusively(path, () -> {
try (FileChannel channel = open(path, READ_OPTIONS, NO_ATTRIBUTES);
FileLock ignored = channel.lock(offset, len, true)) {
-
- return channel.position(offset).read(buffers);
+ return readMethod.apply(channel);
} catch (IOException e) {
onFailure(volume);
throw new UncheckedIOException(e);
@@ -239,10 +240,37 @@ public final class ChunkUtils {
bytesRead, offset, file);
validateReadSize(len, bytesRead);
+ }
- for (ByteBuffer buf : buffers) {
- buf.flip();
- }
+ /**
+ * Read data from the given file using
+ * {@link FileChannel#map(FileChannel.MapMode, long, long)},
+ * whose javadoc recommends that it is generally only worth mapping
+ * relatively large files (larger than a few tens of kilobytes)
+ * into memory from the standpoint of performance.
+ *
+ * @return a list of {@link MappedByteBuffer} containing the data.
+ */
+ private static ChunkBuffer readData(File file, int chunkSize,
+ long offset, long length, HddsVolume volume)
+ throws StorageContainerException {
+
+ final List<ByteBuffer> buffers = new ArrayList<>(
+ Math.toIntExact((length - 1) / chunkSize) + 1);
+ readData(file, offset, length, channel -> {
+ long readLen = 0;
+ while (readLen < length) {
+ final int n = Math.toIntExact(Math.min(length - readLen, chunkSize));
+ final ByteBuffer mapped = channel.map(
+ FileChannel.MapMode.READ_ONLY, offset + readLen, n);
+ LOG.debug("mapped: offset={}, readLen={}, n={}, {}",
+ offset, readLen, n, mapped.getClass());
+ readLen += mapped.remaining();
+ buffers.add(mapped);
+ }
+ return readLen;
+ }, volume);
+ return ChunkBuffer.wrap(buffers);
}
/**
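For readers unfamiliar with memory-mapped I/O, the pattern behind the new private readData, reduced to a standalone sketch (the file path and sizes are illustrative; the locking and volume bookkeeping above are omitted):

    import java.io.IOException;
    import java.nio.MappedByteBuffer;
    import java.nio.channels.FileChannel;
    import java.nio.file.Paths;
    import java.nio.file.StandardOpenOption;
    import java.util.ArrayList;
    import java.util.List;

    public class MappedReadSketch {
      public static void main(String[] args) throws IOException {
        final long offset = 0;
        final long length = 10L << 20;  // 10MB, illustrative
        final int chunkSize = 1 << 20;  // 1MB, matching the new default buffer size
        final List<MappedByteBuffer> buffers = new ArrayList<>();
        try (FileChannel channel = FileChannel.open(
            Paths.get("chunkfile"), StandardOpenOption.READ)) {
          long readLen = 0;
          while (readLen < length) {
            final int n = Math.toIntExact(Math.min(length - readLen, chunkSize));
            // no copy into a heap buffer: the OS pages the file in on demand
            final MappedByteBuffer mapped = channel.map(
                FileChannel.MapMode.READ_ONLY, offset + readLen, n);
            readLen += mapped.remaining();
            buffers.add(mapped);
          }
        }
        System.out.println(buffers.size() + " mapped buffers");
      }
    }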
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
index 449fe46ae0..62896561f2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/BlockManagerImpl.java
@@ -24,7 +24,6 @@ import java.util.List;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.hdds.utils.db.BatchOperation;
@@ -60,6 +59,7 @@ public class BlockManagerImpl implements BlockManager {
// Default Read Buffer capacity when Checksum is not present
private final int defaultReadBufferCapacity;
+ private final int readMappedBufferThreshold;
/**
* Constructs a Block Manager.
@@ -69,19 +69,12 @@ public class BlockManagerImpl implements BlockManager {
public BlockManagerImpl(ConfigurationSource conf) {
Preconditions.checkNotNull(conf, "Config cannot be null");
this.config = conf;
- final double size = config.getStorageSize(
+ this.defaultReadBufferCapacity = config.getBufferSize(
ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY,
- ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT,
- StorageUnit.BYTES);
- if (size <= 0) {
- throw new IllegalArgumentException(
- ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY + " <= 0");
- } else if (size > Integer.MAX_VALUE) {
- throw new IllegalArgumentException(
- ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_KEY
- + " > Integer.MAX_VALUE = " + Integer.MAX_VALUE);
- }
- this.defaultReadBufferCapacity = (int) size;
+ ScmConfigKeys.OZONE_CHUNK_READ_BUFFER_DEFAULT_SIZE_DEFAULT);
+ this.readMappedBufferThreshold = config.getBufferSize(
+ ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_KEY,
+ ScmConfigKeys.OZONE_CHUNK_READ_MAPPED_BUFFER_THRESHOLD_DEFAULT);
}
@Override
@@ -267,6 +260,10 @@ public class BlockManagerImpl implements BlockManager {
return defaultReadBufferCapacity;
}
+ public int getReadMappedBufferThreshold() {
+ return readMappedBufferThreshold;
+ }
+
/**
* Deletes an existing block.
* As Deletion is handled by BlockDeletingService,
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
index ccab7f35e8..040b03c3dc 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerBlockStrategy.java
@@ -60,7 +60,6 @@ import static org.apache.hadoop.ozone.container.common.impl.ContainerLayoutVersi
import static org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext.WriteChunkStage.COMMIT_DATA;
import static org.apache.hadoop.ozone.container.common.utils.StorageVolumeUtil.onFailure;
import static org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils.limitReadSize;
-import static org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils.readData;
import static org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils.validateChunkForOverwrite;
import static org.apache.hadoop.ozone.container.keyvalue.helpers.ChunkUtils.verifyChunkFileExists;
@@ -75,6 +74,7 @@ public class FilePerBlockStrategy implements ChunkManager {
private final boolean doSyncWrite;
private final OpenFiles files = new OpenFiles();
private final int defaultReadBufferCapacity;
+ private final int readMappedBufferThreshold;
private final VolumeSet volumeSet;
public FilePerBlockStrategy(boolean sync, BlockManager manager,
@@ -82,6 +82,8 @@ public class FilePerBlockStrategy implements ChunkManager {
doSyncWrite = sync;
this.defaultReadBufferCapacity = manager == null ? 0 :
manager.getDefaultReadBufferCapacity();
+ this.readMappedBufferThreshold = manager == null ? 0
+ : manager.getReadMappedBufferThreshold();
this.volumeSet = volSet;
}
@@ -192,8 +194,8 @@ public class FilePerBlockStrategy implements ChunkManager {
long offset = info.getOffset();
int bufferCapacity = ChunkManager.getBufferCapacityForChunkRead(info,
defaultReadBufferCapacity);
- return readData(len, bufferCapacity,
- array -> readData(chunkFile, array, offset, len, volume));
+ return ChunkUtils.readData(len, bufferCapacity, chunkFile, offset, volume,
+ readMappedBufferThreshold);
}
@Override
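With both strategies simply forwarding the configured threshold, the effective read paths under the shipped defaults (1MB buffer, 32KB threshold) work out as follows; a summary derived from the ChunkUtils.readData dispatch above:

    // len == 0     -> empty ChunkBuffer, no I/O
    // len <= 32KB  -> heap path: FileChannel.read into assigned ByteBuffers
    //                 (the 32KB boundary itself stays on the heap path,
    //                 since only len > threshold is mapped)
    // len >  32KB  -> mapped path: one MappedByteBuffer per 1MB of data,
    //                 e.g. a 10MB read yields ten buffers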
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
index 13aa9c50f7..31a340f310 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/impl/FilePerChunkStrategy.java
@@ -20,14 +20,12 @@ package org.apache.hadoop.ozone.container.keyvalue.impl;
import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.client.BlockID;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.common.ChunkBuffer;
-import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.hadoop.ozone.container.common.transport.server.ratis.DispatcherContext;
@@ -48,7 +46,6 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
@@ -69,6 +66,7 @@ public class FilePerChunkStrategy implements ChunkManager {
private final boolean doSyncWrite;
private final BlockManager blockManager;
private final int defaultReadBufferCapacity;
+ private final int readMappedBufferThreshold;
private final VolumeSet volumeSet;
public FilePerChunkStrategy(boolean sync, BlockManager manager,
@@ -77,6 +75,8 @@ public class FilePerChunkStrategy implements ChunkManager {
blockManager = manager;
this.defaultReadBufferCapacity = manager == null ? 0 :
manager.getDefaultReadBufferCapacity();
+ this.readMappedBufferThreshold = manager == null ? 0
+ : manager.getReadMappedBufferThreshold();
this.volumeSet = volSet;
}
@@ -233,9 +233,6 @@ public class FilePerChunkStrategy implements ChunkManager {
int bufferCapacity = ChunkManager.getBufferCapacityForChunkRead(info,
defaultReadBufferCapacity);
- ByteBuffer[] dataBuffers = BufferUtils.assignByteBuffers(len,
- bufferCapacity);
-
long chunkFileOffset = 0;
if (info.getOffset() != 0) {
try {
@@ -267,8 +264,8 @@ public class FilePerChunkStrategy implements ChunkManager {
if (file.exists()) {
long offset = info.getOffset() - chunkFileOffset;
Preconditions.checkState(offset >= 0);
- ChunkUtils.readData(file, dataBuffers, offset, len, volume);
- return ChunkBuffer.wrap(Lists.newArrayList(dataBuffers));
+ return ChunkUtils.readData(len, bufferCapacity, file, offset, volume,
+ readMappedBufferThreshold);
}
} catch (StorageContainerException ex) {
//UNABLE TO FIND chunk is not a problem as we will try with the
@@ -276,7 +273,6 @@ public class FilePerChunkStrategy implements ChunkManager {
if (ex.getResult() != UNABLE_TO_FIND_CHUNK) {
throw ex;
}
- BufferUtils.clearBuffers(dataBuffers);
}
}
throw new StorageContainerException(
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
index aa7285a232..02b7e93d50 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/interfaces/BlockManager.java
@@ -93,6 +93,9 @@ public interface BlockManager {
int getDefaultReadBufferCapacity();
+ /** @return the threshold to read using memory mapped buffers. */
+ int getReadMappedBufferThreshold();
+
/**
* Shutdown ContainerManager.
*/
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java
index 554265688d..037de863c0 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/keyvalue/helpers/TestChunkUtils.java
@@ -17,9 +17,12 @@
*/
package org.apache.hadoop.ozone.container.keyvalue.helpers;
+import java.io.BufferedOutputStream;
import java.io.File;
import java.io.IOException;
+import java.io.OutputStream;
import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
@@ -27,6 +30,7 @@ import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
+import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
@@ -36,7 +40,6 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdds.scm.container.common.helpers.StorageContainerException;
import org.apache.hadoop.ozone.common.ChunkBuffer;
-import org.apache.hadoop.ozone.common.utils.BufferUtils;
import org.apache.hadoop.ozone.container.common.helpers.ChunkInfo;
import org.apache.ozone.test.GenericTestUtils;
@@ -64,6 +67,16 @@ public class TestChunkUtils {
LoggerFactory.getLogger(TestChunkUtils.class);
private static final String PREFIX = TestChunkUtils.class.getSimpleName();
+ private static final int BUFFER_CAPACITY = 1 << 20;
+ private static final int MAPPED_BUFFER_THRESHOLD = 32 << 10;
+ private static final Random RANDOM = new Random();
+
+ static ChunkBuffer readData(File file, long off, long len)
+ throws StorageContainerException {
+ LOG.info("off={}, len={}", off, len);
+ return ChunkUtils.readData(len, BUFFER_CAPACITY, file, off, null,
+ MAPPED_BUFFER_THRESHOLD);
+ }
@Test
public void concurrentReadOfSameFile() throws Exception {
@@ -85,12 +98,11 @@ public class TestChunkUtils {
final int threadNumber = i;
executor.execute(() -> {
try {
- ByteBuffer[] readBuffers = BufferUtils.assignByteBuffers(len, len);
- ChunkUtils.readData(file, readBuffers, offset, len, null);
-
+ final ChunkBuffer chunk = readData(file, offset, len);
// There should be only one element in readBuffers
- Assertions.assertEquals(1, readBuffers.length);
- ByteBuffer readBuffer = readBuffers[0];
+ final List<ByteBuffer> buffers = chunk.asByteBufferList();
+ Assertions.assertEquals(1, buffers.size());
+ final ByteBuffer readBuffer = buffers.get(0);
LOG.info("Read data ({}): {}", threadNumber,
new String(readBuffer.array(), UTF_8));
@@ -172,12 +184,11 @@ public class TestChunkUtils {
int offset = 0;
ChunkUtils.writeData(file, data, offset, len, null, true);
- ByteBuffer[] readBuffers = BufferUtils.assignByteBuffers(len, len);
- ChunkUtils.readData(file, readBuffers, offset, len, null);
-
+ final ChunkBuffer chunk = readData(file, offset, len);
// There should be only one element in readBuffers
- Assertions.assertEquals(1, readBuffers.length);
- ByteBuffer readBuffer = readBuffers[0];
+ final List<ByteBuffer> buffers = chunk.asByteBufferList();
+ Assertions.assertEquals(1, buffers.size());
+ final ByteBuffer readBuffer = buffers.get(0);
assertArrayEquals(array, readBuffer.array());
assertEquals(len, readBuffer.remaining());
@@ -220,15 +231,90 @@ public class TestChunkUtils {
int len = 123;
int offset = 0;
File nonExistentFile = new File("nosuchfile");
- ByteBuffer[] bufs = BufferUtils.assignByteBuffers(len, len);
// when
StorageContainerException e = assertThrows(
StorageContainerException.class,
- () -> ChunkUtils.readData(nonExistentFile, bufs, offset, len, null));
+ () -> readData(nonExistentFile, offset, len));
// then
Assertions.assertEquals(UNABLE_TO_FIND_CHUNK, e.getResult());
}
+ @Test
+ public void testReadData() throws Exception {
+ final File dir = GenericTestUtils.getTestDir("testReadData");
+ try {
+ Assertions.assertTrue(dir.mkdirs());
+
+ // large file
+ final int large = 10 << 20; // 10MB
+ Assertions.assertTrue(large > MAPPED_BUFFER_THRESHOLD);
+ runTestReadFile(large, dir, true);
+
+ // small file
+ final int small = 30 << 10; // 30KB
+ Assertions.assertTrue(small <= MAPPED_BUFFER_THRESHOLD);
+ runTestReadFile(small, dir, false);
+
+ // boundary case
+ runTestReadFile(MAPPED_BUFFER_THRESHOLD, dir, false);
+
+ // empty file
+ runTestReadFile(0, dir, false);
+
+ for (int i = 0; i < 10; i++) {
+ final int length = RANDOM.nextInt(2 * MAPPED_BUFFER_THRESHOLD) + 1;
+ runTestReadFile(length, dir, length > MAPPED_BUFFER_THRESHOLD);
+ }
+ } finally {
+ FileUtils.deleteDirectory(dir);
+ }
+ }
+
+ void runTestReadFile(int length, File dir, boolean isMapped)
+ throws Exception {
+ final File file;
+ for (int i = length; ; i++) {
+ final File f = new File(dir, "file_" + i);
+ if (!f.exists()) {
+ file = f;
+ break;
+ }
+ }
+ LOG.info("file: {}", file);
+
+ // write a file
+ final byte[] array = new byte[BUFFER_CAPACITY];
+ final long seed = System.nanoTime();
+ LOG.info("seed: {}", seed);
+ RANDOM.setSeed(seed);
+ try (OutputStream out = new BufferedOutputStream(Files.newOutputStream(
+ file.toPath(), StandardOpenOption.CREATE_NEW))) {
+ for (int written = 0; written < length;) {
+ RANDOM.nextBytes(array);
+ final int remaining = length - written;
+ final int toWrite = Math.min(remaining, array.length);
+ out.write(array, 0, toWrite);
+ written += toWrite;
+ }
+ }
+ Assertions.assertEquals(length, file.length());
+
+ // read the file back
+ final ChunkBuffer chunk = readData(file, 0, length);
+ Assertions.assertEquals(length, chunk.remaining());
+
+ final List<ByteBuffer> buffers = chunk.asByteBufferList();
+ LOG.info("buffers.size(): {}", buffers.size());
+ Assertions.assertEquals((length - 1) / BUFFER_CAPACITY + 1, buffers.size());
+ LOG.info("buffer class: {}", buffers.get(0).getClass());
+
+ RANDOM.setSeed(seed);
+ for (ByteBuffer b : buffers) {
+ Assertions.assertEquals(isMapped, b instanceof MappedByteBuffer);
+ RANDOM.nextBytes(array);
+ Assertions.assertEquals(ByteBuffer.wrap(array, 0, b.remaining()), b);
+ }
+ }
}
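One test detail worth noting: runTestReadFile never stores the expected bytes; it reseeds the shared Random and regenerates them during verification. The same pattern, distilled (sizes illustrative):

    import java.nio.ByteBuffer;
    import java.util.Random;

    public class SeededVerifySketch {
      public static void main(String[] args) {
        final Random random = new Random();
        final long seed = System.nanoTime();

        // produce the data from the seed
        random.setSeed(seed);
        final byte[] written = new byte[1 << 10];
        random.nextBytes(written);

        // verify by reseeding and regenerating, instead of keeping a copy
        random.setSeed(seed);
        final byte[] expected = new byte[written.length];
        random.nextBytes(expected);
        System.out.println(
            ByteBuffer.wrap(expected).equals(ByteBuffer.wrap(written))); // true
      }
    }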
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]