[ https://issues.apache.org/jira/browse/HADOOP-13560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15550299#comment-15550299 ]

ASF GitHub Bot commented on HADOOP-13560:
-----------------------------------------

Github user cnauroth commented on a diff in the pull request:

    https://github.com/apache/hadoop/pull/130#discussion_r82091762
  
    --- Diff: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java ---
    @@ -0,0 +1,699 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.hadoop.fs.s3a;
    +
    +import java.io.IOException;
    +import java.io.InputStream;
    +import java.io.OutputStream;
    +import java.util.ArrayList;
    +import java.util.List;
    +import java.util.concurrent.Callable;
    +import java.util.concurrent.ExecutionException;
    +import java.util.concurrent.ExecutorService;
    +import java.util.concurrent.TimeUnit;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import com.amazonaws.AmazonClientException;
    +import com.amazonaws.event.ProgressEvent;
    +import com.amazonaws.event.ProgressEventType;
    +import com.amazonaws.event.ProgressListener;
    +import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
    +import com.amazonaws.services.s3.model.PartETag;
    +import com.amazonaws.services.s3.model.PutObjectRequest;
    +import com.amazonaws.services.s3.model.PutObjectResult;
    +import com.amazonaws.services.s3.model.UploadPartRequest;
    +import com.google.common.base.Preconditions;
    +import com.google.common.util.concurrent.Futures;
    +import com.google.common.util.concurrent.ListenableFuture;
    +import com.google.common.util.concurrent.ListeningExecutorService;
    +import com.google.common.util.concurrent.MoreExecutors;
    +import org.slf4j.Logger;
    +import org.slf4j.LoggerFactory;
    +
    +import org.apache.hadoop.classification.InterfaceAudience;
    +import org.apache.hadoop.classification.InterfaceStability;
    +import org.apache.hadoop.io.IOUtils;
    +import org.apache.hadoop.io.retry.RetryPolicies;
    +import org.apache.hadoop.io.retry.RetryPolicy;
    +import org.apache.hadoop.util.Progressable;
    +
    +import static org.apache.hadoop.fs.s3a.S3AUtils.*;
    +import static org.apache.hadoop.fs.s3a.Statistic.*;
    +
    +/**
    + * Upload files/parts directly via different buffering mechanisms,
    + * including memory and disk.
    + *
    + * If the stream is closed and no update has started, then the upload
    + * is instead done as a single PUT operation.
    + *
    + * Unstable: statistics and error handling might evolve.
    + */
    +@InterfaceAudience.Private
    +@InterfaceStability.Unstable
    +class S3ABlockOutputStream extends OutputStream {
    +
    +  private static final Logger LOG =
    +      LoggerFactory.getLogger(S3ABlockOutputStream.class);
    +
    +  /** Owner FileSystem. */
    +  private final S3AFileSystem fs;
    +
    +  /** Object being uploaded. */
    +  private final String key;
    +
    +  /** Size of all blocks. */
    +  private final int blockSize;
    +
    +  /** Callback for progress. */
    +  private final ProgressListener progressListener;
    +  private final ListeningExecutorService executorService;
    +
    +  /**
    +   * Retry policy for multipart commits; not all AWS SDK versions retry that.
    +   */
    +  private final RetryPolicy retryPolicy =
    +      RetryPolicies.retryUpToMaximumCountWithProportionalSleep(
    +          5,
    +          2000,
    +          TimeUnit.MILLISECONDS);
    +  /**
    +   * Factory for blocks.
    +   */
    +  private final S3ADataBlocks.BlockFactory blockFactory;
    +
    +  /** Preallocated byte buffer for writing single characters. */
    +  private final byte[] singleCharWrite = new byte[1];
    +
    +  /** Multipart upload details; null means none started. */
    +  private MultiPartUpload multiPartUpload;
    +
    +  /** Closed flag. */
    +  private final AtomicBoolean closed = new AtomicBoolean(false);
    +
    +  /** Current data block. Null means none currently active */
    +  private S3ADataBlocks.DataBlock activeBlock;
    +
    +  /** Count of blocks uploaded. */
    +  private long blockCount = 0;
    +
    +  /** Statistics to build up. */
    +  private final S3AInstrumentation.OutputStreamStatistics statistics;
    +
    +  /**
    +   * Write operation helper; encapsulation of the filesystem operations.
    +   */
    +  private final S3AFileSystem.WriteOperationHelper writeOperationHelper;
    +
    +  /**
    +   * An S3A output stream which uploads partitions in a separate pool of
    +   * threads; different {@link S3ADataBlocks.BlockFactory}
    +   * instances can control where data is buffered.
    +   *
    +   * @param fs S3AFilesystem
    +   * @param key S3 object to work on.
    +   * @param executorService the executor service to use to schedule work
    +   * @param progress report progress in order to prevent timeouts. If
    +   * this class implements {@code ProgressListener} then it will be
    +   * directly wired up to the AWS client, so receive detailed progress
    +   * information.
    +   * @param blockSize size of a single block.
    +   * @param blockFactory factory for creating stream destinations
    +   * @param statistics stats for this stream
    +   * @param writeOperationHelper state of the write operation.
    +   * @throws IOException on any problem
    +   */
    +  S3ABlockOutputStream(S3AFileSystem fs,
    +      String key,
    +      ExecutorService executorService,
    +      Progressable progress,
    +      long blockSize,
    +      S3ADataBlocks.BlockFactory blockFactory,
    +      S3AInstrumentation.OutputStreamStatistics statistics,
    +      S3AFileSystem.WriteOperationHelper writeOperationHelper)
    +      throws IOException {
    +    this.fs = fs;
    +    this.key = key;
    +    this.blockFactory = blockFactory;
    +    this.blockSize = (int) blockSize;
    +    this.statistics = statistics;
    +    this.writeOperationHelper = writeOperationHelper;
    +    Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
    +        "Block size is too small: %d", blockSize);
    +    this.executorService = MoreExecutors.listeningDecorator(executorService);
    +    this.multiPartUpload = null;
    +    this.progressListener = (progress instanceof ProgressListener) ?
    +        (ProgressListener) progress
    +        : new ProgressableListener(progress);
    +    LOG.debug("Initialized S3ABlockOutputStream for {}" +
    --- End diff --
    
    I think `activeBlock` is always `null` when this log statement executes.
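
    A minimal sketch of one possible fix (not from the patch), assuming the
    `S3ADataBlocks.BlockFactory` exposes a `create(int)` method as used later in
    this patch: create the first block before the constructor's debug log runs,
    so `activeBlock` is non-null by the time it is logged.

        // Sketch only -- not part of the PR. Ensure a block exists before the
        // constructor logs it; the create(int) signature is assumed here.
        private synchronized S3ADataBlocks.DataBlock createBlockIfNeeded()
            throws IOException {
          if (activeBlock == null) {
            activeBlock = blockFactory.create(blockSize);
          }
          return activeBlock;
        }

        // ...and at the end of the constructor, before the LOG.debug(...) call:
        createBlockIfNeeded();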


> S3ABlockOutputStream to support huge (many GB) file writes
> ----------------------------------------------------------
>
>                 Key: HADOOP-13560
>                 URL: https://issues.apache.org/jira/browse/HADOOP-13560
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 2.9.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>            Priority: Minor
>         Attachments: HADOOP-13560-branch-2-001.patch, 
> HADOOP-13560-branch-2-002.patch, HADOOP-13560-branch-2-003.patch, 
> HADOOP-13560-branch-2-004.patch
>
>
> An AWS SDK [issue|https://github.com/aws/aws-sdk-java/issues/367] highlights 
> that metadata isn't copied on large copies.
> 1. Add a test to do that large copy/rename and verify that the copy really 
> works.
> 2. Verify that metadata makes it over.
> Verifying large file rename is important on its own, as it is needed for very 
> large commit operations for committers using rename.
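
A rough sketch of the kind of test items 1 and 2 ask for, using only the public
FileSystem API. The class context and the getFileSystem()/path() helpers are
assumed test-harness methods, and the metadata comparison is only outlined,
since the issue doesn't say how metadata should be read back.

    import static org.junit.Assert.assertEquals;
    import static org.junit.Assert.assertTrue;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    // Sketch only: write a file large enough that the rename must use a
    // multipart copy (S3 caps single-operation copies at 5 GB), rename it,
    // and verify both length and metadata on the destination.
    public void testHugeFileRenamePreservesData() throws Exception {
      FileSystem fs = getFileSystem();            // assumed harness helper
      Path src = path("hugefile");                // assumed harness helper
      Path dest = path("hugefileRenamed");
      long len = 6L * 1024 * 1024 * 1024;         // 6 GB, above the 5 GB copy limit

      // write the file in 1 MB blocks rather than holding it all in memory
      byte[] block = new byte[1024 * 1024];
      try (FSDataOutputStream out = fs.create(src, true)) {
        for (long written = 0; written < len; written += block.length) {
          out.write(block);
        }
      }
      assertEquals(len, fs.getFileStatus(src).getLen());

      assertTrue("rename failed", fs.rename(src, dest));

      // 1. the copy really worked
      assertEquals(len, fs.getFileStatus(dest).getLen());
      // 2. metadata made it over: compare the object headers/user metadata of
      //    dest against whatever was set on src (not shown -- depends on how
      //    the test chooses to read object metadata back).
    }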



