This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch feature-HADOOP-18028-s3a-prefetch-branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git

commit caf58937a0a7691c24199ef81be23d48e158a320
Author: Steve Loughran <ste...@cloudera.com>
AuthorDate: Wed Aug 31 11:16:52 2022 +0100

    HADOOP-18410. S3AInputStream.unbuffer() does not release http connections - prefetch changes (#4766)
    
    Changes in HADOOP-18410 which are needed for the S3A prefetching stream;
    needed as part of the HADOOP-18703 backport
    
    Change-Id: Ib403ca793e29a4416e5d892f9081de5832da3b68
---
 .../hadoop/fs/s3a/prefetch/S3ARemoteObject.java    | 108 +++------------------
 .../fs/s3a/performance/ITestUnbufferDraining.java  |   2 +
 2 files changed, 17 insertions(+), 93 deletions(-)

diff --git 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java
 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java
index d749e9df02f..3ab0022bb08 100644
--- 
a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java
+++ 
b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ARemoteObject.java
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import com.amazonaws.services.s3.model.GetObjectRequest;
 import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,12 +37,10 @@ import org.apache.hadoop.fs.s3a.S3AInputStream;
 import org.apache.hadoop.fs.s3a.S3AReadOpContext;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
 import org.apache.hadoop.fs.s3a.impl.ChangeTracker;
+import org.apache.hadoop.fs.s3a.impl.SDKStreamDrainer;
 import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.statistics.DurationTracker;
 
-import static 
org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
-import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
-
 /**
  * Encapsulates low level interactions with S3 object on AWS.
  */
@@ -79,7 +78,7 @@ public class S3ARemoteObject {
    * Maps a stream returned by openForRead() to the associated S3 object.
    * That allows us to close the object when closing the stream.
    */
-  private Map<InputStream, S3Object> s3Objects;
+  private final Map<InputStream, S3Object> s3Objects;
 
   /**
    * uri of the object being read.
@@ -225,104 +224,27 @@ public class S3ARemoteObject {
   void close(InputStream inputStream, int numRemainingBytes) {
     S3Object obj;
     synchronized (s3Objects) {
-      obj = s3Objects.get(inputStream);
+      obj = s3Objects.remove(inputStream);
       if (obj == null) {
         throw new IllegalArgumentException("inputStream not found");
       }
-      s3Objects.remove(inputStream);
     }
-
+    SDKStreamDrainer drainer = new SDKStreamDrainer(
+        uri,
+        obj,
+        (S3ObjectInputStream)inputStream,
+        false,
+        numRemainingBytes,
+        streamStatistics,
+        "close() operation");
     if (numRemainingBytes <= context.getAsyncDrainThreshold()) {
       // don't bother with async io.
-      drain(false, "close() operation", numRemainingBytes, obj, inputStream);
+      drainer.apply();
     } else {
       LOG.debug("initiating asynchronous drain of {} bytes", 
numRemainingBytes);
-      // schedule an async drain/abort with references to the fields so they
-      // can be reused
-      client.submit(
-          () -> drain(false, "close() operation", numRemainingBytes, obj,
-              inputStream));
+      // schedule an async drain/abort
+      client.submit(drainer);
     }
   }
 
-  /**
-   * drain the stream. This method is intended to be
-   * used directly or asynchronously, and measures the
-   * duration of the operation in the stream statistics.
-   *
-   * @param shouldAbort   force an abort; used if explicitly requested.
-   * @param reason        reason for stream being closed; used in messages
-   * @param remaining     remaining bytes
-   * @param requestObject http request object;
-   * @param inputStream   stream to close.
-   * @return was the stream aborted?
-   */
-  private boolean drain(
-      final boolean shouldAbort,
-      final String reason,
-      final long remaining,
-      final S3Object requestObject,
-      final InputStream inputStream) {
-
-    try {
-      return invokeTrackingDuration(
-          streamStatistics.initiateInnerStreamClose(shouldAbort),
-          () -> drainOrAbortHttpStream(shouldAbort, reason, remaining,
-              requestObject, inputStream));
-    } catch (IOException e) {
-      // this is only here because invokeTrackingDuration() has it in its
-      // signature
-      return shouldAbort;
-    }
-  }
-
-  /**
-   * Drain or abort the inner stream.
-   * Exceptions are swallowed.
-   * If a close() is attempted and fails, the operation escalates to
-   * an abort.
-   *
-   * @param shouldAbort   force an abort; used if explicitly requested.
-   * @param reason        reason for stream being closed; used in messages
-   * @param remaining     remaining bytes
-   * @param requestObject http request object
-   * @param inputStream   stream to close.
-   * @return was the stream aborted?
-   */
-  private boolean drainOrAbortHttpStream(
-      boolean shouldAbort,
-      final String reason,
-      final long remaining,
-      final S3Object requestObject,
-      final InputStream inputStream) {
-
-    if (!shouldAbort && remaining > 0) {
-      try {
-        long drained = 0;
-        byte[] buffer = new byte[DRAIN_BUFFER_SIZE];
-        while (true) {
-          final int count = inputStream.read(buffer);
-          if (count < 0) {
-            // no more data is left
-            break;
-          }
-          drained += count;
-        }
-        LOG.debug("Drained stream of {} bytes", drained);
-      } catch (Exception e) {
-        // exception escalates to an abort
-        LOG.debug("When closing {} stream for {}, will abort the stream", uri,
-            reason, e);
-        shouldAbort = true;
-      }
-    }
-    cleanupWithLogger(LOG, inputStream);
-    cleanupWithLogger(LOG, requestObject);
-    streamStatistics.streamClose(shouldAbort, remaining);
-
-    LOG.debug("Stream {} {}: {}; remaining={}", uri,
-        (shouldAbort ? "aborted" : "closed"), reason,
-        remaining);
-    return shouldAbort;
-  }
 }
diff --git 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
index 0295c07e567..a03f181cb38 100644
--- 
a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
+++ 
b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java
@@ -45,6 +45,7 @@ import static 
org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
 import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
 import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
 import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
 import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
 import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
@@ -102,6 +103,7 @@ public class ITestUnbufferDraining extends 
AbstractS3ACostTest {
         INPUT_FADVISE,
         MAX_ERROR_RETRIES,
         MAXIMUM_CONNECTIONS,
+        PREFETCH_ENABLED_KEY,
         READAHEAD_RANGE,
         REQUEST_TIMEOUT,
         RETRY_LIMIT,


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to