[ 
https://issues.apache.org/jira/browse/HADOOP-17764?focusedWorklogId=612637&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-612637
 ]

ASF GitHub Bot logged work on HADOOP-17764:
-------------------------------------------

                Author: ASF GitHub Bot
            Created on: 21/Jun/21 13:57
            Start Date: 21/Jun/21 13:57
    Worklog Time Spent: 10m 
      Work Description: steveloughran commented on a change in pull request 
#3109:
URL: https://github.com/apache/hadoop/pull/3109#discussion_r655400585



##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java
##########
@@ -1439,10 +1439,13 @@ public S3Object getObject(GetObjectRequest request) {
    * using FS state as well as the status.
    * @param fileStatus file status.
    * @param seekPolicy input policy for this operation
+   * @param changePolicy change policy for this operation.
    * @param readAheadRange readahead value.
+   * @param auditSpan audit span.
    * @return a context for read and select operations.
    */
-  private S3AReadOpContext createReadContext(
+  @VisibleForTesting
+  protected S3AReadOpContext createReadContext(

Review comment:
       Afraid we currently do. However, if we move to a builder API for that 
ReadOpContext then the test could construct something very minimal (would only 
need the Invoker ref). I'd support that change here as it would help future 
tests.
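
For illustration only, a builder along these lines (hypothetical; S3AReadOpContext has no builder yet) would let the test supply just the Invoker:

```java
// Hypothetical sketch, not part of this PR: a minimal-construction builder.
S3AReadOpContext ctx = S3AReadOpContext.builder()
    .withInvoker(invoker)        // the only reference these tests really need
    .withFileStatus(fileStatus)  // everything else could remain optional
    .build();
```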
   

##########
File path: 
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import javax.net.ssl.SSLException;
+import java.io.IOException;
+import java.net.SocketException;
+import java.nio.charset.Charset;
+
+import com.amazonaws.services.s3.model.GetObjectRequest;
+import com.amazonaws.services.s3.model.ObjectMetadata;
+import com.amazonaws.services.s3.model.S3Object;
+import com.amazonaws.services.s3.model.S3ObjectInputStream;
+import org.junit.Test;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan;
+import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
+import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy;
+
+import static java.lang.Math.min;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests S3AInputStream retry behavior on read failure.
+ * These tests validate the retry behavior of S3AInputStream.read() and
+ * read(b, off, len): when an IOException is thrown mid-read, the stream
+ * should be reopened and the read retried.
+ */
+public class TestS3AInputStreamRetry extends AbstractS3AMockTest {
+
+  String input = "ab";
+
+  @Test
+  public void testInputStreamReadRetryForException() throws IOException {
+    S3AInputStream s3AInputStream = getMockedS3AInputStream();
+
+    assertEquals("'a' from the test input stream 'ab' should be the first 
character being read",
+        input.charAt(0), s3AInputStream.read());
+    assertEquals("'b' from the test input stream 'ab' should be the second 
character being read",
+        input.charAt(1), s3AInputStream.read());
+  }
+
+  @Test
+  public void testInputStreamReadRetryLengthForException() throws IOException {
+    byte[] result = new byte[input.length()];
+    S3AInputStream s3AInputStream = getMockedS3AInputStream();
+    s3AInputStream.read(result, 0, input.length());
+
+    assertArrayEquals("The read result should equal the test input stream content",
+        input.getBytes(), result);
+  }
+
+  private S3AInputStream getMockedS3AInputStream() {
+    Path path = new Path("test-path");
+    String eTag = "test-etag";
+    String versionId = "test-version-id";
+    String owner = "test-owner";
+
+    S3AFileStatus s3AFileStatus = new S3AFileStatus(
+        input.length(), 0, path, input.length(), owner, eTag, versionId);
+
+    S3ObjectAttributes s3ObjectAttributes = new S3ObjectAttributes(
+        fs.getBucket(), path, fs.pathToKey(path), fs.getServerSideEncryptionAlgorithm(),
+        new EncryptionSecrets().getEncryptionKey(), eTag, versionId, input.length());
+
+    S3AReadOpContext s3AReadOpContext = fs.createReadContext(s3AFileStatus,
+        S3AInputPolicy.Normal, ChangeDetectionPolicy.getPolicy(fs.getConf()),
+        100, NoopSpan.INSTANCE);
+
+    return new S3AInputStream(s3AReadOpContext, s3ObjectAttributes,
+        getMockedInputStreamCallback());
+  }
+
+  // Returns mocked InputStreamCallbacks that supply the mocked S3Object.
+  private S3AInputStream.InputStreamCallbacks getMockedInputStreamCallback() {
+    return new S3AInputStream.InputStreamCallbacks() {
+
+      final S3Object mockedS3Object = getMockedS3Object();
+
+      @Override
+      public S3Object getObject(GetObjectRequest request) {
+        // Return the mocked S3Object with its predefined read behavior.
+        return mockedS3Object;
+      }
+
+      @Override
+      public GetObjectRequest newGetRequest(String key) {
+        return fs.getRequestFactory().newGetObjectRequest(key);

Review comment:
       or just return `new GetObjectRequest(bucket, key)`; there's no need to 
add extra attributes to these requests.
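
i.e. the override could be as simple as (a sketch, using `fs.getBucket()` as elsewhere in this test):

```java
@Override
public GetObjectRequest newGetRequest(String key) {
  // A plain request is enough here; no extra request attributes needed.
  return new GetObjectRequest(fs.getBucket(), key);
}
```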

##########
File path: 
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
##########
@@ -396,6 +396,41 @@ private void incrementBytesRead(long bytesRead) {
     }
   }
 
+  @FunctionalInterface
+  interface CheckedIntSupplier {
+    int get() throws IOException;
+  }
+
+  /**
+   * Helper function that retries an IntSupplier once on `IOException`.
+   * It is used by `read()` and `read(buf, off, len)`: it runs `readFn` and,
+   * if an `IOException` is thrown:
+   *   1. On an `EOFException`, returns -1.
+   *   2. Otherwise, runs `onReadFailure` and retries `readFn`; if that fails
+   *   again, runs `onReadFailure` and re-throws the error.
+   * @param readFn the read function; it must return an integer
+   * @param length length of the data being read
+   * @return -1 if `readFn` throws EOFException, else the int returned by `readFn`
+   * @throws IOException if the retry of `readFn` also fails with `IOException`
+   */
+  private int retryReadOnce(CheckedIntSupplier readFn, int length)
+      throws IOException {
+    try {
+      return readFn.get();
+    } catch (EOFException e) {
+      return -1;
+    } catch (IOException e) {
+      onReadFailure(e, length, e instanceof SocketTimeoutException);

Review comment:
       @bogthe see `calculateRequestLimit()`; it's only part of the information 
used to calculate the range. On random IO the range will be that of 
`fs.s3a.readahead` (unless the file is shorter). It's more important in 
`read(buffer[])`, as then the full buffer length is requested; if something is 
asking for a 2MB buffer then that's what it gets. 
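
For context, here is a sketch of how the truncated `retryReadOnce` body above might continue, reconstructed from its javadoc alone (an illustration, not the exact PR code):

```java
private int retryReadOnce(CheckedIntSupplier readFn, int length)
    throws IOException {
  try {
    return readFn.get();
  } catch (EOFException e) {
    return -1;                     // EOF is not retried; signal end of stream
  } catch (IOException e) {
    // Reopen the stream (forcing abort on a socket timeout), then retry once.
    onReadFailure(e, length, e instanceof SocketTimeoutException);
    try {
      return readFn.get();
    } catch (EOFException e2) {
      return -1;
    } catch (IOException e2) {
      // Second failure: clean up again and re-throw.
      onReadFailure(e2, length, e2 instanceof SocketTimeoutException);
      throw e2;
    }
  }
}
```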




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 612637)
    Time Spent: 1h 50m  (was: 1h 40m)

> S3AInputStream read does not re-open the input stream on the second read 
> retry attempt
> --------------------------------------------------------------------------------------
>
>                 Key: HADOOP-17764
>                 URL: https://issues.apache.org/jira/browse/HADOOP-17764
>             Project: Hadoop Common
>          Issue Type: Bug
>          Components: fs/s3
>    Affects Versions: 3.3.1
>            Reporter: Zamil Majdy
>            Priority: Major
>              Labels: pull-request-available
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> *Bug description:*
> The read method in S3AInputStream has the following behaviour when an 
> IOException happens during a read:
>  * {{reopen and read quickly}}: after the first failed {{read}} attempt, the 
> client reopens the stream and tries reading again without a {{sleep}}.
>  * {{reopen and wait for fixed duration}}: after a further failed {{read}} 
> attempt, the client reopens the stream, sleeps for {{fs.s3a.retry.interval}} 
> milliseconds (defaults to 500 ms), and then tries reading from the stream 
> again.
> However, if a second failure happens during the {{reopen and read quickly}} 
> step, the subsequent read is retried without reopening the input stream. This 
> causes some of the bytes already read to be skipped, which results in 
> corrupted or truncated data.
>  
> *Scenario to reproduce* (see the sketch below):
>  * Execute S3AInputStream `read()` or `read(b, off, len)`.
>  * The read fails with a `Connection Reset` exception after reading some 
> data.
>  * The InputStream is re-opened and another `read()` or `read(b, off, len)` 
> is executed.
>  * The read fails for the second time with a `Connection Reset` exception 
> after reading some data.
>  * The InputStream is not re-opened, and another `read()` or `read(b, off, 
> len)` is executed after the sleep.
>  * The read succeeds, but it skips the first few bytes that were already 
> read before the second failure.
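> 
> A minimal sketch of the flawed control flow described above (all names here 
> are illustrative, not the actual S3AInputStream code):
> {code:java}
> int readWithRetry() throws IOException {
>   try {
>     return readOnce();            // first attempt fails: connection reset
>   } catch (IOException e) {
>     reopen();                     // "reopen and read quickly"
>     try {
>       return readOnce();          // second attempt fails: connection reset
>     } catch (IOException e2) {
>       sleep(retryInterval);       // "reopen and wait for fixed duration"...
>       return readOnce();          // ...but the stream is NOT reopened, so the
>                                   // bytes consumed by attempt 2 are skipped
>     }
>   }
> }
> {code}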
>  
> *Proposed fix:*
> [https://github.com/apache/hadoop/pull/3109]
> Added the test that reproduces the issue along with the fix



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org
