[ https://issues.apache.org/jira/browse/HADOOP-19221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17868828#comment-17868828 ]
ASF GitHub Bot commented on HADOOP-19221:
-----------------------------------------

mukund-thakur commented on code in PR #6938:
URL: https://github.com/apache/hadoop/pull/6938#discussion_r1692281548


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/UploadContentProviders.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.ContentStreamProvider;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.store.ByteBufferInputStream;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+import static org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions;
+
+/**
+ * Implementations of {@code software.amazon.awssdk.http.ContentStreamProvider}.
+ * <p>
+ * These are required to ensure that retries of multipart uploads are reliable,
+ * while also avoiding memory copy/consumption overhead.
+ * <p>
+ * For these reasons the providers built in to the AWS SDK are not used.
+ * <p>
+ * See HADOOP-19221 for details.
+ */
+public final class UploadContentProviders {
+
+  public static final Logger LOG = LoggerFactory.getLogger(UploadContentProviders.class);
+
+  private UploadContentProviders() {
+  }
+
+  /**
+   * Create a content provider from a file.
+   * @param file file to read.
+   * @param offset offset in file.
+   * @param size size of data.
+   * @return the provider
+   * @throws IllegalArgumentException if the offset is negative.
+   */
+  public static BaseContentProvider<BufferedInputStream> fileContentProvider(
+      File file,
+      long offset,
+      final int size) {
+
+    return new FileWithOffsetContentProvider(file, offset, size);
+  }
+
+  /**
+   * Create a content provider from a byte buffer.
+   * The buffer is not copied and MUST NOT be modified while
+   * the upload is taking place.
+   * @param byteBuffer buffer to read.
+   * @param size size of the data.
+   * @return the provider
+   * @throws IllegalArgumentException if the arguments are invalid.
+   * @throws NullPointerException if the buffer is null
+   */
+  public static BaseContentProvider<ByteBufferInputStream> byteBufferContentProvider(
+      final ByteBuffer byteBuffer,
+      final int size) {
+
+    return new ByteBufferContentProvider(byteBuffer, size);
+  }
+
+  /**
+   * Create a content provider for all or part of a byte array.
+   * The buffer is not copied and MUST NOT be modified while
+   * the upload is taking place.
+   * @param bytes buffer to read.
+   * @param offset offset in buffer.
+   * @param size size of the data.
+   * @return the provider
+   * @throws IllegalArgumentException if the arguments are invalid.
+   * @throws NullPointerException if the buffer is null.
+   */
+  public static BaseContentProvider<ByteArrayInputStream> byteArrayContentProvider(
+      final byte[] bytes, final int offset, final int size) {
+    return new ByteArrayContentProvider(bytes, offset, size);
+  }
+
+  /**
+   * Create a content provider for all of a byte array.
+   * @param bytes buffer to read.
+   * @return the provider
+   * @throws IllegalArgumentException if the arguments are invalid.
+   * @throws NullPointerException if the buffer is null.
+   */
+  public static BaseContentProvider<ByteArrayInputStream> byteArrayContentProvider(
+      final byte[] bytes) {
+    return byteArrayContentProvider(bytes, 0, bytes.length);
+  }
+
+  /**
+   * Base class for content providers; tracks the number of times a stream
+   * has been opened.
+   * @param <T> type of stream created.
+   */
+  @VisibleForTesting
+  public static abstract class BaseContentProvider<T extends InputStream>
+      implements ContentStreamProvider, Closeable {
+
+    /**
+     * Size of the data.
+     */
+    private final int size;
+
+    /**
+     * How many times has a stream been created?
+     */
+    private int streamCreationCount;
+
+    /**
+     * Current stream. Null if not opened yet.
+     * When {@link #newStream()} is called, this is set to the new value.
+     * Note: when the input stream itself is closed, this reference is not updated,
+     * so this field not being null does not imply that the stream is open.
+     */
+    private T currentStream;
+
+    /**
+     * Constructor.
+     * @param size size of the data. Must be non-negative.
+     */
+    protected BaseContentProvider(final int size) {
+      checkArgument(size >= 0, "size is negative: %s", size);
+      this.size = size;
+    }
+
+    /**
+     * Close the current stream.
+     */
+    @Override
+    public void close() {
+      cleanupWithLogger(LOG, getCurrentStream());
+      setCurrentStream(null);
+    }
+
+    /**
+     * Note that a stream was created.
+     * <p>
+     * Logs if this is a subsequent event as it implies a failure of the first attempt.
+     * @return the new stream
+     */
+    @Override
+    public final InputStream newStream() {
+      close();
+      streamCreationCount++;
+      if (streamCreationCount > 1) {
+        LOG.info("Stream created more than once: {}", this);
+      }
+      return setCurrentStream(createNewStream());
+    }
+
+    /**
+     * Override point for subclasses to create their new streams.
+     * @return a stream
+     */
+    protected abstract T createNewStream();
+
+    /**
+     * How many times has a stream been created?
+     * @return stream creation count
+     */
+    public int getStreamCreationCount() {
+      return streamCreationCount;
+    }
+
+    /**
+     * Size as set by constructor parameter.
+     * @return size of the data
+     */
+    public int getSize() {
+      return size;
+    }
+
+    /**
+     * Current stream.
+     * When {@link #newStream()} is called, this is set to the new value,
+     * after closing the previous one.
+     * <p>
+     * Why? The AWS SDK implementations do this, so there
+     * is an implication that it is needed to avoid keeping streams
+     * open on retries.
+     * @return the current stream, or null if none is open.
+     */
+    protected T getCurrentStream() {
+      return currentStream;
+    }
+
+    /**
+     * Set the current stream.
+     * @param stream the new stream
+     * @return the current stream.
+     */
+    protected T setCurrentStream(T stream) {
+      this.currentStream = stream;
+      return stream;
+    }
+
+    @Override
+    public String toString() {
+      return "BaseContentProvider{" +
+          "size=" + size +
+          ", streamCreationCount=" + streamCreationCount +
+          ", currentStream=" + currentStream +
+          '}';
+    }
+  }
+
+  /**
+   * Content provider for a file with an offset.
+   */
+  private static final class FileWithOffsetContentProvider
+      extends BaseContentProvider<BufferedInputStream> {
+
+    /**
+     * File to read.
+     */
+    private final File file;
+
+    /**
+     * Offset in file.
+     */
+    private final long offset;
+
+    /**
+     * Constructor.
+     * @param file file to read.
+     * @param offset offset in file.
+     * @param size size of data.
+     * @throws IllegalArgumentException if the offset is negative.
+     */
+    private FileWithOffsetContentProvider(final File file,
+        final long offset,
+        final int size) {
+
+      super(size);
+      this.file = requireNonNull(file);
+      checkArgument(offset >= 0, "Offset is negative: %s", offset);
+      this.offset = offset;
+    }
+
+    /**
+     * Create a new stream.
+     * @return a stream at the start of the offset in the file
+     * @throws UncheckedIOException on IO failure.
+     */
+    @Override
+    protected BufferedInputStream createNewStream() throws UncheckedIOException {
+      // create the stream, seek to the offset.
+      final FileInputStream fis = uncheckIOExceptions(() -> {
+        final FileInputStream f = new FileInputStream(file);
+        f.getChannel().position(offset);
+        return f;
+      });
+      return setCurrentStream(new BufferedInputStream(fis));
+    }
+
+    @Override
+    public String toString() {
+      return "FileWithOffsetContentProvider{" +
+          "file=" + file +
+          ", offset=" + offset +
+          "} " + super.toString();
+    }
+
+  }
+
+  /**
+   * Content provider for a byte buffer.
+   * Uses {@link ByteBufferInputStream} to read the data.
+   */
+  private static final class ByteBufferContentProvider
+      extends BaseContentProvider<ByteBufferInputStream> {
+
+    /**
+     * The buffer which will be read; on or off heap.
+     */
+    private final ByteBuffer blockBuffer;
+
+    /**
+     * The position in the buffer at the time the provider was created.
+     */
+    private final int initialPosition;
+
+    /**
+     * Constructor.
+     * @param blockBuffer buffer to read.
+     * @param size size of the data.
+     * @throws IllegalArgumentException if the arguments are invalid.
+     * @throws NullPointerException if the buffer is null
+     */
+    private ByteBufferContentProvider(final ByteBuffer blockBuffer, int size) {
+      super(size);
+      this.blockBuffer = blockBuffer;
+      this.initialPosition = blockBuffer.position();
+    }
+
+    @Override
+    protected ByteBufferInputStream createNewStream() {
+      // set the buffer up for reading from the beginning
+      blockBuffer.limit(initialPosition);

Review Comment:
   Wondering why setting the limit is important?
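
(Editor's aside, not part of the PR: the sketch below is a minimal, self-contained illustration of the usual `ByteBuffer` position/limit mechanics when a buffer that has just been written to is re-read, which is the pattern the `initialPosition` field and the `limit()` call above appear to follow. The class and variable names are invented for the example, and the rewind to position 0 is assumed to follow the `limit()` call in the full file.)

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

public class ByteBufferRereadSketch {
  public static void main(String[] args) {
    // Write three bytes into a buffer, as a block writer would.
    ByteBuffer buffer = ByteBuffer.allocate(16);
    buffer.put(new byte[] {1, 2, 3});
    int initialPosition = buffer.position();   // 3 == end of the valid data

    // To read the same bytes back (for example on a retried upload), the
    // readable window must stop at the amount written: limit = initialPosition,
    // then rewind. Without the limit() call, remaining() would report the full
    // 16-byte capacity and a reader would see trailing garbage.
    buffer.limit(initialPosition);
    buffer.position(0);

    byte[] copy = new byte[buffer.remaining()];  // remaining() == 3, not 16
    buffer.get(copy);
    System.out.println(Arrays.toString(copy));   // prints [1, 2, 3]
  }
}
```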

##########
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md:
##########
@@ -501,7 +543,43 @@ endpoint and region like the following:
    <value>${sts.region}</value>
 </property>
 ```
+## <a name="500_internal_error"></a> HTTP 500 status code "We encountered an internal error"
+
+```
+We encountered an internal error. Please try again.
+(Service: S3, Status Code: 500, Request ID: <id>, Extended Request ID: <extended-id>)
+```
+
+The [status code 500](https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/500) indicates
+the S3 store has reported an internal problem.
+When raised by Amazon S3, we believe this is a rare sign of a problem within the S3 system
+or another part of the cloud infrastructure on which it depends.
+Retrying _should_ make it go away.
+
+The 500 error is considered retryable by the AWS SDK, which will have already
+tried it `fs.s3a.attempts.maximum` times before reaching the S3A client -which

Review Comment:
   nit: retried?
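
(Editor's aside, not part of the PR: `fs.s3a.attempts.maximum` in the quoted passage is the standard S3A key for the number of attempts the AWS SDK itself makes. A purely illustrative override is shown below; the value is an arbitrary example, not a recommendation, and the default should be checked against the release in use.)

```xml
<!-- Illustrative only: how many attempts the AWS SDK makes on retryable
     failures (such as the 500 above) before the error surfaces to the
     S3A client and its own retry policy. -->
<property>
  <name>fs.s3a.attempts.maximum</name>
  <value>5</value>
</property>
```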

> S3A: Unable to recover from failure of multipart block upload attempt
> "Status Code: 400; Error Code: RequestTimeout"
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: HADOOP-19221
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19221
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.4.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> If a multipart PUT request fails for some reason (e.g. network error) then
> all subsequent retry attempts fail with a 400 response and error code
> RequestTimeout.
> {code}
> Your socket connection to the server was not read from or written to within
> the timeout period. Idle connections will be closed. (Service: Amazon S3;
> Status Code: 400; Error Code: RequestTimeout; Request ID:; S3 Extended
> Request ID:
> {code}
> The list of suppressed exceptions contains the root cause (the initial
> failure was a 500); all retries failed to upload properly from the source
> input stream {{RequestBody.fromInputStream(fileStream, size)}}.
> Hypothesis: the mark/reset stuff doesn't work for input streams. On the v1
> SDK we would build a multipart block upload request passing in (file, offset,
> length); the way we are now doing this doesn't recover.
> Probably fixable by providing our own {{ContentStreamProvider}}
> implementations for:
> # file + offset + length
> # bytebuffer
> # byte array
> The SDK does have explicit support for the memory ones, but they copy the
> data blocks first; we don't want that, as it would double the memory
> requirements of active blocks.
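
(Editor's sketch, not part of the issue text: to make the hypothesis above concrete, a provider of the kind proposed -- one that re-opens the source on every newStream() call instead of relying on mark/reset of a half-consumed stream -- might look roughly like this. The class name and constructor are invented for illustration; the real implementations are those in PR #6938 quoted earlier.)

```java
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;

import software.amazon.awssdk.http.ContentStreamProvider;

/**
 * Illustrative only: serves one (file, offset) block and builds a brand-new
 * stream for every attempt, so an SDK retry never depends on mark/reset
 * support in a stream that was already partially consumed.
 */
class FileSliceContentProvider implements ContentStreamProvider {

  private final File file;
  private final long offset;

  FileSliceContentProvider(File file, long offset) {
    this.file = file;
    this.offset = offset;
  }

  @Override
  public InputStream newStream() {
    try {
      FileInputStream in = new FileInputStream(file);
      in.getChannel().position(offset);   // seek to the start of the block
      return new BufferedInputStream(in);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Such a provider would then be handed to the SDK through something like `RequestBody.fromContentProvider(provider, length, contentType)` rather than `RequestBody.fromInputStream(fileStream, size)`, so every retry re-reads the block from its source.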

--
This message was sent by Atlassian Jira
(v8.20.10#820010)


---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org