This is an automated email from the ASF dual-hosted git repository. elek pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git
The following commit(s) were added to refs/heads/master by this push: new 9a702e5 HDDS-3979. Make bufferSize configurable for stream copy (#1212) 9a702e5 is described below commit 9a702e5c35cfdffe7f0f1a4e2cc7ec97ec8e6d4e Author: maobaolong <307499...@qq.com> AuthorDate: Tue Aug 11 20:20:17 2020 +0800 HDDS-3979. Make bufferSize configurable for stream copy (#1212) --- .../org/apache/hadoop/ozone/OzoneConfigKeys.java | 1 + .../common/src/main/resources/ozone-default.xml | 8 ++ .../hadoop/ozone/client/io/KeyInputStream.java | 62 ++--------- .../ozone/client/rpc/TestKeyInputStream.java | 119 ++++++++++----------- .../hadoop/ozone/s3/S3GatewayConfigKeys.java | 6 ++ .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java | 36 ++++--- .../hadoop/ozone/s3/io/S3WrapperInputStream.java | 36 +------ 7 files changed, 107 insertions(+), 161 deletions(-) diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index d89fef9..482ac88 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -468,6 +468,7 @@ public final class OzoneConfigKeys { public static final String OZONE_CLIENT_HTTPS_NEED_AUTH_KEY = "ozone.https.client.need-auth"; public static final boolean OZONE_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false; + /** * There is no need to instantiate this class. */ diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index b9774aa..5770448 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -2432,6 +2432,14 @@ </description> </property> <property> + <name>ozone.s3g.client.buffer.size</name> + <tag>OZONE, S3GATEWAY</tag> + <value>4KB</value> + <description> + The size of the buffer which is for read block. (4KB by default). + </description> + </property> + <property> <name>ssl.server.keystore.keypassword</name> <tag>OZONE, SECURITY, MANAGEMENT</tag> <value></value> diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java index 4af6838..769035a 100644 --- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java +++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java @@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.client.io; import com.google.common.annotations.VisibleForTesting; import org.apache.commons.collections.CollectionUtils; -import org.apache.commons.io.IOUtils; import org.apache.hadoop.fs.FSExceptionMessages; import org.apache.hadoop.fs.Seekable; import org.apache.hadoop.hdds.client.BlockID; @@ -35,7 +34,6 @@ import org.slf4j.LoggerFactory; import java.io.EOFException; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -325,62 +323,14 @@ public class KeyInputStream extends InputStream implements Seekable { return blockStreams.get(index).getRemaining(); } - /** - * Copies some or all bytes from a large (over 2GB) <code>InputStream</code> - * to an <code>OutputStream</code>, optionally skipping input bytes. - * <p> - * Copy the method from IOUtils of commons-io to reimplement skip by seek - * rather than read. The reason why IOUtils of commons-io implement skip - * by read can be found at - * <a href="https://issues.apache.org/jira/browse/IO-203">IO-203</a>. - * </p> - * <p> - * This method uses the provided buffer, so there is no need to use a - * <code>BufferedInputStream</code>. - * </p> - * - * @param output the <code>OutputStream</code> to write to - * @param inputOffset : number of bytes to skip from input before copying - * -ve values are ignored - * @param length : number of bytes to copy. -ve means all - * @param buffer the buffer to use for the copy - * @return the number of bytes copied - * @throws NullPointerException if the input or output is null - * @throws IOException if an I/O error occurs - */ - public long copyLarge(final OutputStream output, - final long inputOffset, final long len, final byte[] buffer) - throws IOException { - if (inputOffset > 0) { - seek(inputOffset); - } - - if (len == 0) { + @Override + public long skip(long n) throws IOException { + if (n <= 0) { return 0; } - final int bufferLength = buffer.length; - int bytesToRead = bufferLength; - if (len > 0 && len < bufferLength) { - bytesToRead = (int) len; - } - - int read; - long totalRead = 0; - while (bytesToRead > 0) { - read = read(buffer, 0, bytesToRead); - if (read == IOUtils.EOF) { - break; - } - - output.write(buffer, 0, read); - totalRead += read; - if (len > 0) { // only adjust len if not reading to the end - // Note the cast must work because buffer.length is an integer - bytesToRead = (int) Math.min(len - totalRead, bufferLength); - } - } - - return totalRead; + long toSkip = Math.min(n, length - getPos()); + seek(getPos() + toSkip); + return toSkip; } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java index 8ab176d..7775bb7 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java @@ -43,7 +43,6 @@ import org.junit.rules.Timeout; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Arrays; import java.util.Collection; @@ -295,66 +294,6 @@ public class TestKeyInputStream { } @Test - public void testCopyLarge() throws Exception { - String keyName = getKeyName(); - OzoneOutputStream key = TestHelper.createKey(keyName, - ReplicationType.RATIS, 0, objectStore, volumeName, bucketName); - - // write data spanning 3 blocks - int dataLength = (2 * blockSize) + (blockSize / 2); - - byte[] inputData = new byte[dataLength]; - Random rand = new Random(); - for (int i = 0; i < dataLength; i++) { - inputData[i] = (byte) rand.nextInt(127); - } - key.write(inputData); - key.close(); - - // test with random start and random length - for (int i = 0; i < 100; i++) { - int inputOffset = rand.nextInt(dataLength - 1); - int length = rand.nextInt(dataLength - inputOffset); - - KeyInputStream keyInputStream = (KeyInputStream) objectStore - .getVolume(volumeName).getBucket(bucketName).readKey(keyName) - .getInputStream(); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - - keyInputStream.copyLarge(outputStream, inputOffset, length, - new byte[4096]); - byte[] readData = outputStream.toByteArray(); - keyInputStream.close(); - outputStream.close(); - - for (int j = inputOffset; j < inputOffset + length; j++) { - Assert.assertEquals(readData[j - inputOffset], inputData[j]); - } - } - - // test with random start and -ve length - for (int i = 0; i < 10; i++) { - int inputOffset = rand.nextInt(dataLength - 1); - int length = -1; - - KeyInputStream keyInputStream = (KeyInputStream) objectStore - .getVolume(volumeName).getBucket(bucketName).readKey(keyName) - .getInputStream(); - ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); - - keyInputStream.copyLarge(outputStream, inputOffset, length, - new byte[4096]); - byte[] readData = outputStream.toByteArray(); - keyInputStream.close(); - outputStream.close(); - - for (int j = inputOffset; j < dataLength; j++) { - Assert.assertEquals(readData[j - inputOffset], inputData[j]); - } - } - } - - @Test public void testReadChunk() throws Exception { String keyName = getKeyName(); OzoneOutputStream key = TestHelper.createKey(keyName, @@ -395,4 +334,62 @@ public class TestKeyInputStream { } keyInputStream.close(); } + + @Test + public void testSkip() throws Exception { + XceiverClientManager.resetXceiverClientMetrics(); + XceiverClientMetrics metrics = XceiverClientManager + .getXceiverClientMetrics(); + long writeChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.WriteChunk); + long readChunkCount = metrics.getContainerOpCountMetrics( + ContainerProtos.Type.ReadChunk); + + String keyName = getKeyName(); + OzoneOutputStream key = TestHelper.createKey(keyName, + ReplicationType.RATIS, 0, objectStore, volumeName, bucketName); + + // write data spanning 3 chunks + int dataLength = (2 * chunkSize) + (chunkSize / 2); + byte[] inputData = ContainerTestHelper.getFixedLengthString( + keyString, dataLength).getBytes(UTF_8); + key.write(inputData); + key.close(); + + Assert.assertEquals(writeChunkCount + 3, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk)); + + KeyInputStream keyInputStream = (KeyInputStream) objectStore + .getVolume(volumeName).getBucket(bucketName).readKey(keyName) + .getInputStream(); + + // skip 150 + keyInputStream.skip(70); + Assert.assertEquals(70, keyInputStream.getPos()); + keyInputStream.skip(0); + Assert.assertEquals(70, keyInputStream.getPos()); + keyInputStream.skip(80); + + Assert.assertEquals(150, keyInputStream.getPos()); + + // Skip operation should not result in any readChunk operation. + Assert.assertEquals(readChunkCount, metrics + .getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk)); + + byte[] readData = new byte[chunkSize]; + keyInputStream.read(readData, 0, chunkSize); + + // Since we reading data from index 150 to 250 and the chunk boundary is + // 100 bytes, we need to read 2 chunks. + Assert.assertEquals(readChunkCount + 2, + metrics.getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk)); + + keyInputStream.close(); + + // Verify that the data read matches with the input data at corresponding + // indices. + for (int i = 0; i < chunkSize; i++) { + Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]); + } + } } diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java index fae1c82..5acf368 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java @@ -52,6 +52,12 @@ public final class S3GatewayConfigKeys { OZONE_S3G_HTTP_AUTH_CONFIG_PREFIX + "kerberos.keytab"; public static final String OZONE_S3G_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL = OZONE_S3G_HTTP_AUTH_CONFIG_PREFIX + "kerberos.principal"; + + public static final String OZONE_S3G_CLIENT_BUFFER_SIZE_KEY = + "ozone.s3g.client.buffer.size"; + public static final String OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT = + "4KB"; + /** * Never constructed. */ diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java index 6f0ea57..5502173 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.ozone.s3.endpoint; +import javax.annotation.PostConstruct; +import javax.inject.Inject; import javax.ws.rs.Consumes; import javax.ws.rs.DELETE; import javax.ws.rs.DefaultValue; @@ -49,6 +51,8 @@ import java.util.Map; import org.apache.hadoop.hdds.client.ReplicationFactor; import org.apache.hadoop.hdds.client.ReplicationType; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.conf.StorageUnit; import org.apache.hadoop.ozone.client.OzoneBucket; import org.apache.hadoop.ozone.client.OzoneKeyDetails; import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts; @@ -77,6 +81,9 @@ import static javax.ws.rs.core.HttpHeaders.LAST_MODIFIED; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.tuple.Pair; + +import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT; +import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_KEY; import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.ENTITY_TOO_SMALL; import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST; import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD; @@ -104,6 +111,7 @@ public class ObjectEndpoint extends EndpointBase { private HttpHeaders headers; private List<String> customizableGetHeaders = new ArrayList<>(); + private int bufferSize; public ObjectEndpoint() { customizableGetHeaders.add("Content-Type"); @@ -114,6 +122,16 @@ public class ObjectEndpoint extends EndpointBase { customizableGetHeaders.add("Content-Encoding"); } + @Inject + private OzoneConfiguration ozoneConfiguration; + + @PostConstruct + public void init() { + bufferSize = (int) ozoneConfiguration.getStorageSize( + OZONE_S3G_CLIENT_BUFFER_SIZE_KEY, + OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT, StorageUnit.BYTES); + } + /** * Rest endpoint to upload object to a bucket. * <p> @@ -259,7 +277,8 @@ public class ObjectEndpoint extends EndpointBase { try (S3WrapperInputStream s3WrapperInputStream = new S3WrapperInputStream( key.getInputStream())) { - s3WrapperInputStream.copyLarge(dest, startOffset, copyLength); + IOUtils.copyLarge(s3WrapperInputStream, dest, startOffset, + copyLength, new byte[bufferSize]); } }; responseBuilder = Response @@ -400,7 +419,6 @@ public class ObjectEndpoint extends EndpointBase { return Response .status(Status.NO_CONTENT) .build(); - } /** @@ -539,16 +557,9 @@ public class ObjectEndpoint extends EndpointBase { if (range != null) { RangeHeader rangeHeader = RangeHeaderParserUtil.parseRangeHeader(range, 0); - - long copyLength = rangeHeader.getEndOffset() - - rangeHeader.getStartOffset(); - - try (S3WrapperInputStream s3WrapperInputStream = - new S3WrapperInputStream( - sourceObject.getInputStream())) { - s3WrapperInputStream.copyLarge(ozoneOutputStream, - rangeHeader.getStartOffset(), copyLength); - } + IOUtils.copyLarge(sourceObject, ozoneOutputStream, + rangeHeader.getStartOffset(), + rangeHeader.getEndOffset() - rangeHeader.getStartOffset()); } else { IOUtils.copy(sourceObject, ozoneOutputStream); } @@ -578,7 +589,6 @@ public class ObjectEndpoint extends EndpointBase { } throw ex; } - } /** diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java index edf90ed..d88287c 100644 --- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java +++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java @@ -23,14 +23,12 @@ import org.apache.hadoop.ozone.client.io.KeyInputStream; import java.io.IOException; import java.io.InputStream; -import java.io.OutputStream; /** * S3Wrapper Input Stream which encapsulates KeyInputStream from ozone. */ public class S3WrapperInputStream extends FSInputStream { private final KeyInputStream inputStream; - private static final int DEFAULT_BUFFER_SIZE = 32 * 1024; /** * Constructs S3WrapperInputStream with KeyInputStream. @@ -75,36 +73,12 @@ public class S3WrapperInputStream extends FSInputStream { } @Override - public boolean seekToNewSource(long targetPos) throws IOException { - return false; + public long skip(long n) throws IOException { + return inputStream.skip(n); } - /** - * Copies some or all bytes from a large (over 2GB) <code>InputStream</code> - * to an <code>OutputStream</code>, optionally skipping input bytes. - * <p> - * Copy the method from IOUtils of commons-io to reimplement skip by seek - * rather than read. The reason why IOUtils of commons-io implement skip - * by read can be found at - * <a href="https://issues.apache.org/jira/browse/IO-203">IO-203</a>. - * </p> - * <p> - * This method buffers the input internally, so there is no need to use a - * <code>BufferedInputStream</code>. - * </p> - * The buffer size is given by {@link #DEFAULT_BUFFER_SIZE}. - * - * @param output the <code>OutputStream</code> to write to - * @param inputOffset : number of bytes to skip from input before copying - * -ve values are ignored - * @param length : number of bytes to copy. -ve means all - * @return the number of bytes copied - * @throws NullPointerException if the input or output is null - * @throws IOException if an I/O error occurs - */ - public long copyLarge(final OutputStream output, final long inputOffset, - final long length) throws IOException { - return inputStream.copyLarge(output, inputOffset, length, - new byte[DEFAULT_BUFFER_SIZE]); + @Override + public boolean seekToNewSource(long targetPos) throws IOException { + return false; } } --------------------------------------------------------------------- To unsubscribe, e-mail: ozone-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: ozone-commits-h...@hadoop.apache.org