steveloughran commented on a change in pull request #3109: URL: https://github.com/apache/hadoop/pull/3109#discussion_r655400585
########## File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java ########## @@ -1439,10 +1439,13 @@ public S3Object getObject(GetObjectRequest request) { * using FS state as well as the status. * @param fileStatus file status. * @param seekPolicy input policy for this operation + * @param changePolicy change policy for this operation. * @param readAheadRange readahead value. + * @param auditSpan audit span. * @return a context for read and select operations. */ - private S3AReadOpContext createReadContext( + @VisibleForTesting + protected S3AReadOpContext createReadContext( Review comment: Afraid we currently do. However, if we move to a builder API for that ReadOpContext then the test could construct something very minimal (would only need the Invoker ref). I'd support that change here as it would help future tests. ########## File path: hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AInputStreamRetry.java ########## @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.fs.s3a; + +import javax.net.ssl.SSLException; +import java.io.IOException; +import java.net.SocketException; +import java.nio.charset.Charset; + +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.model.S3ObjectInputStream; +import org.junit.Test; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.s3a.audit.impl.NoopSpan; +import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets; +import org.apache.hadoop.fs.s3a.impl.ChangeDetectionPolicy; + +import static java.lang.Math.min; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +/** + * Tests S3AInputStream retry behavior on read failure. + * These tests are for validating expected behavior of retrying the S3AInputStream + * read() and read(b, off, len), it tests that the read should reopen the input stream and retry + * the read when IOException is thrown during the read process. 
+ */ +public class TestS3AInputStreamRetry extends AbstractS3AMockTest { + + String input = "ab"; + + @Test + public void testInputStreamReadRetryForException() throws IOException { + S3AInputStream s3AInputStream = getMockedS3AInputStream(); + + assertEquals("'a' from the test input stream 'ab' should be the first character being read", + input.charAt(0), s3AInputStream.read()); + assertEquals("'b' from the test input stream 'ab' should be the second character being read", + input.charAt(1), s3AInputStream.read()); + } + + @Test + public void testInputStreamReadRetryLengthForException() throws IOException { + byte[] result = new byte[input.length()]; + S3AInputStream s3AInputStream = getMockedS3AInputStream(); + s3AInputStream.read(result, 0, input.length()); + + assertArrayEquals("The read result should equals to the test input stream content", + input.getBytes(), result); + } + + private S3AInputStream getMockedS3AInputStream() { + Path path = new Path("test-path"); + String eTag = "test-etag"; + String versionId = "test-version-id"; + String owner = "test-owner"; + + S3AFileStatus s3AFileStatus = new S3AFileStatus( + input.length(), 0, path, input.length(), owner, eTag, versionId); + + S3ObjectAttributes s3ObjectAttributes = new S3ObjectAttributes( + fs.getBucket(), path, fs.pathToKey(path), fs.getServerSideEncryptionAlgorithm(), + new EncryptionSecrets().getEncryptionKey(), eTag, versionId, input.length()); + + S3AReadOpContext s3AReadOpContext = fs.createReadContext(s3AFileStatus, S3AInputPolicy.Normal, + ChangeDetectionPolicy.getPolicy(fs.getConf()), 100, NoopSpan.INSTANCE); + + return new S3AInputStream(s3AReadOpContext, s3ObjectAttributes, getMockedInputStreamCallback()); + } + + // Get mocked InputStreamCallbacks where we return mocked S3Object + private S3AInputStream.InputStreamCallbacks getMockedInputStreamCallback() { + return new S3AInputStream.InputStreamCallbacks() { + + final S3Object mockedS3Object = getMockedS3Object(); + + @Override + public 
S3Object getObject(GetObjectRequest request) { + // Set s3 client to return mocked s3object with already defined read behavior + return mockedS3Object; + } + + @Override + public GetObjectRequest newGetRequest(String key) { + return fs.getRequestFactory().newGetObjectRequest(key); Review comment: or just return `new GetObjectRequest(bucket, key)`; there's no need to add extra attributes to these requests. ########## File path: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java ########## @@ -396,6 +396,41 @@ private void incrementBytesRead(long bytesRead) { } } + @FunctionalInterface + interface CheckedIntSupplier { + int get() throws IOException; + } + + /** + * Helper function that allows to retry an IntSupplier in case of `IOException`. + * This function is used by `read()` and `read(buf, off, len)` functions. It tries to run + * `readFn` and in case of `IOException`: + * 1. If it gets an EOFException, return -1 + * 2. Else, run `onReadFailure` and retry running `readFn`. If it fails again, + * we run `onReadFailure` and re-throw the error. + * @param readFn the function to read, it must return an integer + * @param length length of data being attempted to read + * @return -1 if `readFn` throws EOFException, else returns int value from the result of `readFn` + * @throws IOException if retry of `readFn` also fails with `IOException` + */ + private int retryReadOnce(CheckedIntSupplier readFn, int length) throws IOException { + try { + return readFn.get(); + } catch (EOFException e) { + return -1; + } catch (IOException e) { + onReadFailure(e, length, e instanceof SocketTimeoutException); Review comment: @bogthe see `calculateRequestLimit()`; it's only part of the information used to calculate the range. On random IO the range will be that of fs.s3a.readahead (unless the file is shorter). 
It's more important in `read(buffer[])` as then the full buffer length is requested; if something is asking for a 2MB buffer then that's what it gets. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org