This is an automated email from the ASF dual-hosted git repository.

stevel pushed a commit to branch branch-3.3
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.3 by this push:
     new 9c767a350d4 HADOOP-19027. S3A: S3AInputStream doesn't recover from channel exceptions (#6425) (#8003)
9c767a350d4 is described below

commit 9c767a350d46bede64af093838609bb8afc28545
Author: Steve Loughran <[email protected]>
AuthorDate: Mon Oct 6 16:09:13 2025 +0100

    HADOOP-19027. S3A: S3AInputStream doesn't recover from channel exceptions (#6425) (#8003)

    This is the fraction of #6425 which could be backported to the V1 SDK;
    the original PR does a lot of V2-specific translation/unwinding.

    416 responses are mapped to RangeNotSatisfiableEOFException,
    whose retry policy is to fail.

    A classic EOFException is now retried as a connection failure.
    Calls to read() and lazySeek() all retry on this with the full
    retry policy, including handling of socket errors.
    
    Contributed by Steve Loughran
---
 .../fs/s3a/RangeNotSatisfiableEOFException.java    |  39 ++++
 .../org/apache/hadoop/fs/s3a/S3AInputStream.java   |  58 ++++--
 .../org/apache/hadoop/fs/s3a/S3ARetryPolicy.java   |  21 ++-
 .../java/org/apache/hadoop/fs/s3a/S3AUtils.java    |  13 +-
 .../hadoop/fs/s3a/impl/InternalConstants.java      |  64 ++++++-
 .../hadoop/fs/s3a/TestS3AExceptionTranslation.java |  13 +-
 .../fs/s3a/performance/ITestS3AOpenCost.java       | 207 +++++++++++++++++----
 7 files changed, 353 insertions(+), 62 deletions(-)
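
For illustration only (not part of the patch): a minimal sketch of the
translation contract this change establishes. The three-argument
translateException(operation, path, exception) call shape is an assumption
based on the V1 S3AUtils API; the class and constant names are from the
diff below.

    import java.io.EOFException;
    import java.io.IOException;
    import com.amazonaws.services.s3.model.AmazonS3Exception;
    import org.apache.hadoop.fs.s3a.RangeNotSatisfiableEOFException;
    import org.apache.hadoop.fs.s3a.S3AUtils;

    public class Translate416Sketch {
      public static void main(String[] args) {
        // fake a 416 "Range Not Satisfiable" service response
        AmazonS3Exception ase = new AmazonS3Exception("range not satisfiable");
        ase.setStatusCode(416);
        IOException ioe = S3AUtils.translateException(
            "read", "s3a://bucket/key", ase);
        // the 416 surfaces as the new subclass, which is still an
        // EOFException, so existing catch (EOFException) blocks keep working
        System.out.println(ioe instanceof RangeNotSatisfiableEOFException);
        System.out.println(ioe instanceof EOFException);
      }
    }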

diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java
new file mode 100644
index 00000000000..4c6b9decb0b
--- /dev/null
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/RangeNotSatisfiableEOFException.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.EOFException;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Status code 416, range not satisfiable.
+ * Subclass of {@link EOFException} so that any code which expects that to
+ * be the outcome of a 416 failure will continue to work.
+ */
[email protected]
+public class RangeNotSatisfiableEOFException extends EOFException {
+
+  public RangeNotSatisfiableEOFException(
+      String operation,
+      Exception cause) {
+    super(operation);
+    initCause(cause);
+  }
+}
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index d87bb7fc946..d8ba36500f7 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -99,6 +99,14 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
   public static final String OPERATION_OPEN = "open";
   public static final String OPERATION_REOPEN = "re-open";
 
+  /**
+   * Switch for behavior when wrappedStream.read()
+   * returns -1 or raises an EOF; the original semantics
+   * are that the stream is kept open.
+   * Value {@value}.
+   */
+  private static final boolean CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ = true;
+
   /**
    * This is the maximum temporary buffer size we use while
    * populating the data in direct byte buffers during a vectored IO
@@ -435,16 +443,23 @@ public boolean seekToNewSource(long targetPos) throws IOException {
 
   /**
    * Perform lazy seek and adjust stream to correct position for reading.
-   *
+   * If an EOFException is raised there are two possibilities:
+   * <ol>
+   *   <li>the stream is at the end of the file</li>
+   *   <li>something went wrong with the network connection</li>
+   * </ol>
+   * This method does not attempt to distinguish; it assumes that an EOF
+   * exception is always "end of file".
    * @param targetPos position from where data should be read
    * @param len length of the content that needs to be read
+   * @throws RangeNotSatisfiableEOFException GET is out of range
+   * @throws IOException anything else.
    */
   @Retries.RetryTranslated
   private void lazySeek(long targetPos, long len) throws IOException {
 
     Invoker invoker = context.getReadInvoker();
-    invoker.maybeRetry(streamStatistics.getOpenOperations() == 0,
-        "lazySeek", pathStr, true,
+    invoker.retry("lazySeek to " + targetPos, pathStr, true,
         () -> {
           //For lazy seek
           seekInStream(targetPos, len);
@@ -478,7 +493,9 @@ public synchronized int read() throws IOException {
 
     try {
       lazySeek(nextReadPos, 1);
-    } catch (EOFException e) {
+    } catch (RangeNotSatisfiableEOFException e) {
+      // attempt to GET beyond the end of the object
+      LOG.debug("Downgrading 416 response attempt to read at {} to -1 
response", nextReadPos);
       return -1;
     }
 
@@ -494,8 +511,6 @@ public synchronized int read() throws IOException {
           }
           try {
             b = wrappedStream.read();
-          } catch (EOFException e) {
-            return -1;
           } catch (SocketTimeoutException e) {
             onReadFailure(e, true);
             throw e;
@@ -509,10 +524,9 @@ public synchronized int read() throws IOException {
     if (byteRead >= 0) {
       pos++;
       nextReadPos++;
-    }
-
-    if (byteRead >= 0) {
       incrementBytesRead(1);
+    } else {
+      streamReadResultNegative();
     }
     return byteRead;
   }
@@ -537,6 +551,18 @@ private void onReadFailure(IOException ioe, boolean forceAbort) {
     closeStream("failure recovery", forceAbort, false);
   }
 
+  /**
+   * The read() call returned -1.
+   * This means either that the connection has gone past the end of
+   * the object, or that the stream has broken for some reason;
+   * so close the stream (without an abort).
+   */
+  private void streamReadResultNegative() {
+    if (CLOSE_WRAPPED_STREAM_ON_NEGATIVE_READ) {
+      closeStream("wrappedStream.read() returned -1", false, false);
+    }
+  }
+
   /**
    * {@inheritDoc}
    *
@@ -562,8 +588,8 @@ public synchronized int read(byte[] buf, int off, int len)
 
     try {
       lazySeek(nextReadPos, len);
-    } catch (EOFException e) {
-      // the end of the file has moved
+    } catch (RangeNotSatisfiableEOFException e) {
+      // attempt to GET beyond the end of the object
       return -1;
     }
 
@@ -581,12 +607,12 @@ public synchronized int read(byte[] buf, int off, int len)
           }
           try {
             bytes = wrappedStream.read(buf, off, len);
-          } catch (EOFException e) {
-            // the base implementation swallows EOFs.
-            return -1;
           } catch (SocketTimeoutException e) {
             onReadFailure(e, true);
             throw e;
+          } catch (EOFException e) {
+            // the base implementation swallows EOFs.
+            return -1;
           } catch (IOException e) {
             onReadFailure(e, false);
             throw e;
@@ -597,8 +623,10 @@ public synchronized int read(byte[] buf, int off, int len)
     if (bytesRead > 0) {
       pos += bytesRead;
       nextReadPos += bytesRead;
+      incrementBytesRead(bytesRead);
+    } else {
+      streamReadResultNegative();
     }
-    incrementBytesRead(bytesRead);
     streamStatistics.readOperationCompleted(len, bytesRead);
     return bytesRead;
   }
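
For illustration only, not part of the patch: with the changes above, a
channel failure during read() is retried inside S3AInputStream itself, so
callers need no retry loop of their own for socket errors. A hedged sketch,
assuming a FileSystem "fs" and Path "path"; process() is a hypothetical
consumer invented for the example:

    try (FSDataInputStream in = fs.open(path)) {
      byte[] buf = new byte[8192];
      int n;
      // a connection broken mid-read is now recovered internally;
      // read() returns -1 only at the true end of the object
      while ((n = in.read(buf)) > 0) {
        process(buf, n);
      }
    }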
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
index 528a99f5e09..a648421d504 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ARetryPolicy.java
@@ -22,7 +22,10 @@
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InterruptedIOException;
+import java.net.BindException;
+import java.net.ConnectException;
 import java.net.NoRouteToHostException;
+import java.net.SocketException;
 import java.net.SocketTimeoutException;
 import java.net.UnknownHostException;
 import java.nio.file.AccessDeniedException;
@@ -197,6 +200,9 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
     // implementation
     policyMap.put(NoVersionAttributeException.class, fail);
 
+    // the range header is outside the scope of the object; retrying won't help
+    policyMap.put(RangeNotSatisfiableEOFException.class, fail);
+
     // should really be handled by resubmitting to new location;
     // that's beyond the scope of this retry policy
     policyMap.put(AWSRedirectException.class, fail);
@@ -204,14 +210,17 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
     // throttled requests can be retried, always
     policyMap.put(AWSServiceThrottledException.class, throttlePolicy);
 
+    // socket exception subclass we consider unrecoverable, though it is
+    // normally only found when opening a port for listening,
+    // which is never done in S3A.
+    policyMap.put(BindException.class, fail);
+
     // connectivity problems are retried without worrying about idempotency
     policyMap.put(ConnectTimeoutException.class, connectivityFailure);
+    policyMap.put(ConnectException.class, connectivityFailure);
 
     // this can be a sign of an HTTP connection breaking early.
-    // which can be reacted to by another attempt if the request was idempotent.
-    // But: could also be a sign of trying to read past the EOF on a GET,
-    // which isn't going to be recovered from
-    policyMap.put(EOFException.class, retryIdempotentCalls);
+    policyMap.put(EOFException.class, connectivityFailure);
 
     // policy on a 400/bad request still ambiguous.
     // Treated as an immediate failure
@@ -227,7 +236,9 @@ protected Map<Class<? extends Exception>, RetryPolicy> createExceptionMap() {
     policyMap.put(AWSClientIOException.class, retryIdempotentCalls);
     policyMap.put(AWSServiceIOException.class, retryIdempotentCalls);
     policyMap.put(AWSS3IOException.class, retryIdempotentCalls);
-    policyMap.put(SocketTimeoutException.class, retryIdempotentCalls);
+    // general socket exceptions
+    policyMap.put(SocketException.class, connectivityFailure);
+    policyMap.put(SocketTimeoutException.class, connectivityFailure);
 
     // Unsupported requests do not work, however many times you try
     policyMap.put(UnsupportedRequestException.class, fail);
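
For illustration only, not part of the patch: the effect of the new policy
map, exercised through the standard RetryPolicy.shouldRetry() call; treat
the exact call shape as an assumption, since the mapping itself is what the
hunk above changes:

    // assumes org.apache.hadoop.conf.Configuration and
    // org.apache.hadoop.io.retry.RetryPolicy on the classpath
    S3ARetryPolicy policy = new S3ARetryPolicy(new Configuration());
    // 416: fail immediately; a bad range never succeeds on retry
    RetryPolicy.RetryAction a = policy.shouldRetry(
        new RangeNotSatisfiableEOFException("read", null), 0, 0, true);
    // plain EOFException: now treated as a connectivity failure and retried
    RetryPolicy.RetryAction b = policy.shouldRetry(
        new EOFException("connection reset by peer"), 0, 0, true);
    // SocketException and its subclasses take the same connectivity policy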
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
index 590b0b55ac4..1c2212d76ff 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AUtils.java
@@ -89,7 +89,7 @@
 import static org.apache.commons.lang3.StringUtils.isEmpty;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
-import static org.apache.hadoop.fs.s3a.impl.InternalConstants.CSE_PADDING_LENGTH;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.*;
 import static org.apache.hadoop.fs.s3a.impl.MultiObjectDeleteSupport.translateDeleteException;
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
 import static org.apache.hadoop.util.functional.RemoteIterators.filteringRemoteIterator;
@@ -280,10 +280,13 @@ public static IOException translateException(@Nullable String operation,
         break;
 
       // out of range. This may happen if an object is overwritten with
-      // a shorter one while it is being read.
-      case 416:
-        ioe = new EOFException(message);
-        ioe.initCause(ase);
+      // a shorter one while it is being read, or openFile() was invoked
+      // passing a FileStatus or file length less than that of the object.
+      // Although the HTTP specification says that the response should
+      // include a range header specifying the actual range available,
+      // this isn't picked up here.
+      case SC_416_RANGE_NOT_SATISFIABLE:
+        ioe = new RangeNotSatisfiableEOFException(message, ase);
         break;
 
       // this has surfaced as a "no response from server" message.
diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
index 2e74fbd5241..31f3a68a625 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/InternalConstants.java
@@ -108,11 +108,71 @@ private InternalConstants() {
     S3A_OPENFILE_KEYS = Collections.unmodifiableSet(keys);
   }
 
+  /** 200 status code: OK. */
+  public static final int SC_200_OK = 200;
+
+  /** 301 status code: Moved Permanently. */
+  public static final int SC_301_MOVED_PERMANENTLY = 301;
+
+  /** 307 status code: Temporary Redirect. */
+  public static final int SC_307_TEMPORARY_REDIRECT = 307;
+
+  /** 400 status code: Bad Request. */
+  public static final int SC_400_BAD_REQUEST = 400;
+
+  /** 401 status code: Unauthorized. */
+  public static final int SC_401_UNAUTHORIZED = 401;
+
+  /** 403 status code: Forbidden. */
+  public static final int SC_403_FORBIDDEN = 403;
+
   /** 403 error code. */
-  public static final int SC_403 = 403;
+  public static final int SC_403 = SC_403_FORBIDDEN;
+
+  /** 404 status code: Not Found. */
+  public static final int SC_404_NOT_FOUND = 404;
 
   /** 404 error code. */
-  public static final int SC_404 = 404;
+  public static final int SC_404 = SC_404_NOT_FOUND;
+
+  /** 405 status code: Method Not Allowed. */
+  public static final int SC_405_METHOD_NOT_ALLOWED = 405;
+
+  /** 409 status code: Conflict. Example: creating a bucket twice. */
+  public static final int SC_409_CONFLICT = 409;
+
+  /** 410 status code: Gone. */
+  public static final int SC_410_GONE = 410;
+
+  /** 412 status code: Precondition Failed. */
+  public static final int SC_412_PRECONDITION_FAILED = 412;
+
+  /** 415 status code: Content type unsupported by this store. */
+  public static final int SC_415_UNSUPPORTED_MEDIA_TYPE = 415;
+
+  /** 416 status code: Range Not Satisfiable. */
+  public static final int SC_416_RANGE_NOT_SATISFIABLE = 416;
+
+  /** 429 status code: Too Many Requests; the Google GCS throttle response. */
+  public static final int SC_429_TOO_MANY_REQUESTS_GCS = 429;
+
+  /** 443 status code: No Response (unofficial). */
+  public static final int SC_443_NO_RESPONSE = 443;
+
+  /** 444 status code: No Response (unofficial). */
+  public static final int SC_444_NO_RESPONSE = 444;
+
+  /** 500 status code: Internal Server Error. */
+  public static final int SC_500_INTERNAL_SERVER_ERROR = 500;
+
+  /** 501 status code: Not Implemented. */
+  public static final int SC_501_NOT_IMPLEMENTED = 501;
+
+  /** 503 status code: Service Unavailable. On AWS S3: the throttle response. */
+  public static final int SC_503_SERVICE_UNAVAILABLE = 503;
+
+  /** 504 status code: Gateway Timeout. The AWS SDK considers this retryable. */
+  public static final int SC_504_GATEWAY_TIMEOUT = 504;
 
   /** Name of the log for throttling events. Value: {@value}. */
   public static final String THROTTLE_LOG_NAME =
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java
index fd649c436bf..4e3382f2934 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/TestS3AExceptionTranslation.java
@@ -22,6 +22,7 @@
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_404;
+import static org.apache.hadoop.fs.s3a.impl.InternalConstants.SC_416_RANGE_NOT_SATISFIABLE;
 import static org.junit.Assert.*;
 
 import java.io.EOFException;
@@ -38,6 +39,7 @@
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.services.s3.model.AmazonS3Exception;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 
 import org.apache.hadoop.fs.s3a.impl.ErrorTranslation;
@@ -80,10 +82,10 @@ protected void assertContained(String text, String contained) {
         text != null && text.contains(contained));
   }
 
-  protected <E extends Throwable> void verifyTranslated(
+  protected <E extends Throwable> E verifyTranslated(
       int status,
       Class<E> expected) throws Exception {
-    verifyTranslated(expected, createS3Exception(status));
+    return verifyTranslated(expected, createS3Exception(status));
   }
 
   @Test
@@ -128,7 +130,12 @@ public void test410isNotFound() throws Exception {
 
   @Test
   public void test416isEOF() throws Exception {
-    verifyTranslated(416, EOFException.class);
+
+    // 416 maps to the subclass of EOFException
+    final IOException ex = verifyTranslated(SC_416_RANGE_NOT_SATISFIABLE,
+            RangeNotSatisfiableEOFException.class);
+    Assertions.assertThat(ex)
+        .isInstanceOf(EOFException.class);
   }
 
   @Test
diff --git a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
index 4aae84dca8e..cd30b80b855 100644
--- a/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
+++ b/hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/performance/ITestS3AOpenCost.java
@@ -21,6 +21,7 @@
 
 import java.io.EOFException;
 
+import org.assertj.core.api.Assertions;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -30,9 +31,11 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3ATestUtils;
+import org.apache.hadoop.fs.s3a.Statistic;
 import org.apache.hadoop.fs.statistics.IOStatistics;
 
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
+import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_RANDOM;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL;
 import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_LENGTH;
 import static org.apache.hadoop.fs.contract.ContractTestUtils.readStream;
@@ -41,10 +44,12 @@
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_OPENED;
 import static org.apache.hadoop.fs.s3a.Statistic.STREAM_READ_SEEK_BYTES_SKIPPED;
 import static org.apache.hadoop.fs.s3a.performance.OperationCost.NO_HEAD_OR_LIST;
+import static org.apache.hadoop.fs.s3a.performance.OperationCostValidator.probe;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.assertDurationRange;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.extractStatistics;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.demandStringifyIOStatistics;
+import static org.apache.hadoop.fs.statistics.IOStatisticsLogging.ioStatisticsToPrettyString;
 import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_FILE_OPENED;
 import static org.apache.hadoop.test.LambdaTestUtils.intercept;
 
@@ -56,11 +61,13 @@ public class ITestS3AOpenCost extends AbstractS3ACostTest {
   private static final Logger LOG =
       LoggerFactory.getLogger(ITestS3AOpenCost.class);
 
+  public static final String TEXT = "0123456789ABCDEF";
+
   private Path testFile;
 
   private FileStatus testFileStatus;
 
-  private long fileLength;
+  private int fileLength;
 
   public ITestS3AOpenCost() {
     super(true);
@@ -76,9 +83,9 @@ public void setup() throws Exception {
     S3AFileSystem fs = getFileSystem();
     testFile = methodPath();
 
-    writeTextFile(fs, testFile, "openfile", true);
+    writeTextFile(fs, testFile, TEXT, true);
     testFileStatus = fs.getFileStatus(testFile);
-    fileLength = testFileStatus.getLen();
+    fileLength = (int)testFileStatus.getLen();
   }
 
   /**
@@ -137,15 +144,8 @@ public void testOpenFileShorterLength() throws Throwable {
     int offset = 2;
     long shortLen = fileLength - offset;
     // open the file
-    FSDataInputStream in2 = verifyMetrics(() ->
-            fs.openFile(testFile)
-                .must(FS_OPTION_OPENFILE_READ_POLICY,
-                    FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
-                .mustLong(FS_OPTION_OPENFILE_LENGTH, shortLen)
-                .build()
-                .get(),
-        always(NO_HEAD_OR_LIST),
-        with(STREAM_READ_OPENED, 0));
+    FSDataInputStream in2 = openFile(shortLen,
+            FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL);
 
     // verify that the statistics are in range
     IOStatistics ioStatistics = extractStatistics(in2);
@@ -171,39 +171,182 @@ public void testOpenFileShorterLength() throws Throwable {
   }
 
   @Test
-  public void testOpenFileLongerLength() throws Throwable {
-    // do a second read with the length declared as longer
+  public void testOpenFileLongerLengthReadFully() throws Throwable {
+    // do a read with the length declared as longer
     // than it is.
     // An EOF will be read on readFully(), -1 on a read()
 
+    final int extra = 10;
+    long longLen = fileLength + extra;
+
+
+    // assert behaviors of seeking/reading past the file length.
+    // there is no attempt at recovery.
+    verifyMetrics(() -> {
+      try (FSDataInputStream in = openFile(longLen,
+          FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
+        byte[] out = new byte[(int) (longLen)];
+        intercept(EOFException.class, () -> {
+          in.readFully(0, out);
+          return in;
+        });
+        in.seek(longLen - 1);
+        assertEquals("read past real EOF on " + in, -1, in.read());
+        return in.toString();
+      }
+    },
+        always(NO_HEAD_OR_LIST),
+        // two GET calls were made: one for the readFully(),
+        // the second on the read() past the EOF.
+        // the operation has got as far as S3.
+        probe(true, STREAM_READ_OPENED, 1 + 1));
+
+    // now on a new stream, try a full read from after the EOF
+    verifyMetrics(() -> {
+      try (FSDataInputStream in =
+               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)) {
+        byte[] out = new byte[extra];
+        intercept(EOFException.class, () -> in.readFully(fileLength, out));
+        return in.toString();
+      }
+    },
+        // a single GET call is made for the readFully();
+        // the operation has got as far as S3.
+        with(STREAM_READ_OPENED, 1));
+  }
+
+  /**
+   * Open a file.
+   * @param longLen length to declare
+   * @param policy read policy
+   * @return file handle
+   */
+  private FSDataInputStream openFile(final long longLen, String policy)
+      throws Exception {
     S3AFileSystem fs = getFileSystem();
     // set a length past the actual file length
-    long longLen = fileLength + 10;
-    FSDataInputStream in3 = verifyMetrics(() ->
+    return verifyMetrics(() ->
             fs.openFile(testFile)
-                .must(FS_OPTION_OPENFILE_READ_POLICY,
-                    FS_OPTION_OPENFILE_READ_POLICY_SEQUENTIAL)
+                .must(FS_OPTION_OPENFILE_READ_POLICY, policy)
                 .mustLong(FS_OPTION_OPENFILE_LENGTH, longLen)
                 .build()
                 .get(),
         always(NO_HEAD_OR_LIST));
+  }
+
+  /**
+   * Open a file with a length declared as longer than the actual file length.
+   * Validate input stream.read() semantics.
+   */
+  @Test
+  public void testReadPastEOF() throws Throwable {
+
+    // set a length past the actual file length
+    describe("read() up to the end of the real file");
+
+    final int extra = 10;
+    int longLen = fileLength + extra;
+    try (FSDataInputStream in = openFile(longLen,
+        FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+      for (int i = 0; i < fileLength; i++) {
+        Assertions.assertThat(in.read())
+            .describedAs("read() at %d from stream %s", i, in)
+            .isEqualTo(TEXT.charAt(i));
+      }
+      LOG.info("Statistics after EOF {}", 
ioStatisticsToPrettyString(in.getIOStatistics()));
+    }
+
+    // now open and read after the EOF; this is
+    // expected to return -1 on each read; there's a GET per call.
+    // as the counters are updated on close(), the stream must be closed
+    // within the verification clause.
+    // note how there's no attempt to alter the expected file length;
+    // instead the call always goes to S3.
+    // there's no information in the exception from the SDK.
+    describe("reading past the end of the file");
 
-    // assert behaviors of seeking/reading past the file length.
-    // there is no attempt at recovery.
     verifyMetrics(() -> {
-      byte[] out = new byte[(int) longLen];
-      intercept(EOFException.class,
-          () -> in3.readFully(0, out));
-      in3.seek(longLen - 1);
-      assertEquals("read past real EOF on " + in3,
-          -1, in3.read());
-      in3.close();
-      return in3.toString();
+      try (FSDataInputStream in =
+               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+        for (int i = 0; i < extra; i++) {
+          final int p = fileLength + i;
+          in.seek(p);
+          Assertions.assertThat(in.read())
+              .describedAs("read() at %d", p)
+              .isEqualTo(-1);
+        }
+        LOG.info("Statistics after EOF {}", 
ioStatisticsToPrettyString(in.getIOStatistics()));
+        return in.toString();
+      }
     },
-        // two GET calls were made, one for readFully,
-        // the second on the read() past the EOF
-        // the operation has got as far as S3
-        with(STREAM_READ_OPENED, 2));
+        always(NO_HEAD_OR_LIST),
+        probe(Statistic.ACTION_HTTP_GET_REQUEST, extra));
+  }
 
+  /**
+   * Test {@code PositionedReadable.readFully()} past EOF in a file.
+   */
+  @Test
+  public void testPositionedReadableReadFullyPastEOF() throws Throwable {
+    // now, the next corner case: do a readFully() of more bytes than the file length.
+    // we expect failure.
+    // this codepath does a GET to the end of the (expected) file length, and when
+    // that GET returns -1 from the read because fewer bytes were returned than
+    // expected, the readFully call fails.
+    describe("PositionedReadable.readFully() past the end of the file");
+    // set a length past the actual file length
+    final int extra = 10;
+    int longLen = fileLength + extra;
+    verifyMetrics(() -> {
+      try (FSDataInputStream in =
+               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+        byte[] buf = new byte[(int) (longLen + 1)];
+        // readFully will fail
+        intercept(EOFException.class, () -> {
+          in.readFully(0, buf);
+          return in;
+        });
+        return "readFully past EOF with statistics"
+            + ioStatisticsToPrettyString(in.getIOStatistics());
+      }
+    },
+        always(NO_HEAD_OR_LIST),
+        probe(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
+  }
+
+  /**
+   * Test {@code PositionedReadable.read()} past EOF in a file.
+   */
+  @Test
+  public void testPositionedReadableReadPastEOF() throws Throwable {
+
+    // set a length past the actual file length
+    final int extra = 10;
+    int longLen = fileLength + extra;
+
+    describe("PositionedReadable.read() past the end of the file");
+    verifyMetrics(() -> {
+      try (FSDataInputStream in =
+               openFile(longLen, FS_OPTION_OPENFILE_READ_POLICY_RANDOM)) {
+        byte[] buf = new byte[(int) (longLen + 1)];
+
+        // the positioned read() will read up to the end of the file
+        Assertions.assertThat(in.read(0, buf, 0, buf.length))
+            .isEqualTo(fileLength);
+
+        // now attempt to read after EOF
+        Assertions.assertThat(in.read(fileLength, buf, 0, buf.length))
+            .describedAs("PositionedReadable.read() past EOF")
+            .isEqualTo(-1);
+        // stream is closed as part of this failure
+
+        return "PositionedReadable.read()) past EOF with " + in;
+      }
+    },
+        always(NO_HEAD_OR_LIST),
+        probe(Statistic.ACTION_HTTP_GET_REQUEST, 1)); // no attempt to re-open
   }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
