[ https://issues.apache.org/jira/browse/HADOOP-13560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15575312#comment-15575312 ]
ASF GitHub Bot commented on HADOOP-13560:
-----------------------------------------

Github user thodemoor commented on a diff in the pull request:

    https://github.com/apache/hadoop/pull/130#discussion_r83419562

--- Diff: hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java ---
@@ -0,0 +1,699 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3a;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.event.ProgressEvent;
+import com.amazonaws.event.ProgressEventType;
+import com.amazonaws.event.ProgressListener;
+import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
+import com.amazonaws.services.s3.model.PartETag;
+import com.amazonaws.services.s3.model.PutObjectRequest;
+import com.amazonaws.services.s3.model.PutObjectResult;
+import com.amazonaws.services.s3.model.UploadPartRequest;
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.retry.RetryPolicies;
+import org.apache.hadoop.io.retry.RetryPolicy;
+import org.apache.hadoop.util.Progressable;
+
+import static org.apache.hadoop.fs.s3a.S3AUtils.*;
+import static org.apache.hadoop.fs.s3a.Statistic.*;
+
+/**
+ * Upload files/parts directly via different buffering mechanisms,
+ * including memory and disk.
+ *
+ * If the stream is closed and no update has started, then the upload
+ * is instead done as a single PUT operation.
+ *
+ * Unstable: statistics and error handling might evolve.
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+class S3ABlockOutputStream extends OutputStream {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(S3ABlockOutputStream.class);
+
+  /** Owner FileSystem. */
+  private final S3AFileSystem fs;
+
+  /** Object being uploaded. */
+  private final String key;
+
+  /** Size of all blocks. */
+  private final int blockSize;
+
+  /** Callback for progress. */
+  private final ProgressListener progressListener;
+  private final ListeningExecutorService executorService;
+
+  /**
+   * Retry policy for multipart commits; not all AWS SDK versions retry that.
+   */
+  private final RetryPolicy retryPolicy =
+      RetryPolicies.retryUpToMaximumCountWithProportionalSleep(
+          5,
+          2000,
+          TimeUnit.MILLISECONDS);
+
+  /**
+   * Factory for blocks.
+   */
+  private final S3ADataBlocks.BlockFactory blockFactory;
+
+  /** Preallocated byte buffer for writing single characters. */
+  private final byte[] singleCharWrite = new byte[1];
+
+  /** Multipart upload details; null means none started. */
+  private MultiPartUpload multiPartUpload;
+
+  /** Closed flag. */
+  private final AtomicBoolean closed = new AtomicBoolean(false);
+
+  /** Current data block. Null means none currently active. */
+  private S3ADataBlocks.DataBlock activeBlock;
+
+  /** Count of blocks uploaded. */
+  private long blockCount = 0;
+
+  /** Statistics to build up. */
+  private final S3AInstrumentation.OutputStreamStatistics statistics;
+
+  /**
+   * Write operation helper; encapsulation of the filesystem operations.
+   */
+  private final S3AFileSystem.WriteOperationHelper writeOperationHelper;
+
+  /**
+   * An S3A output stream which uploads partitions in a separate pool of
+   * threads; different {@link S3ADataBlocks.BlockFactory}
+   * instances can control where data is buffered.
+   *
+   * @param fs S3AFilesystem
+   * @param key S3 object to work on.
+   * @param executorService the executor service to use to schedule work
+   * @param progress report progress in order to prevent timeouts. If
+   * this object implements {@code ProgressListener} then it will be
+   * directly wired up to the AWS client, so receive detailed progress
+   * information.
+   * @param blockSize size of a single block.
+   * @param blockFactory factory for creating stream destinations
+   * @param statistics stats for this stream
+   * @param writeOperationHelper state of the write operation.
+   * @throws IOException on any problem
+   */
+  S3ABlockOutputStream(S3AFileSystem fs,
+      String key,
+      ExecutorService executorService,
+      Progressable progress,
+      long blockSize,
+      S3ADataBlocks.BlockFactory blockFactory,
+      S3AInstrumentation.OutputStreamStatistics statistics,
+      S3AFileSystem.WriteOperationHelper writeOperationHelper)
+      throws IOException {
+    this.fs = fs;
+    this.key = key;
+    this.blockFactory = blockFactory;
+    this.blockSize = (int) blockSize;
+    this.statistics = statistics;
+    this.writeOperationHelper = writeOperationHelper;
+    Preconditions.checkArgument(blockSize >= Constants.MULTIPART_MIN_SIZE,
+        "Block size is too small: %d", blockSize);
+    this.executorService = MoreExecutors.listeningDecorator(executorService);
+    this.multiPartUpload = null;
+    this.progressListener = (progress instanceof ProgressListener) ?
+        (ProgressListener) progress
+        : new ProgressableListener(progress);
+    // create that first block. This guarantees that an open + close sequence
+    // writes a 0-byte entry.
+    createBlockIfNeeded();
+    LOG.debug("Initialized S3ABlockOutputStream for {}" +
+        " output to {}", writeOperationHelper, activeBlock);
+  }
+
+  /**
+   * Demand create a destination block.
+   * @return the active block; null if there isn't one.
+   * @throws IOException on any failure to create
+   */
+  private synchronized S3ADataBlocks.DataBlock createBlockIfNeeded()
+      throws IOException {
+    if (activeBlock == null) {
+      blockCount++;
+      activeBlock = blockFactory.create(this.blockSize);
+    }
+    return activeBlock;
+  }
+
+  /**
+   * Synchronized accessor to the active block.
+   * @return the active block; null if there isn't one.
+   */
+  private synchronized S3ADataBlocks.DataBlock getActiveBlock() {
+    return activeBlock;
+  }
+
+  /**
+   * Predicate to query whether or not there is an active block.
+   * @return true if there is an active block.
+   */
+  private synchronized boolean hasActiveBlock() {
+    return activeBlock != null;
+  }
+
+  /**
+   * Clear the active block.
+   */
+  private void clearActiveBlock() {
+    LOG.debug("Clearing active block");
+    synchronized (this) {
+      activeBlock = null;
+    }
+  }
+
+  /**
+   * Check for the filesystem being open.
+   * @throws IOException if the filesystem is closed.
+   */
+  void checkOpen() throws IOException {
+    if (closed.get()) {
+      throw new IOException("Filesystem " + writeOperationHelper + " closed");
+    }
+  }
+
+  /**
+   * The flush operation does not trigger an upload; that awaits
+   * the next block being full. What it does do is call {@code flush()}
+   * on the current block, leaving it to choose how to react.
+   * @throws IOException Any IO problem.
+   */
+  @Override
+  public synchronized void flush() throws IOException {
+    checkOpen();
+    S3ADataBlocks.DataBlock dataBlock = getActiveBlock();
+    if (dataBlock != null) {
+      dataBlock.flush();
+    }
+  }
+
+  /**
+   * Writes a byte to the destination. If this causes the buffer to reach
+   * its limit, the actual upload is submitted to the threadpool.
+   * @param b the int of which the lowest byte is written
+   * @throws IOException on any problem
+   */
+  @Override
+  public synchronized void write(int b) throws IOException {
+    singleCharWrite[0] = (byte) b;
+    write(singleCharWrite, 0, 1);
+  }
+
+  /**
+   * Writes a range of bytes to the memory buffer. If this causes the
+   * buffer to reach its limit, the actual upload is submitted to the
+   * threadpool and the remainder of the array is written to memory
+   * (recursively).
+   * @param source byte array containing data to write
+   * @param offset offset in array where to start
+   * @param len number of bytes to be written
+   * @throws IOException on any problem
+   */
+  @Override
+  public synchronized void write(byte[] source, int offset, int len)
+      throws IOException {
+
+    S3ADataBlocks.validateWriteArgs(source, offset, len);
+    checkOpen();
+    if (len == 0) {
+      return;
+    }
+    S3ADataBlocks.DataBlock block = createBlockIfNeeded();
+    int written = block.write(source, offset, len);
+    int remainingCapacity = block.remainingCapacity();
+    if (written < len) {
+      // not everything was written; the block has run out of capacity.
+      // Trigger an upload then process the remainder.
+      LOG.debug("writing more data than block has capacity - triggering upload");
+      uploadCurrentBlock();
+      // tail recursion is mildly expensive, but given buffer sizes must be MB.
--- End diff --

We can. With the min part size of 5MB you need a 50GB upload to test this, which will take a while against AWS. We can test it cheaply, though of course only against our S3-clone, but at least that will test the log at error level.
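For illustration, the partial-write/recursion branch itself can be exercised without a huge object by issuing one write() that spans several blocks at the minimum part size. A minimal sketch, not part of the patch; the bucket name is a placeholder and the configuration keys assume the fs.s3a.fast.upload / fs.s3a.multipart.size settings this patch builds on:

    import java.util.Arrays;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class MultiBlockWriteSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // enable the block output stream with the minimum (5MB) block size
        conf.setBoolean("fs.s3a.fast.upload", true);
        conf.setLong("fs.s3a.multipart.size", 5 * 1024 * 1024);
        Path path = new Path("s3a://test-bucket/multiblock");  // placeholder bucket
        FileSystem fs = path.getFileSystem(conf);

        // one write of just over two blocks: the first block fills up,
        // uploadCurrentBlock() fires, and the remainder recurses into write()
        byte[] data = new byte[2 * 5 * 1024 * 1024 + 1024];
        Arrays.fill(data, (byte) '1');
        try (FSDataOutputStream out = fs.create(path, true)) {
          out.write(data);
        }
        // a length check confirms the multipart upload completed intact
        if (fs.getFileStatus(path).getLen() != data.length) {
          throw new AssertionError("length mismatch after multipart upload");
        }
      }
    }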
@pieterreuse please add this to our testplan

> S3ABlockOutputStream to support huge (many GB) file writes
> ----------------------------------------------------------
>
>                 Key: HADOOP-13560
>                 URL: https://issues.apache.org/jira/browse/HADOOP-13560
>             Project: Hadoop Common
>          Issue Type: Sub-task
>          Components: fs/s3
>    Affects Versions: 2.9.0
>            Reporter: Steve Loughran
>            Assignee: Steve Loughran
>         Attachments: HADOOP-13560-branch-2-001.patch,
>                      HADOOP-13560-branch-2-002.patch,
>                      HADOOP-13560-branch-2-003.patch,
>                      HADOOP-13560-branch-2-004.patch
>
>
> An AWS SDK [issue|https://github.com/aws/aws-sdk-java/issues/367] highlights
> that metadata isn't copied on large copies.
> 1. Add a test to do that large copy/rename and verify that the copy really
> works.
> 2. Verify that metadata makes it over.
> Verifying large file rename is important on its own, as it is needed for very
> large commit operations for committers using rename.
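To make points 1 and 2 of the description concrete, a test could rename a large object through the FileSystem API and compare its user metadata through a raw S3 client. A sketch under stated assumptions only: the bucket, keys, the pre-existing multi-GB source object, and the use of the SDK's default credential chain are all placeholders, not part of the patch:

    import com.amazonaws.services.s3.AmazonS3Client;
    import com.amazonaws.services.s3.model.ObjectMetadata;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class RenameMetadataSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path src = new Path("s3a://test-bucket/huge/src");  // placeholder paths
        Path dst = new Path("s3a://test-bucket/huge/dst");
        FileSystem fs = src.getFileSystem(conf);

        // assumption: src was already written as a multi-GB object, so the
        // rename below goes down the multipart-copy path
        AmazonS3Client s3 = new AmazonS3Client();  // default credential chain
        ObjectMetadata before = s3.getObjectMetadata("test-bucket", "huge/src");

        if (!fs.rename(src, dst)) {
          throw new AssertionError("rename returned false");
        }

        ObjectMetadata after = s3.getObjectMetadata("test-bucket", "huge/dst");
        // per aws/aws-sdk-java#367, metadata can be lost on large copies
        if (!before.getUserMetadata().equals(after.getUserMetadata())) {
          throw new AssertionError("user metadata not preserved by copy");
        }
      }
    }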