[ 
https://issues.apache.org/jira/browse/HADOOP-19221?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17869344#comment-17869344
 ] 

ASF GitHub Bot commented on HADOOP-19221:
-----------------------------------------

steveloughran commented on code in PR #6938:
URL: https://github.com/apache/hadoop/pull/6938#discussion_r1695148971


##########
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/UploadContentProviders.java:
##########
@@ -0,0 +1,396 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a.impl;
+
+import java.io.BufferedInputStream;
+import java.io.ByteArrayInputStream;
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.http.ContentStreamProvider;
+
+import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.fs.store.ByteBufferInputStream;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
+import static org.apache.hadoop.util.Preconditions.checkArgument;
+import static 
org.apache.hadoop.util.functional.FunctionalIO.uncheckIOExceptions;
+
+/**
+ * Implementations of {@code 
software.amazon.awssdk.http.ContentStreamProvider}.
+ * <p>
+ * These are required to ensure that retry of multipart uploads are reliable,
+ * while also avoiding memory copy/consumption overhead.
+ * <p>
+ * For these reasons the providers built in to the AWS SDK are not used.
+ * <p>
+ * See HADOOP-19221 for details.
+ */
+public final class UploadContentProviders {
+
+  public static final Logger LOG = 
LoggerFactory.getLogger(UploadContentProviders.class);
+
+  private UploadContentProviders() {
+  }
+
+  /**
+   * Create a content provider from a file.
+   * @param file file to read.
+   * @param offset offset in file.
+   * @param size of data.
+   * @return the provider
+   * @throws IllegalArgumentException if the offset is negative.
+   */
+  public static BaseContentProvider<BufferedInputStream> fileContentProvider(
+      File file,
+      long offset,
+      final int size) {
+
+    return new FileWithOffsetContentProvider(file, offset, size);
+  }
+
+  /**
+   * Create a content provider from a byte buffer.
+   * The buffer is not copied and MUST NOT be modified while
+   * the upload is taking place.
+   * @param byteBuffer buffer to read.
+   * @param size size of the data.
+   * @return the provider
+   * @throws IllegalArgumentException if the arguments are invalid.
+   * @throws NullPointerException if the buffer is null
+   */
+  public static BaseContentProvider<ByteBufferInputStream> 
byteBufferContentProvider(
+      final ByteBuffer byteBuffer,
+      final int size) {
+
+    return new ByteBufferContentProvider(byteBuffer, size);
+  }
+
+  /**
+   * Create a content provider for all or part of a byte array.
+   * The buffer is not copied and MUST NOT be modified while
+   * the upload is taking place.
+   * @param bytes buffer to read.
+   * @param offset offset in buffer.
+   * @param size size of the data.
+   * @return the provider
+   * @throws IllegalArgumentException if the arguments are invalid.
+   * @throws NullPointerException if the buffer is null.
+   */
+  public static BaseContentProvider<ByteArrayInputStream> 
byteArrayContentProvider(
+      final byte[] bytes, final int offset, final int size) {
+    return new ByteArrayContentProvider(bytes, offset, size);
+  }
+
+  /**
+   * Create a content provider for all of a byte array.
+   * @param bytes buffer to read.
+   * @return the provider
+   * @throws IllegalArgumentException if the arguments are invalid.
+   * @throws NullPointerException if the buffer is null.
+   */
+  public static BaseContentProvider<ByteArrayInputStream> 
byteArrayContentProvider(
+      final byte[] bytes) {
+    return byteArrayContentProvider(bytes, 0, bytes.length);
+  }
+
+  /**
+   * Base class for content providers; tracks the number of times a stream
+   * has been opened.
+   * @param <T> type of stream created.
+   */
+  @VisibleForTesting
+  public static abstract class BaseContentProvider<T extends InputStream>
+      implements ContentStreamProvider, Closeable {
+
+    /**
+     * Size of the data.
+     */
+    private final int size;
+
+    /**
+     * How many times has a stream been created?
+     */
+    private int streamCreationCount;
+
+    /**
+     * Current stream. Null if not opened yet.
+     * When {@link #newStream()} is called, this is set to the new value,
+     * Note: when the input stream itself is closed, this reference is not 
updated.
+     * Therefore this field not being null does not imply that the stream is 
open.
+     */
+    private T currentStream;
+
+    /**
+     * Constructor.
+     * @param size size of the data. Must be non-negative.
+     */
+    protected BaseContentProvider(final int size) {
+      checkArgument(size >= 0, "size is negative: %s", size);
+      this.size = size;
+    }
+
+    /**
+     * Close the current stream.
+     */
+    @Override
+    public void close() {
+      cleanupWithLogger(LOG, getCurrentStream());
+      setCurrentStream(null);
+    }
+
+    /**
+     * Note that a stream was created.
+     * <p>
+     * Logs if this is a subsequent event as it implies a failure of the first 
attempt.
+     * @return the new stream
+     */
+    @Override
+    public final InputStream newStream() {
+      close();
+      streamCreationCount++;
+      if (streamCreationCount > 1) {
+        LOG.info("Stream created more than once: {}", this);
+      }
+      return setCurrentStream(createNewStream());
+    }
+
+    /**
+     * Override point for subclasses to create their new streams.
+     * @return a stream
+     */
+    protected abstract T createNewStream();
+
+    /**
+     * How many times has a stream been created?
+     * @return stream creation count
+     */
+    public int getStreamCreationCount() {
+      return streamCreationCount;
+    }
+
+    /**
+     * Size as set by constructor parameter.
+     * @return size of the data
+     */
+    public int getSize() {
+      return size;
+    }
+
+    /**
+     * Current stream.
+     * When {@link #newStream()} is called, this is set to the new value,
+     * after closing the previous one.
+     * <p>
+     * Why? The AWS SDK implementations do this, so there
+     * is an implication that it is needed to avoid keeping streams
+     * open on retries.
+     * @return the current stream, or null if none is open.
+     */
+    protected T getCurrentStream() {
+      return currentStream;
+    }
+
+    /**
+     * Set the current stream.
+     * @param stream the new stream
+     * @return the current stream.
+     */
+    protected T setCurrentStream(T stream) {
+      this.currentStream = stream;
+      return stream;
+    }
+
+    @Override
+    public String toString() {
+      return "BaseContentProvider{" +
+          "size=" + size +
+          ", streamCreationCount=" + streamCreationCount +
+          ", currentStream=" + currentStream +
+          '}';
+    }
+  }
+
+  /**
+   * Content provider for a file with an offset.
+   */
+  private static final class FileWithOffsetContentProvider
+      extends BaseContentProvider<BufferedInputStream> {
+
+    /**
+     * File to read.
+     */
+    private final File file;
+
+    /**
+     * Offset in file.
+     */
+    private final long offset;
+
+    /**
+     * Constructor.
+     * @param file file to read.
+     * @param offset offset in file.
+     * @param size of data.
+     * @throws IllegalArgumentException if the offset is negative.
+     */
+    private FileWithOffsetContentProvider(final File file,
+        final long offset,
+        final int size) {
+
+      super(size);
+      this.file = requireNonNull(file);
+      checkArgument(offset >= 0, "Offset is negative: %s", offset);
+      this.offset = offset;
+    }
+
+    /**
+     * Create a new stream.
+     * @return a stream at the start of the offset in the file
+     * @throws UncheckedIOException on IO failure.
+     */
+    @Override
+    protected BufferedInputStream createNewStream() throws 
UncheckedIOException {
+      // create the stream, seek to the offset.
+      final FileInputStream fis = uncheckIOExceptions(() -> {
+        final FileInputStream f = new FileInputStream(file);
+        f.getChannel().position(offset);
+        return f;
+      });
+      return setCurrentStream(new BufferedInputStream(fis));
+    }
+
+    @Override
+    public String toString() {
+      return "FileWithOffsetContentProvider{" +
+          "file=" + file +
+          ", offset=" + offset +
+          "} " + super.toString();
+    }
+
+  }
+
+  /**
+   * Create a content provider for a byte buffer.
+   * Uses {@link ByteBufferInputStream} to read the data.
+   */
+  private static final class ByteBufferContentProvider
+      extends BaseContentProvider<ByteBufferInputStream> {
+
+    /**
+     * The buffer which will be read; on or off heap.
+     */
+    private final ByteBuffer blockBuffer;
+
+    /**
+     * The position in the buffer at the time the provider was created.
+     */
+    private final int initialPosition;
+
+    /**
+     * Constructor.
+     * @param blockBuffer buffer to read.
+     * @param size size of the data.
+     * @throws IllegalArgumentException if the arguments are invalid.
+     * @throws NullPointerException if the buffer is null
+     */
+    private ByteBufferContentProvider(final ByteBuffer blockBuffer, int size) {
+      super(size);
+      this.blockBuffer = blockBuffer;
+      this.initialPosition = blockBuffer.position();
+    }
+
+    @Override
+    protected ByteBufferInputStream createNewStream() {
+      // set the buffer up from reading from the beginning
+      blockBuffer.limit(initialPosition);

Review Comment:
   We want to start reading from the initial position every time the stream is 
opened.





> S3A: Unable to recover from failure of multipart block upload attempt "Status 
> Code: 400; Error Code: RequestTimeout"
> --------------------------------------------------------------------------------------------------------------------
>
>                 Key: HADOOP-19221
>                 URL: https://issues.apache.org/jira/browse/HADOOP-19221
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 3.4.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Major
>              Labels: pull-request-available
>
> If a multipart PUT request fails for some reason (e.g. network error) then 
> all subsequent retry attempts fail with a 400 Response and ErrorCode 
> RequestTimeout.
> {code}
> Your socket connection to the server was not read from or written to within 
> the timeout period. Idle connections will be closed. (Service: Amazon S3; 
> Status Code: 400; Error Code: RequestTimeout; Request ID:; S3 Extended 
> Request ID:
> {code}
> The list of suppressed exceptions contains the root cause (the initial 
> failure was a 500); all retries failed to upload properly from the source 
> input stream {{RequestBody.fromInputStream(fileStream, size)}}.
> Hypothesis: the mark/reset stuff doesn't work for input streams. On the v1 
> SDK we would build a multipart block upload request passing in (file, offset, 
> length); the way we are now doing this doesn't recover.
> Probably fixable by providing our own {{ContentStreamProvider}} 
> implementations for:
> # file + offset + length
> # bytebuffer
> # byte array
> The SDK does have explicit support for the memory ones, but they copy the 
> data blocks first. We don't want that as it would double the memory 
> requirements of active blocks.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

---------------------------------------------------------------------
To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-issues-h...@hadoop.apache.org

Reply via email to