mehakmeet commented on code in PR #4766:
URL: https://github.com/apache/hadoop/pull/4766#discussion_r955790641


##########
hadoop-tools/hadoop-aws/src/test/resources/core-site.xml:
##########
@@ -184,12 +184,17 @@
     <value>true</value>
   </property>
 
+

Review Comment:
   nit: extra blank line



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestSDKStreamDrainer.java:
##########
@@ -0,0 +1,342 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.IOException;
+
+import com.amazonaws.internal.SdkFilterInputStream;
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import org.apache.hadoop.test.HadoopTestBase;
+
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DRAIN_BUFFER_SIZE;
+import static org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext.EMPTY_INPUT_STREAM_STATISTICS;
+import static org.apache.hadoop.test.LambdaTestUtils.intercept;
+
+/**
+ * Unit tests for stream draining.
+ */
+public class TestSDKStreamDrainer extends HadoopTestBase {
+
+  public static final int BYTES = 100;
+
+  /**
+   * Aborting does as asked.
+   */
+  @Test
+  public void testDrainerAborted() throws Throwable {
+    assertAborted(drainer(BYTES, true, stream()));
+  }
+
+  /**
+   * Create a stream of the default length.
+   * @return a stream.
+   */
+  private static FakeSDKInputStream stream() {
+    return new FakeSDKInputStream(BYTES);
+  }
+
+  /**
+   * A normal drain; all bytes are read. No abort.
+   */
+  @Test
+  public void testDrainerDrained() throws Throwable {
+    assertBytesReadNotAborted(
+        drainer(BYTES, false, stream()),
+        BYTES);
+  }
+
+  /**
+   * Empty streams are fine.
+   */
+  @Test
+  public void testEmptyStream() throws Throwable {
+    int size = 0;
+    assertBytesReadNotAborted(
+        drainer(size, false, new FakeSDKInputStream(size)),
+        size);
+  }
+
+  /**
+   * Single char read; just a safety check on the test stream more than
+   * the production code.
+   */
+  @Test
+  public void testSingleChar() throws Throwable {
+    int size = 1;
+    assertBytesReadNotAborted(
+        drainer(size, false, new FakeSDKInputStream(size)),
+        size);
+  }
+
+  /**
+   * A read spanning multiple buffers.
+   */
+  @Test
+  public void testMultipleBuffers() throws Throwable {
+    int size = DRAIN_BUFFER_SIZE + 1;
+    assertBytesReadNotAborted(
+        drainer(size, false, new FakeSDKInputStream(size)),
+        size);
+  }
+
+  /**
+   * Read of exactly one buffer.
+   */
+  @Test
+  public void testExactlyOneBuffer() throws Throwable {
+    int size = DRAIN_BUFFER_SIZE;
+    assertBytesReadNotAborted(
+        drainer(size, false, new FakeSDKInputStream(size)),
+        size);
+  }
+
+  /**
+   * Less data than expected came back; not escalated.
+   */
+  @Test
+  public void testStreamUnderflow() throws Throwable {
+    int size = 50;
+    assertBytesReadNotAborted(
+        drainer(BYTES, false, new FakeSDKInputStream(size)),
+        size);
+  }
+
+  /**
+   * Test a drain where a read triggers an IOE; this must escalate
+   * to an abort.
+   */
+  @Test
+  public void testReadFailure() throws Throwable {
+    int threshold = 50;
+    SDKStreamDrainer drainer = new SDKStreamDrainer("s3://example/",
+        null,
+        new FakeSDKInputStream(BYTES, threshold),
+        false,
+        BYTES,
+        EMPTY_INPUT_STREAM_STATISTICS, "test");
+    intercept(IOException.class, "", () ->
+        drainer.applyRaisingException());
+
+    assertAborted(drainer);
+  }
+
+  /**
+   * Abort does not read(), so the exception will not surface.
+   */
+  @Test
+  public void testReadFailureDoesNotSurfaceInAbort() throws Throwable {
+    int threshold = 50;
+    SDKStreamDrainer drainer = new SDKStreamDrainer("s3://example/",
+        null,
+        new FakeSDKInputStream(BYTES, threshold),
+        true,
+        BYTES,
+        EMPTY_INPUT_STREAM_STATISTICS, "test");
+    drainer.applyRaisingException();
+
+    assertAborted(drainer);
+  }
+
+  /**
+   * Make sure the underlying stream read code works.
+   */
+  @Test
+  public void testFakeStreamRead() throws Throwable {
+    FakeSDKInputStream stream = stream();
+    int count = 0;
+    while (stream.read() > 0) {
+      count++;
+    }
+    Assertions.assertThat(count)
+        .describedAs("bytes read from %s", stream)
+        .isEqualTo(BYTES);
+  }
+
+  /**
+   * Create a drainer and invoke it, rethrowing any exception
+   * which occurred during the draining.
+   * @param remaining bytes remaining in the stream
+   * @param shouldAbort should we abort?
+   * @param in input stream.
+   * @return the drainer
+   * @throws Throwable something went wrong
+   */
+  private SDKStreamDrainer drainer(int remaining,
+      boolean shouldAbort,
+      FakeSDKInputStream in) throws Throwable {
+    SDKStreamDrainer drainer = new SDKStreamDrainer("s3://example/",
+        null,
+        in,
+        shouldAbort,
+        remaining,
+        EMPTY_INPUT_STREAM_STATISTICS, "test");
+    drainer.applyRaisingException();
+    return drainer;
+  }
+
+
+  /**
+   * The draining aborted.
+   * @param drainer drainer to assert on.
+   * @return the drainer.
+   */
+  private SDKStreamDrainer assertAborted(SDKStreamDrainer drainer) {
+    Assertions.assertThat(drainer)
+        .matches(SDKStreamDrainer::isAborted, "isAborted");
+    return drainer;
+  }
+
+  /**
+   * The draining was not aborted.
+   * @param drainer drainer to assert on.
+   * @return the drainer.
+   */
+  private SDKStreamDrainer assertNotAborted(SDKStreamDrainer drainer) {
+    Assertions.assertThat(drainer)
+        .matches(d -> !d.isAborted(), "is not aborted");
+    return drainer;
+  }
+
+  /**
+   * The draining was not aborted and {@code bytes} were read.
+   * @param drainer drainer to assert on.
+   * @param bytes expected byte count
+   * @return the drainer.
+   */
+  private SDKStreamDrainer assertBytesReadNotAborted(SDKStreamDrainer drainer,
+      int bytes) {
+    return assertBytesRead(assertNotAborted(drainer), bytes);
+  }
+
+  /**
+   * Assert {@code bytes} were read.
+   * @param drainer drainer to assert on.
+   * @param bytes expected byte count
+   * @return the drainer.
+   */
+  private static SDKStreamDrainer assertBytesRead(final SDKStreamDrainer drainer,
+      final int bytes) {
+    Assertions.assertThat(drainer)

Review Comment:
   Add an error message to this assertion in case of failure; maybe "Bytes read are not as expected".
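   For example (a sketch only; it assumes the drainer exposes a `getDrained()` accessor, which may be named differently in the class):
   ```java
   Assertions.assertThat(drainer)
       // getDrained() is assumed; use whatever accessor exposes the drained byte count
       .extracting(SDKStreamDrainer::getDrained)
       .describedAs("bytes read are not as expected for %s", drainer)
       .isEqualTo(bytes);
   ```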



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.performance;
+
+import java.io.IOException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.io.IOUtils;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
+import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
+import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+
+/**
+ * Test stream unbuffer performance/behavior with stream draining
+ * and aborting.
+ */
+public class ITestUnbufferDraining extends AbstractS3ACostTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestUnbufferDraining.class);
+
+  public static final int READAHEAD = 1000;
+
+  public static final int FILE_SIZE = 50_000;
+
+  public static final int ATTEMPTS = 10;
+
+  private FileSystem brittleFS;
+
+  /**
+   * Create with markers kept, always.
+   */
+  public ITestUnbufferDraining() {
+    super(false);
+  }
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    removeBaseAndBucketOverrides(conf,
+        ASYNC_DRAIN_THRESHOLD,
+        ESTABLISH_TIMEOUT,
+        INPUT_FADVISE,
+        MAX_ERROR_RETRIES,
+        MAXIMUM_CONNECTIONS,
+        PREFETCH_ENABLED_KEY,
+        READAHEAD_RANGE,
+        REQUEST_TIMEOUT,
+        RETRY_LIMIT,
+        SOCKET_TIMEOUT);
+
+    return conf;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+
+    // now create a new FS with minimal http capacity and recovery
+    // a separate one is used to avoid test teardown suffering
+    // from the lack of http connections and short timeouts.
+    Configuration conf = getConfiguration();
+    // kick off async drain for any data
+    conf.setInt(ASYNC_DRAIN_THRESHOLD, 1);
+    conf.setInt(MAXIMUM_CONNECTIONS, 2);
+    conf.setInt(MAX_ERROR_RETRIES, 1);
+    conf.setInt(ESTABLISH_TIMEOUT, 1000);
+    conf.setInt(READAHEAD_RANGE, READAHEAD);
+    conf.setInt(RETRY_LIMIT, 1);
+
+    brittleFS = FileSystem.newInstance(getFileSystem().getUri(), conf);
+  }
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    IOUtils.cleanupWithLogger(LOG, brittleFS);
+  }
+
+  public FileSystem getBrittleFS() {
+    return brittleFS;
+  }
+
+  /**
+   * Test stream close performance/behavior with stream draining
+   * and unbuffer.
+   */
+  @Test
+  public void testUnbufferDraining() throws Throwable {
+
+    describe("unbuffer draining");
+    FileStatus st = createTestFile();
+
+    int offset = FILE_SIZE - READAHEAD + 1;
+    try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
+        .withFileStatus(st)
+        .must(ASYNC_DRAIN_THRESHOLD, 1)
+        .build().get()) {
+      describe("Initiating unbuffer with async drain\n");
+      for (int i = 0; i < ATTEMPTS; i++) {
+        describe("Starting read/unbuffer #%d", i);
+        in.seek(offset);
+        in.read();
+        in.unbuffer();
+      }
+    }
+  }
+
+  /**
+   * Test stream close performance/behavior with stream draining
+   * and unbuffer.
+   */
+  @Test
+  public void testUnbufferAborting() throws Throwable {
+
+    describe("unbuffer draining");
+    FileStatus st = createTestFile();
+
+
+    // open the file at the beginning with a whole file read policy,
+    // so even with s3a switching to random on unbuffer,
+    // this always does a full GET
+    try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
+        .withFileStatus(st)
+        .must(ASYNC_DRAIN_THRESHOLD, 1)
+        .must(FS_OPTION_OPENFILE_READ_POLICY,
+            FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+        .build().get()) {
+
+      describe("Initiating unbuffer with async drain\n");
+      for (int i = 0; i < ATTEMPTS; i++) {
+        describe("Starting read/unbuffer #%d", i);
+        in.read();
+        in.unbuffer();
+      }
+    }
+  }
+
+  private FileStatus createTestFile() throws IOException {
+    byte[] data = dataset(FILE_SIZE, '0', 10);
+    S3AFileSystem fs = getFileSystem();
+
+    Path path = methodPath();
+    ContractTestUtils.createFile(fs, path, true, data);
+    FileStatus st = fs.getFileStatus(path);
+    return st;

Review Comment:
   nit: just `return fs.getFileStatus(path);`; the variable seems redundant.
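   i.e. the tail of the method becomes:
   ```java
   Path path = methodPath();
   ContractTestUtils.createFile(fs, path, true, data);
   return fs.getFileStatus(path);
   ```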



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.performance;
+
+import java.io.IOException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.io.IOUtils;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
+import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
+import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+
+/**
+ * Test stream unbuffer performance/behavior with stream draining
+ * and aborting.
+ */
+public class ITestUnbufferDraining extends AbstractS3ACostTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestUnbufferDraining.class);
+
+  public static final int READAHEAD = 1000;
+
+  public static final int FILE_SIZE = 50_000;
+
+  public static final int ATTEMPTS = 10;
+
+  private FileSystem brittleFS;
+
+  /**
+   * Create with markers kept, always.
+   */
+  public ITestUnbufferDraining() {
+    super(false);
+  }
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    removeBaseAndBucketOverrides(conf,
+        ASYNC_DRAIN_THRESHOLD,
+        ESTABLISH_TIMEOUT,
+        INPUT_FADVISE,
+        MAX_ERROR_RETRIES,
+        MAXIMUM_CONNECTIONS,
+        PREFETCH_ENABLED_KEY,
+        READAHEAD_RANGE,
+        REQUEST_TIMEOUT,
+        RETRY_LIMIT,
+        SOCKET_TIMEOUT);
+
+    return conf;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+
+    // now create a new FS with minimal http capacity and recovery
+    // a separate one is used to avoid test teardown suffering
+    // from the lack of http connections and short timeouts.
+    Configuration conf = getConfiguration();
+    // kick off async drain for any data
+    conf.setInt(ASYNC_DRAIN_THRESHOLD, 1);
+    conf.setInt(MAXIMUM_CONNECTIONS, 2);
+    conf.setInt(MAX_ERROR_RETRIES, 1);
+    conf.setInt(ESTABLISH_TIMEOUT, 1000);
+    conf.setInt(READAHEAD_RANGE, READAHEAD);
+    conf.setInt(RETRY_LIMIT, 1);
+
+    brittleFS = FileSystem.newInstance(getFileSystem().getUri(), conf);
+  }
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    IOUtils.cleanupWithLogger(LOG, brittleFS);
+  }
+
+  public FileSystem getBrittleFS() {
+    return brittleFS;
+  }
+
+  /**
+   * Test stream close performance/behavior with stream draining
+   * and unbuffer.
+   */
+  @Test
+  public void testUnbufferDraining() throws Throwable {
+
+    describe("unbuffer draining");
+    FileStatus st = createTestFile();
+
+    int offset = FILE_SIZE - READAHEAD + 1;
+    try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
+        .withFileStatus(st)
+        .must(ASYNC_DRAIN_THRESHOLD, 1)
+        .build().get()) {
+      describe("Initiating unbuffer with async drain\n");
+      for (int i = 0; i < ATTEMPTS; i++) {
+        describe("Starting read/unbuffer #%d", i);
+        in.seek(offset);
+        in.read();
+        in.unbuffer();
+      }
+    }
+  }
+
+  /**
+   * Test stream close performance/behavior with stream draining
+   * and unbuffer.
+   */
+  @Test
+  public void testUnbufferAborting() throws Throwable {
+
+    describe("unbuffer draining");
+    FileStatus st = createTestFile();
+
+
+    // open the file at the beginning with a whole file read policy,
+    // so even with s3a switching to random on unbuffer,
+    // this always does a full GET
+    try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
+        .withFileStatus(st)
+        .must(ASYNC_DRAIN_THRESHOLD, 1)
+        .must(FS_OPTION_OPENFILE_READ_POLICY,
+            FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE)
+        .build().get()) {
+
+      describe("Initiating unbuffer with async drain\n");
+      for (int i = 0; i < ATTEMPTS; i++) {
+        describe("Starting read/unbuffer #%d", i);
+        in.read();
+        in.unbuffer();
+      }

Review Comment:
   After the for loop we could assert on the number of aborts collected in the stream's IOStatistics: `StreamStatisticNames.STREAM_READ_ABORTED` should be 10 in this test and 0 in the test above.
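   Something like this (a sketch; it assumes the `IOStatisticAssertions.verifyStatisticCounterValue` test helper and `StreamStatisticNames.STREAM_READ_ABORTED` are statically imported):
   ```java
   // still inside the try-with-resources, after the read/unbuffer loop
   verifyStatisticCounterValue(in.getIOStatistics(),
       StreamStatisticNames.STREAM_READ_ABORTED, ATTEMPTS);
   ```
   and the same assertion with an expected count of 0 in `testUnbufferDraining()`.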



##########
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestUnbufferDraining.java:
##########
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.performance;
+
+import java.io.IOException;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.contract.ContractTestUtils;
+import org.apache.hadoop.fs.s3a.S3AFileSystem;
+import org.apache.hadoop.io.IOUtils;
+
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
+import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
+import static org.apache.hadoop.fs.s3a.Constants.ASYNC_DRAIN_THRESHOLD;
+import static org.apache.hadoop.fs.s3a.Constants.ESTABLISH_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.Constants.INPUT_FADVISE;
+import static org.apache.hadoop.fs.s3a.Constants.MAXIMUM_CONNECTIONS;
+import static org.apache.hadoop.fs.s3a.Constants.MAX_ERROR_RETRIES;
+import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_ENABLED_KEY;
+import static org.apache.hadoop.fs.s3a.Constants.READAHEAD_RANGE;
+import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.Constants.RETRY_LIMIT;
+import static org.apache.hadoop.fs.s3a.Constants.SOCKET_TIMEOUT;
+import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
+
+/**
+ * Test stream unbuffer performance/behavior with stream draining
+ * and aborting.
+ */
+public class ITestUnbufferDraining extends AbstractS3ACostTest {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(ITestUnbufferDraining.class);
+
+  public static final int READAHEAD = 1000;
+
+  public static final int FILE_SIZE = 50_000;
+
+  public static final int ATTEMPTS = 10;
+
+  private FileSystem brittleFS;
+
+  /**
+   * Create with markers kept, always.
+   */
+  public ITestUnbufferDraining() {
+    super(false);
+  }
+
+  @Override
+  public Configuration createConfiguration() {
+    Configuration conf = super.createConfiguration();
+    removeBaseAndBucketOverrides(conf,
+        ASYNC_DRAIN_THRESHOLD,
+        ESTABLISH_TIMEOUT,
+        INPUT_FADVISE,
+        MAX_ERROR_RETRIES,
+        MAXIMUM_CONNECTIONS,
+        PREFETCH_ENABLED_KEY,
+        READAHEAD_RANGE,
+        REQUEST_TIMEOUT,
+        RETRY_LIMIT,
+        SOCKET_TIMEOUT);
+
+    return conf;
+  }
+
+  @Override
+  public void setup() throws Exception {
+    super.setup();
+
+    // now create a new FS with minimal http capacity and recovery
+    // a separate one is used to avoid test teardown suffering
+    // from the lack of http connections and short timeouts.
+    Configuration conf = getConfiguration();
+    // kick off async drain for any data
+    conf.setInt(ASYNC_DRAIN_THRESHOLD, 1);
+    conf.setInt(MAXIMUM_CONNECTIONS, 2);
+    conf.setInt(MAX_ERROR_RETRIES, 1);
+    conf.setInt(ESTABLISH_TIMEOUT, 1000);
+    conf.setInt(READAHEAD_RANGE, READAHEAD);
+    conf.setInt(RETRY_LIMIT, 1);
+
+    brittleFS = FileSystem.newInstance(getFileSystem().getUri(), conf);
+  }
+
+  @Override
+  public void teardown() throws Exception {
+    super.teardown();
+    IOUtils.cleanupWithLogger(LOG, brittleFS);
+  }
+
+  public FileSystem getBrittleFS() {
+    return brittleFS;
+  }
+
+  /**
+   * Test stream close performance/behavior with stream draining
+   * and unbuffer.
+   */
+  @Test
+  public void testUnbufferDraining() throws Throwable {
+
+    describe("unbuffer draining");
+    FileStatus st = createTestFile();
+
+    int offset = FILE_SIZE - READAHEAD + 1;
+    try (FSDataInputStream in = getBrittleFS().openFile(st.getPath())
+        .withFileStatus(st)
+        .must(ASYNC_DRAIN_THRESHOLD, 1)
+        .build().get()) {
+      describe("Initiating unbuffer with async drain\n");
+      for (int i = 0; i < ATTEMPTS; i++) {
+        describe("Starting read/unbuffer #%d", i);
+        in.seek(offset);
+        in.read();
+        in.unbuffer();
+      }
+    }
+  }
+
+  /**
+   * Test stream close performance/behavior with stream draining

Review Comment:
   Same Javadoc as the test above; is that intentional? Maybe it should mention aborting for this test?



##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/SDKStreamDrainer.java:
##########
@@ -0,0 +1,325 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.Closeable;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import javax.annotation.Nullable;
+
+import com.amazonaws.internal.SdkFilterInputStream;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
+import org.apache.hadoop.util.functional.CallableRaisingIOE;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.DRAIN_BUFFER_SIZE;
+import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+
+/**
+ * Drains/aborts s3 or other AWS SDK streams.
+ * It is callable so can be passed directly to a submitter
+ * for async invocation.
+ * A request object may be passed in; it will be implicitly
+ * cached until this object is GCd.
+ * This is because in some versions of the AWS SDK, the S3Object
+ * has a finalize() method which releases the http connection,
+ * even when the stream is still open.
+ * See HADOOP-17338 for details.
+ */
+public class SDKStreamDrainer implements CallableRaisingIOE<Boolean> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(
+      SDKStreamDrainer.class);
+
+  /**
+   * URI for log messages.
+   */
+  private final String uri;
+
+  /**
+   * Request object; usually S3Object.
+   * Never used, but needed to keep the http connection
+   * open long enough for draining to take place.
+   */
+  @Nullable
+  private final Closeable requestObject;
+
+  /**
+   * Stream from the {@link #requestObject} for draining and closing.
+   */
+  private final SdkFilterInputStream inner;
+
+  /**
+   * Should the request be aborted?
+   */
+  private final boolean shouldAbort;
+
+  /**
+   * How many bytes remaining?
+   * This is decremented as the stream is drained.
+   * If the stream finished before the expected
+   * remaining value was read, this will show how many
+   * bytes were still expected.
+   */
+  private int remaining;
+
+  /**
+   * Statistics to update with the duration.
+   */
+  private final S3AInputStreamStatistics streamStatistics;
+
+  /**
+   * Reason for closure; used in log messages.
+   */
+  private final String reason;
+
+  /**
+   * Has the operation executed yet?
+   */
+  private final AtomicBoolean executed = new AtomicBoolean(false);
+
+  /**
+   * Any exception caught during execution.
+   */
+  private Exception thrown;
+
+  /**
+   * Was the stream aborted?
+   */
+  private boolean aborted;
+
+  /**
+   * how many bytes were drained?
+   */
+  private int drained = 0;
+
+  /**
+   * Prepare to drain the stream.
+   * @param uri URI for messages
+   * @param requestObject http request object; needed to avoid GC issues.
+   * @param inner stream to close.
+   * @param shouldAbort force an abort; used if explicitly requested.
+   * @param streamStatistics stats to update
+   * @param reason reason for stream being closed; used in messages
+   * @param remaining remaining bytes
+   */
+  public SDKStreamDrainer(final String uri,
+      @Nullable final Closeable requestObject,
+      final SdkFilterInputStream inner,
+      final boolean shouldAbort,
+      final int remaining,
+      final S3AInputStreamStatistics streamStatistics,
+      final String reason) {
+    this.uri = uri;
+    this.requestObject = requestObject;
+    this.inner = requireNonNull(inner);
+    this.shouldAbort = shouldAbort;
+    this.remaining = remaining;
+    this.streamStatistics = requireNonNull(streamStatistics);
+    this.reason = reason;
+  }
+
+  /**
+   * Drain the stream. This method is intended to be
+   * used directly or asynchronously, and measures the
+   * duration of the operation in the stream statistics.
+   * @return was the stream aborted?
+   */
+  @Override
+  public Boolean apply() {
+    try {
+      Boolean outcome = invokeTrackingDuration(
+          streamStatistics.initiateInnerStreamClose(shouldAbort),
+          this::drainOrAbortHttpStream);
+      aborted = outcome;
+      return outcome;
+    } catch (Exception e) {
+      thrown = e;
+      return aborted;
+    }
+  }
+
+  /**
+   * Apply, raising any exception.
+   * For testing.
+   * @return the outcome.
+   * @throws Exception anything raised.
+   */
+  @VisibleForTesting
+  boolean applyRaisingException() throws Exception {
+    Boolean outcome = apply();
+    if (thrown != null) {
+      throw thrown;
+    }
+    return outcome;
+  }
+
+  /**
+   * Drain or abort the inner stream.
+   * Exceptions are saved then swallowed.
+   * If a close() is attempted and fails, the operation escalates to
+   * an abort.
+   */

Review Comment:
   Add a `@return` comment to the Javadoc to describe what is being returned.
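   e.g., matching the wording on `apply()`:
   ```java
   /**
    * Drain or abort the inner stream.
    * Exceptions are saved then swallowed.
    * If a close() is attempted and fails, the operation escalates to
    * an abort.
    * @return true if the stream was aborted.
    */
   ```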



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

