This is an automated email from the ASF dual-hosted git repository. stevel pushed a commit to branch branch-3.3.5 in repository https://gitbox.apache.org/repos/asf/hadoop.git
commit 2aa77a75f9ee59bdff0d03fc1eb65eaab72e353b Author: Mukund Thakur <mtha...@cloudera.com> AuthorDate: Mon Oct 10 15:47:45 2022 +0530 HADOOP-18460. checkIfVectoredIOStopped before populating the buffers (#4986) Contributed by Mukund Thakur --- .../org/apache/hadoop/fs/s3a/S3AInputStream.java | 43 ++++++++++++++-------- 1 file changed, 28 insertions(+), 15 deletions(-) diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java index 39d41f5ffd2..be5b1799b35 100644 --- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java +++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java @@ -910,21 +910,15 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange, IntFunction<ByteBuffer> allocate) { LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, pathStr); - // This reference is must be kept till all buffers are populated as this is a + // This reference must be kept till all buffers are populated as this is a // finalizable object which closes the internal stream when gc triggers. 
S3Object objectRange = null; S3ObjectInputStream objectContent = null; try { - checkIfVectoredIOStopped(); - final String operationName = "readCombinedFileRange"; - objectRange = getS3Object(operationName, + objectRange = getS3ObjectAndValidateNotNull("readCombinedFileRange", combinedFileRange.getOffset(), combinedFileRange.getLength()); objectContent = objectRange.getObjectContent(); - if (objectContent == null) { - throw new PathIOException(uri, - "Null IO stream received during " + operationName); - } populateChildBuffers(combinedFileRange, objectContent, allocate); } catch (Exception ex) { LOG.debug("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex); @@ -1019,19 +1013,15 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, */ private void readSingleRange(FileRange range, ByteBuffer buffer) { LOG.debug("Start reading range {} from path {} ", range, pathStr); + // This reference must be kept till all buffers are populated as this is a + // finalizable object which closes the internal stream when gc triggers. S3Object objectRange = null; S3ObjectInputStream objectContent = null; try { - checkIfVectoredIOStopped(); long position = range.getOffset(); int length = range.getLength(); - final String operationName = "readRange"; - objectRange = getS3Object(operationName, position, length); + objectRange = getS3ObjectAndValidateNotNull("readSingleRange", position, length); objectContent = objectRange.getObjectContent(); - if (objectContent == null) { - throw new PathIOException(uri, - "Null IO stream received during " + operationName); - } populateBuffer(length, buffer, objectContent); range.getData().complete(buffer); } catch (Exception ex) { @@ -1043,6 +1033,29 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead, LOG.debug("Finished reading range {} from path {} ", range, pathStr); } + /** + * Get the S3 object from the S3 server for a specified range. 
+ * Also checks if the vectored io operation has been stopped before and after + * the http get request such that we don't waste time populating the buffers. + * @param operationName name of the operation for which get object on S3 is called. + * @param position position of the object to be read from S3. + * @param length length from position of the object to be read from S3. + * @return result s3 object. + * @throws IOException exception if any. + */ + private S3Object getS3ObjectAndValidateNotNull(final String operationName, + final long position, + final int length) throws IOException { + checkIfVectoredIOStopped(); + S3Object objectRange = getS3Object(operationName, position, length); + if (objectRange.getObjectContent() == null) { + throw new PathIOException(uri, + "Null IO stream received during " + operationName); + } + checkIfVectoredIOStopped(); + return objectRange; + } + /** * Populates the buffer with data from objectContent * till length. Handles both direct and heap byte buffers. --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org