This is an automated email from the ASF dual-hosted git repository.

elek pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hadoop-ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 9a702e5  HDDS-3979. Make bufferSize configurable for stream copy (#1212)
9a702e5 is described below

commit 9a702e5c35cfdffe7f0f1a4e2cc7ec97ec8e6d4e
Author: maobaolong <307499...@qq.com>
AuthorDate: Tue Aug 11 20:20:17 2020 +0800

    HDDS-3979. Make bufferSize configurable for stream copy (#1212)
---
 .../org/apache/hadoop/ozone/OzoneConfigKeys.java   |   1 +
 .../common/src/main/resources/ozone-default.xml    |   8 ++
 .../hadoop/ozone/client/io/KeyInputStream.java     |  62 ++---------
 .../ozone/client/rpc/TestKeyInputStream.java       | 119 ++++++++++-----------
 .../hadoop/ozone/s3/S3GatewayConfigKeys.java       |   6 ++
 .../hadoop/ozone/s3/endpoint/ObjectEndpoint.java   |  36 ++++---
 .../hadoop/ozone/s3/io/S3WrapperInputStream.java   |  36 +------
 7 files changed, 107 insertions(+), 161 deletions(-)

diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
index d89fef9..482ac88 100644
--- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
+++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java
@@ -468,6 +468,7 @@ public final class OzoneConfigKeys {
   public static final String  OZONE_CLIENT_HTTPS_NEED_AUTH_KEY =
       "ozone.https.client.need-auth";
   public static final boolean OZONE_CLIENT_HTTPS_NEED_AUTH_DEFAULT = false;
+
   /**
    * There is no need to instantiate this class.
    */
diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml
index b9774aa..5770448 100644
--- a/hadoop-hdds/common/src/main/resources/ozone-default.xml
+++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml
@@ -2432,6 +2432,14 @@
     </description>
   </property>
   <property>
+    <name>ozone.s3g.client.buffer.size</name>
+    <tag>OZONE, S3GATEWAY</tag>
+    <value>4KB</value>
+    <description>
+      The size of the buffer used to read blocks (4KB by default).
+    </description>
+  </property>
+  <property>
     <name>ssl.server.keystore.keypassword</name>
     <tag>OZONE, SECURITY, MANAGEMENT</tag>
     <value></value>
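
Note: the new property accepts any value that OzoneConfiguration's storage-size parsing understands. A minimal sketch of overriding it programmatically (the 64KB value and the class name are illustrative, not part of this patch; in a real deployment the property would normally be set in ozone-site.xml):

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;

    public class BufferSizeOverrideSketch {
      public static void main(String[] args) {
        OzoneConfiguration conf = new OzoneConfiguration();
        // Any storage-size value works: "8192" (bytes), "64KB", "1MB", ...
        conf.set("ozone.s3g.client.buffer.size", "64KB");
      }
    }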
diff --git a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
index 4af6838..769035a 100644
--- a/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
+++ b/hadoop-ozone/client/src/main/java/org/apache/hadoop/ozone/client/io/KeyInputStream.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.ozone.client.io;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.commons.collections.CollectionUtils;
-import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.Seekable;
 import org.apache.hadoop.hdds.client.BlockID;
@@ -35,7 +34,6 @@ import org.slf4j.LoggerFactory;
 import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -325,62 +323,14 @@ public class KeyInputStream extends InputStream implements Seekable {
     return blockStreams.get(index).getRemaining();
   }
 
-  /**
-   * Copies some or all bytes from a large (over 2GB) <code>InputStream</code>
-   * to an <code>OutputStream</code>, optionally skipping input bytes.
-   * <p>
-   * Copy the method from IOUtils of commons-io to reimplement skip by seek
-   * rather than read. The reason why IOUtils of commons-io implement skip
-   * by read can be found at
-   * <a href="https://issues.apache.org/jira/browse/IO-203">IO-203</a>.
-   * </p>
-   * <p>
-   * This method uses the provided buffer, so there is no need to use a
-   * <code>BufferedInputStream</code>.
-   * </p>
-   *
-   * @param output the <code>OutputStream</code> to write to
-   * @param inputOffset : number of bytes to skip from input before copying
-   * -ve values are ignored
-   * @param length : number of bytes to copy. -ve means all
-   * @param buffer the buffer to use for the copy
-   * @return the number of bytes copied
-   * @throws NullPointerException if the input or output is null
-   * @throws IOException          if an I/O error occurs
-   */
-  public long copyLarge(final OutputStream output,
-      final long inputOffset, final long len, final byte[] buffer)
-      throws IOException {
-    if (inputOffset > 0) {
-      seek(inputOffset);
-    }
-
-    if (len == 0) {
+  @Override
+  public long skip(long n) throws IOException {
+    if (n <= 0) {
       return 0;
     }
 
-    final int bufferLength = buffer.length;
-    int bytesToRead = bufferLength;
-    if (len > 0 && len < bufferLength) {
-      bytesToRead = (int) len;
-    }
-
-    int read;
-    long totalRead = 0;
-    while (bytesToRead > 0) {
-      read = read(buffer, 0, bytesToRead);
-      if (read == IOUtils.EOF) {
-        break;
-      }
-
-      output.write(buffer, 0, read);
-      totalRead += read;
-      if (len > 0) { // only adjust len if not reading to the end
-        // Note the cast must work because buffer.length is an integer
-        bytesToRead = (int) Math.min(len - totalRead, bufferLength);
-      }
-    }
-
-    return totalRead;
+    long toSkip = Math.min(n, length - getPos());
+    seek(getPos() + toSkip);
+    return toSkip;
   }
 }
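
Note: the removed copyLarge is superseded by this skip override. The new skip clamps the requested count to the bytes remaining and then seeks, so no chunk data is read while skipping. A standalone sketch of the same pattern (Seekable64 is a hypothetical stand-in for a stream that tracks its position and total length, not an Ozone type):

    import java.io.IOException;

    // Hypothetical stand-in for a length-aware, seekable stream.
    interface Seekable64 {
      long getPos() throws IOException;
      long getLength();
      void seek(long pos) throws IOException;
    }

    final class SkipBySeek {
      // Clamp n to the remaining bytes, then move the cursor without reading.
      static long skip(Seekable64 stream, long n) throws IOException {
        if (n <= 0) {
          return 0;
        }
        long toSkip = Math.min(n, stream.getLength() - stream.getPos());
        stream.seek(stream.getPos() + toSkip);
        return toSkip;
      }
    }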
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
index 8ab176d..7775bb7 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/client/rpc/TestKeyInputStream.java
@@ -43,7 +43,6 @@ import org.junit.rules.Timeout;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
-import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.Collection;
@@ -295,66 +294,6 @@ public class TestKeyInputStream {
   }
 
   @Test
-  public void testCopyLarge() throws Exception {
-    String keyName = getKeyName();
-    OzoneOutputStream key = TestHelper.createKey(keyName,
-        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
-
-    // write data spanning 3 blocks
-    int dataLength = (2 * blockSize) + (blockSize / 2);
-
-    byte[] inputData = new byte[dataLength];
-    Random rand = new Random();
-    for (int i = 0; i < dataLength; i++) {
-      inputData[i] = (byte) rand.nextInt(127);
-    }
-    key.write(inputData);
-    key.close();
-
-    // test with random start and random length
-    for (int i = 0; i < 100; i++) {
-      int inputOffset = rand.nextInt(dataLength - 1);
-      int length = rand.nextInt(dataLength - inputOffset);
-
-      KeyInputStream keyInputStream = (KeyInputStream) objectStore
-          .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
-          .getInputStream();
-      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-
-      keyInputStream.copyLarge(outputStream, inputOffset, length,
-          new byte[4096]);
-      byte[] readData = outputStream.toByteArray();
-      keyInputStream.close();
-      outputStream.close();
-
-      for (int j = inputOffset; j < inputOffset + length; j++) {
-        Assert.assertEquals(readData[j - inputOffset], inputData[j]);
-      }
-    }
-
-    // test with random start and -ve length
-    for (int i = 0; i < 10; i++) {
-      int inputOffset = rand.nextInt(dataLength - 1);
-      int length = -1;
-
-      KeyInputStream keyInputStream = (KeyInputStream) objectStore
-          .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
-          .getInputStream();
-      ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
-
-      keyInputStream.copyLarge(outputStream, inputOffset, length,
-          new byte[4096]);
-      byte[] readData = outputStream.toByteArray();
-      keyInputStream.close();
-      outputStream.close();
-
-      for (int j = inputOffset; j < dataLength; j++) {
-        Assert.assertEquals(readData[j - inputOffset], inputData[j]);
-      }
-    }
-  }
-
-  @Test
   public void testReadChunk() throws Exception {
     String keyName = getKeyName();
     OzoneOutputStream key = TestHelper.createKey(keyName,
@@ -395,4 +334,62 @@ public class TestKeyInputStream {
     }
     keyInputStream.close();
   }
+
+  @Test
+  public void testSkip() throws Exception {
+    XceiverClientManager.resetXceiverClientMetrics();
+    XceiverClientMetrics metrics = XceiverClientManager
+        .getXceiverClientMetrics();
+    long writeChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.WriteChunk);
+    long readChunkCount = metrics.getContainerOpCountMetrics(
+        ContainerProtos.Type.ReadChunk);
+
+    String keyName = getKeyName();
+    OzoneOutputStream key = TestHelper.createKey(keyName,
+        ReplicationType.RATIS, 0, objectStore, volumeName, bucketName);
+
+    // write data spanning 3 chunks
+    int dataLength = (2 * chunkSize) + (chunkSize / 2);
+    byte[] inputData = ContainerTestHelper.getFixedLengthString(
+        keyString, dataLength).getBytes(UTF_8);
+    key.write(inputData);
+    key.close();
+
+    Assert.assertEquals(writeChunkCount + 3,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.WriteChunk));
+
+    KeyInputStream keyInputStream = (KeyInputStream) objectStore
+        .getVolume(volumeName).getBucket(bucketName).readKey(keyName)
+        .getInputStream();
+
+    // skip a total of 150 bytes across three calls (70 + 0 + 80)
+    keyInputStream.skip(70);
+    Assert.assertEquals(70, keyInputStream.getPos());
+    keyInputStream.skip(0);
+    Assert.assertEquals(70, keyInputStream.getPos());
+    keyInputStream.skip(80);
+
+    Assert.assertEquals(150, keyInputStream.getPos());
+
+    // Skip operation should not result in any readChunk operation.
+    Assert.assertEquals(readChunkCount, metrics
+        .getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));
+
+    byte[] readData = new byte[chunkSize];
+    keyInputStream.read(readData, 0, chunkSize);
+
+    // Since we are reading data from index 150 to 250 and each chunk is
+    // 100 bytes, the read spans 2 chunks.
+    Assert.assertEquals(readChunkCount + 2,
+        metrics.getContainerOpCountMetrics(ContainerProtos.Type.ReadChunk));
+
+    keyInputStream.close();
+
+    // Verify that the data read matches with the input data at corresponding
+    // indices.
+    for (int i = 0; i < chunkSize; i++) {
+      Assert.assertEquals(inputData[chunkSize + 50 + i], readData[i]);
+    }
+  }
 }
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java
index fae1c82..5acf368 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/S3GatewayConfigKeys.java
@@ -52,6 +52,12 @@ public final class S3GatewayConfigKeys {
       OZONE_S3G_HTTP_AUTH_CONFIG_PREFIX + "kerberos.keytab";
   public static final String OZONE_S3G_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL =
       OZONE_S3G_HTTP_AUTH_CONFIG_PREFIX + "kerberos.principal";
+
+  public static final String OZONE_S3G_CLIENT_BUFFER_SIZE_KEY =
+      "ozone.s3g.client.buffer.size";
+  public static final String OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT =
+      "4KB";
+
   /**
    * Never constructed.
    */
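
Note: these constants are consumed by ObjectEndpoint.init() in the next file. A minimal sketch of that read path, assuming an already-populated OzoneConfiguration (the class and method names here are illustrative):

    import org.apache.hadoop.hdds.conf.OzoneConfiguration;
    import org.apache.hadoop.hdds.conf.StorageUnit;

    import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT;
    import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_KEY;

    public class BufferSizeReadSketch {
      public static int readBufferSize(OzoneConfiguration conf) {
        // getStorageSize parses values like "4KB" into the requested unit.
        return (int) conf.getStorageSize(OZONE_S3G_CLIENT_BUFFER_SIZE_KEY,
            OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT, StorageUnit.BYTES);
      }
    }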
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
index 6f0ea57..5502173 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/endpoint/ObjectEndpoint.java
@@ -17,6 +17,8 @@
  */
 package org.apache.hadoop.ozone.s3.endpoint;
 
+import javax.annotation.PostConstruct;
+import javax.inject.Inject;
 import javax.ws.rs.Consumes;
 import javax.ws.rs.DELETE;
 import javax.ws.rs.DefaultValue;
@@ -49,6 +51,8 @@ import java.util.Map;
 
 import org.apache.hadoop.hdds.client.ReplicationFactor;
 import org.apache.hadoop.hdds.client.ReplicationType;
+import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.conf.StorageUnit;
 import org.apache.hadoop.ozone.client.OzoneBucket;
 import org.apache.hadoop.ozone.client.OzoneKeyDetails;
 import org.apache.hadoop.ozone.client.OzoneMultipartUploadPartListParts;
@@ -77,6 +81,9 @@ import static javax.ws.rs.core.HttpHeaders.LAST_MODIFIED;
 import org.apache.commons.io.IOUtils;
 
 import org.apache.commons.lang3.tuple.Pair;
+
+import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT;
+import static org.apache.hadoop.ozone.s3.S3GatewayConfigKeys.OZONE_S3G_CLIENT_BUFFER_SIZE_KEY;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.ENTITY_TOO_SMALL;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.INVALID_REQUEST;
 import static org.apache.hadoop.ozone.s3.exception.S3ErrorTable.NO_SUCH_UPLOAD;
@@ -104,6 +111,7 @@ public class ObjectEndpoint extends EndpointBase {
   private HttpHeaders headers;
 
   private List<String> customizableGetHeaders = new ArrayList<>();
+  private int bufferSize;
 
   public ObjectEndpoint() {
     customizableGetHeaders.add("Content-Type");
@@ -114,6 +122,16 @@ public class ObjectEndpoint extends EndpointBase {
     customizableGetHeaders.add("Content-Encoding");
   }
 
+  @Inject
+  private OzoneConfiguration ozoneConfiguration;
+
+  @PostConstruct
+  public void init() {
+    bufferSize = (int) ozoneConfiguration.getStorageSize(
+        OZONE_S3G_CLIENT_BUFFER_SIZE_KEY,
+        OZONE_S3G_CLIENT_BUFFER_SIZE_DEFAULT, StorageUnit.BYTES);
+  }
+
   /**
    * Rest endpoint to upload object to a bucket.
    * <p>
@@ -259,7 +277,8 @@ public class ObjectEndpoint extends EndpointBase {
           try (S3WrapperInputStream s3WrapperInputStream =
               new S3WrapperInputStream(
                   key.getInputStream())) {
-            s3WrapperInputStream.copyLarge(dest, startOffset, copyLength);
+            IOUtils.copyLarge(s3WrapperInputStream, dest, startOffset,
+                copyLength, new byte[bufferSize]);
           }
         };
         responseBuilder = Response
@@ -400,7 +419,6 @@ public class ObjectEndpoint extends EndpointBase {
     return Response
         .status(Status.NO_CONTENT)
         .build();
-
   }
 
   /**
@@ -539,16 +557,9 @@ public class ObjectEndpoint extends EndpointBase {
             if (range != null) {
               RangeHeader rangeHeader =
                   RangeHeaderParserUtil.parseRangeHeader(range, 0);
-
-              long copyLength = rangeHeader.getEndOffset() -
-                  rangeHeader.getStartOffset();
-
-              try (S3WrapperInputStream s3WrapperInputStream =
-                  new S3WrapperInputStream(
-                  sourceObject.getInputStream())) {
-                s3WrapperInputStream.copyLarge(ozoneOutputStream,
-                    rangeHeader.getStartOffset(), copyLength);
-              }
+              IOUtils.copyLarge(sourceObject, ozoneOutputStream,
+                  rangeHeader.getStartOffset(),
+                  rangeHeader.getEndOffset() - rangeHeader.getStartOffset());
             } else {
               IOUtils.copy(sourceObject, ozoneOutputStream);
             }
@@ -578,7 +589,6 @@ public class ObjectEndpoint extends EndpointBase {
       }
       throw ex;
     }
-
   }
 
   /**
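
Note: with the seek-based skip in place, the stock commons-io copier can be used directly. IOUtils.copyLarge(input, output, inputOffset, length, buffer) skips inputOffset bytes via InputStream.skip and then copies length bytes through the caller-supplied buffer, which is where the configurable bufferSize lands. A self-contained usage sketch with in-memory streams (all values are illustrative):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import org.apache.commons.io.IOUtils;

    public class CopyLargeSketch {
      public static void main(String[] args) throws IOException {
        byte[] data = new byte[10000];
        for (int i = 0; i < data.length; i++) {
          data[i] = (byte) i;
        }
        ByteArrayInputStream in = new ByteArrayInputStream(data);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // Skip 2000 bytes, then copy 5000 through a 4KB buffer. With the
        // patched streams, the skip becomes a seek instead of a read.
        long copied = IOUtils.copyLarge(in, out, 2000, 5000, new byte[4096]);
        System.out.println(copied); // 5000
      }
    }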
diff --git a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
index edf90ed..d88287c 100644
--- a/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
+++ b/hadoop-ozone/s3gateway/src/main/java/org/apache/hadoop/ozone/s3/io/S3WrapperInputStream.java
@@ -23,14 +23,12 @@ import org.apache.hadoop.ozone.client.io.KeyInputStream;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
 
 /**
  * S3Wrapper Input Stream which encapsulates KeyInputStream from ozone.
  */
 public class S3WrapperInputStream extends FSInputStream {
   private final KeyInputStream inputStream;
-  private static final int DEFAULT_BUFFER_SIZE = 32 * 1024;
 
   /**
    * Constructs S3WrapperInputStream with KeyInputStream.
@@ -75,36 +73,12 @@ public class S3WrapperInputStream extends FSInputStream {
   }
 
   @Override
-  public boolean seekToNewSource(long targetPos) throws IOException {
-    return false;
+  public long skip(long n) throws IOException {
+    return inputStream.skip(n);
   }
 
-  /**
-   * Copies some or all bytes from a large (over 2GB) <code>InputStream</code>
-   * to an <code>OutputStream</code>, optionally skipping input bytes.
-   * <p>
-   * Copy the method from IOUtils of commons-io to reimplement skip by seek
-   * rather than read. The reason why IOUtils of commons-io implement skip
-   * by read can be found at
-   * <a href="https://issues.apache.org/jira/browse/IO-203">IO-203</a>.
-   * </p>
-   * <p>
-   * This method buffers the input internally, so there is no need to use a
-   * <code>BufferedInputStream</code>.
-   * </p>
-   * The buffer size is given by {@link #DEFAULT_BUFFER_SIZE}.
-   *
-   * @param output the <code>OutputStream</code> to write to
-   * @param inputOffset : number of bytes to skip from input before copying
-   * -ve values are ignored
-   * @param length : number of bytes to copy. -ve means all
-   * @return the number of bytes copied
-   * @throws NullPointerException if the input or output is null
-   * @throws IOException          if an I/O error occurs
-   */
-  public long copyLarge(final OutputStream output, final long inputOffset,
-      final long length) throws IOException {
-    return inputStream.copyLarge(output, inputOffset, length,
-        new byte[DEFAULT_BUFFER_SIZE]);
+  @Override
+  public boolean seekToNewSource(long targetPos) throws IOException {
+    return false;
   }
 }
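
Note: deleting the bespoke copyLarge works because commons-io only needs InputStream.skip to behave efficiently, and the wrapper now forwards skip to the seek-based KeyInputStream. A sketch of the copy loop commons-io effectively runs, under the assumption of a seek-backed skip (copyRange is an illustrative name, not API from this patch):

    import java.io.IOException;
    import java.io.InputStream;
    import java.io.OutputStream;

    public final class RangeCopySketch {
      // Copy len bytes starting at offset from in to out.
      static long copyRange(InputStream in, OutputStream out,
          long offset, long len, int bufferSize) throws IOException {
        long remaining = offset;
        while (remaining > 0) {
          long skipped = in.skip(remaining); // a seek for S3WrapperInputStream
          if (skipped <= 0) {
            return 0; // cannot skip further (EOF or non-skippable stream)
          }
          remaining -= skipped;
        }
        byte[] buffer = new byte[bufferSize];
        long copied = 0;
        while (copied < len) {
          int n = in.read(buffer, 0,
              (int) Math.min(buffer.length, len - copied));
          if (n == -1) {
            break;
          }
          out.write(buffer, 0, n);
          copied += n;
        }
        return copied;
      }
    }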

