[ 
https://issues.apache.org/jira/browse/HADOOP-17764?focusedWorklogId=611897&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-611897
 ]

ASF GitHub Bot logged work on HADOOP-17764:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 18/Jun/21 20:54
            Start Date: 18/Jun/21 20:54
    Worklog Time Spent: 10m 
      Work Description: bogthe commented on a change in pull request #3109:
URL: https://github.com/apache/hadoop/pull/3109#discussion_r653931489



##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
##########
@@ -396,6 +396,41 @@ private void incrementBytesRead(long bytesRead) {
     }
   }
 
+  @FunctionalInterface
+  interface CheckedIntSupplier {
+    int get() throws IOException;
+  }
+
+  /**
+   * Helper function that allows to retry an IntSupplier in case of 
`IOException`.
+   * This function is used by `read()` and `read(buf, off, len)` functions. It 
tries to run
+   * `readFn` and in case of `IOException`:
+   *   1. If it gets an EOFException, return -1
+   *   2. Else, run `onReadFailure` and retry running `readFn`. If it fails 
again,
+   *   we run `onReadFailure` and re-throw the error.
+   * @param readFn the function to read, it must return an integer
+   * @param length length of data being attempted to read
+   * @return -1 if `readFn` throws EOFException, else returns int value from 
the result of `readFn`
+   * @throws IOException if retry of `readFn` also fails with `IOException`
+   */
+  private int retryReadOnce(CheckedIntSupplier readFn, int length) throws 
IOException {
+    try {
+      return readFn.get();
+    } catch (EOFException e) {
+      return -1;
+    } catch (IOException e) {
+      onReadFailure(e, length, e instanceof SocketTimeoutException);

Review comment:
       I see you're calling `onReadFailure` with `length` instead of `1`. Any 
reasoning for this?
   
   That is used to calculate the range for a `GetObjectRequest` when the stream 
is being reopened. If it's intended then I would be curious of the impact it 
has on larger objects, have you done any testing around it?

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
##########
@@ -1439,10 +1439,13 @@ public S3Object getObject(GetObjectRequest request) {
    * using FS state as well as the status.
    * @param fileStatus file status.
    * @param seekPolicy input policy for this operation
+   * @param changePolicy change policy for this operation.
    * @param readAheadRange readahead value.
+   * @param auditSpan audit span.
    * @return a context for read and select operations.
    */
-  private S3AReadOpContext createReadContext(
+  @VisibleForTesting
+  protected S3AReadOpContext createReadContext(

Review comment:
       I'm not really convinced that this is needed. Check the main comment for 
details.

##########
File path: 
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import javax.net.ssl.SSLException;
+import java.io.IOException;
+import java.net.SocketException;
+import java.nio.charset.Charset;
+
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import org.junit.Test;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
+import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
+
+import static java.lang.Math.min;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests S3AInputStream retry behavior on read failure.
+ * These tests are for validating expected behavior of retrying the 
S3AInputStream
+ * read() and read(b, off, len), it tests that the read should reopen the 
input stream and retry
+ * the read when IOException is thrown during the read process.
+ */
+public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
+
+  String input = "ab";
+
+  @Test
+  public void testInputStreamReadRetryForException() throws IOException {
+    S3AInputStream s3AInputStream = getMockedS3AInputStream();
+
+    assertEquals("'a' from the test input stream 'ab' should be the first 
character being read",
+        input.charAt(0), s3AInputStream.read());
+    assertEquals("'b' from the test input stream 'ab' should be the second 
character being read",
+        input.charAt(1), s3AInputStream.read());
+  }
+
+  @Test
+  public void testInputStreamReadRetryLengthForException() throws IOException {
+    byte[] result = new byte[input.length()];
+    S3AInputStream s3AInputStream = getMockedS3AInputStream();
+    s3AInputStream.read(result, 0, input.length());
+
+    assertArrayEquals("The read result should equals to the test input stream 
content",
+        input.getBytes(), result);
+  }
+
+  private S3AInputStream getMockedS3AInputStream() {
+    Path path = new Path("test-path");
+    String eTag = "test-etag";
+    String versionId = "test-version-id";
+    String owner = "test-owner";
+
+    S3AFileStatus s3AFileStatus = new S3AFileStatus(
+        input.length(), 0, path, input.length(), owner, eTag, versionId);
+
+    S3ObjectAttributes s3ObjectAttributes = new S3ObjectAttributes(
+        fs.getBucket(), path, fs.pathToKey(path), 
fs.getServerSideEncryptionAlgorithm(),
+        new EncryptionSecrets().getEncryptionKey(), eTag, versionId, 
input.length());
+
+    S3AReadOpContext s3AReadOpContext = fs.createReadContext(s3AFileStatus, 
S3AInputPolicy.Normal,
+        ChangeDetectionPolicy.getPolicy(fs.getConf()), 100, NoopSpan.INSTANCE);
+
+    return new S3AInputStream(s3AReadOpContext, s3ObjectAttributes, 
getMockedInputStreamCallback());
+  }
+
+  // Get mocked InputStreamCallbacks where we return mocked S3Object
+  private S3AInputStream.InputStreamCallbacks getMockedInputStreamCallback() {
+    return new S3AInputStream.InputStreamCallbacks() {
+
+      final S3Object mockedS3Object = getMockedS3Object();
+
+      @Override
+      public S3Object getObject(GetObjectRequest request) {
+        // Set s3 client to return mocked s3object with already defined read 
behavior
+        return mockedS3Object;
+      }
+
+      @Override
+      public GetObjectRequest newGetRequest(String key) {
+        return fs.getRequestFactory().newGetObjectRequest(key);
+      }
+
+      @Override
+      public void close() {
+      }
+    };
+  }
+
+  // Get mocked S3Object where return bad input stream on the first couple of 
getObjectContent calls
+  private S3Object getMockedS3Object() {
+    S3ObjectInputStream objectInputStreamBad1 = getMockedInputStream(true);
+    S3ObjectInputStream objectInputStreamBad2 = getMockedInputStream(true);
+    S3ObjectInputStream objectInputStreamGood = getMockedInputStream(false);
+
+    return new S3Object() {
+      final S3ObjectInputStream[] inputStreams =
+          {objectInputStreamBad1, objectInputStreamBad2, 
objectInputStreamGood};
+
+      Integer inputStreamIndex = 0;
+
+      @Override
+      public S3ObjectInputStream getObjectContent() {
+        // Set getObjectContent behavior: returns bad stream twice, and good 
stream afterwards
+        inputStreamIndex++;
+        return inputStreams[min(inputStreamIndex, inputStreams.length) - 1];
+      }
+
+      @Override
+      public ObjectMetadata getObjectMetadata() {
+        // Set getObjectMetadata behavior: returns dummy metadata
+        ObjectMetadata metadata = new ObjectMetadata();
+        metadata.setHeader("ETag", "test-etag");
+        return metadata;
+      }
+    };
+  }
+
+  // Get mocked S3ObjectInputStream where we can trigger IOException to 
sumulate the read failure

Review comment:
       nit*: `sumulate` -> `simulate`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

            Worklog Id:     (was: 611897)
    Remaining Estimate: 0h
            Time Spent: 10m

> S3AInputStream read does not re-open the input stream on the second read 
> retry attempt
> --------------------------------------------------------------------------------------
>
>                 Key: HADOOP-17764
>                 URL: https://issues.apache.org/jira/browse/HADOOP-17764
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: fs/s3
>    Affects Versions: 3.3.1
>            Reporter: Zamil Majdy
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> *Bug description:*
> The read method in S3AInputStream has this following behaviour when an 
> IOException happening during the read:
>  * {{reopen and read quickly}}: The client after failing in the first attempt 
> of {{read}}, will reopen the stream and try reading again without {{sleep}}.
>  * {{reopen and wait for fixed duration}}: The client after failing in the 
> attempt of {{read}}, will reopen the stream, sleep for 
> {{fs.s3a.retry.interval}} milliseconds (defaults to 500 ms), and then try 
> reading from the stream.
> While doing the {{reopen and read quickly}} process, the subsequent read will 
> be retried without reopening the input stream in case of the second failure 
> happened. This leads to some of the bytes read being skipped which results to 
> corrupt/less data than required. 
>  
> *Scenario to reproduce:*
>  * Execute S3AInputStream `read()` or `read(b, off, len)`.
>  * The read failed and throws `Connection Reset` exception after reading some 
> data.
>  * The InputStream is re-opened and another `read()` or `read(b, off, len)` 
> is executed
>  * The read failed for the second time and throws `Connection Reset` 
> exception after reading some data.
>  * The InputStream is not re-opened and another `read()` or `read(b, off, 
> len)` is executed after sleep
>  * The read succeed, but it skips the first few bytes that has already been 
> read on the second failure.
>  
> *Proposed fix:*
> [https://github.com/apache/hadoop/pull/3109]
> Added the test that reproduces the issue along with the fix



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to