HADOOP-15607. AliyunOSS: fix duplicated partNumber issue in AliyunOSSBlockOutputStream. Contributed by Jinhu Wu.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0857f116 Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0857f116 Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0857f116 Branch: refs/heads/YARN-7402 Commit: 0857f116b754d83d3c540cd6f989087af24fef27 Parents: 007e6f5 Author: Sammi Chen <sammi.c...@intel.com> Authored: Mon Jul 30 10:53:44 2018 +0800 Committer: Sammi Chen <sammi.c...@intel.com> Committed: Mon Jul 30 10:53:44 2018 +0800 ---------------------------------------------------------------------- .../aliyun/oss/AliyunOSSBlockOutputStream.java | 59 ++++++++++++-------- .../fs/aliyun/oss/AliyunOSSFileSystemStore.java | 2 + .../oss/TestAliyunOSSBlockOutputStream.java | 12 +++- 3 files changed, 49 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hadoop/blob/0857f116/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java index 12d551b..0a833b2 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java @@ -33,7 +33,9 @@ import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStream; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; @@ -50,7 +52,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream { private boolean closed; private String key; private File blockFile; - private List<File> blockFiles = new ArrayList<>(); + private Map<Integer, File> blockFiles = new HashMap<>(); private long blockSize; private int blockId = 0; private long blockWritten = 0L; @@ -94,8 +96,9 @@ public class AliyunOSSBlockOutputStream extends OutputStream { blockStream.flush(); blockStream.close(); - if (!blockFiles.contains(blockFile)) { - blockFiles.add(blockFile); + if (!blockFiles.values().contains(blockFile)) { + blockId++; + blockFiles.put(blockId, blockFile); } try { @@ -107,7 +110,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream { ListenableFuture<PartETag> partETagFuture = executorService.submit(() -> { PartETag partETag = store.uploadPart(blockFile, key, uploadId, - blockId + 1); + blockId); return partETag; }); partETagsFutures.add(partETagFuture); @@ -120,11 +123,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream { store.completeMultipartUpload(key, uploadId, partETags); } } finally { - for (File tFile: blockFiles) { - if (tFile.exists() && !tFile.delete()) { - LOG.warn("Failed to delete temporary file {}", tFile); - } - } + removePartFiles(); closed = true; } } @@ -141,38 +140,52 @@ public class AliyunOSSBlockOutputStream extends OutputStream { if (closed) { throw new IOException("Stream closed."); } - try { - blockStream.write(b, off, len); - blockWritten += len; - if (blockWritten >= blockSize) { - uploadCurrentPart(); - blockWritten = 0L; + blockStream.write(b, off, len); + blockWritten += len; + if (blockWritten >= blockSize) { + uploadCurrentPart(); + blockWritten = 0L; + } + } + + private void removePartFiles() throws IOException { + for (ListenableFuture<PartETag> partETagFuture : partETagsFutures) { + if (!partETagFuture.isDone()) { + continue; } - } finally { - for (File tFile: blockFiles) { - if (tFile.exists() && !tFile.delete()) { - LOG.warn("Failed to delete temporary file {}", tFile); + + try { + File blockFile = blockFiles.get(partETagFuture.get().getPartNumber()); + if (blockFile != null && blockFile.exists() && !blockFile.delete()) { + LOG.warn("Failed to delete temporary file {}", blockFile); } + } catch (InterruptedException | ExecutionException e) { + throw new IOException(e); } } } private void uploadCurrentPart() throws IOException { - blockFiles.add(blockFile); blockStream.flush(); blockStream.close(); if (blockId == 0) { uploadId = store.getUploadId(key); } + + blockId++; + blockFiles.put(blockId, blockFile); + + File currentFile = blockFile; + int currentBlockId = blockId; ListenableFuture<PartETag> partETagFuture = executorService.submit(() -> { - PartETag partETag = store.uploadPart(blockFile, key, uploadId, - blockId + 1); + PartETag partETag = store.uploadPart(currentFile, key, uploadId, + currentBlockId); return partETag; }); partETagsFutures.add(partETagFuture); + removePartFiles(); blockFile = newBlockFile(); - blockId++; blockStream = new BufferedOutputStream(new FileOutputStream(blockFile)); } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0857f116/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java index 5e21759..dc5f99ee 100644 --- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java +++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java @@ -450,6 +450,8 @@ public class AliyunOSSFileSystemStore { request.setRange(byteStart, byteEnd); return ossClient.getObject(request).getObjectContent(); } catch (OSSException | ClientException e) { + LOG.error("Exception thrown when store retrieves key: " + + key + ", exception: " + e); return null; } } http://git-wip-us.apache.org/repos/asf/hadoop/blob/0857f116/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java ---------------------------------------------------------------------- diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java index 365d931..6fe6f03 100644 --- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java +++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java @@ -31,6 +31,7 @@ import org.junit.rules.Timeout; import java.io.IOException; import static org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT; +import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE; /** * Tests regular and multi-part upload functionality for @@ -48,7 +49,10 @@ public class TestAliyunOSSBlockOutputStream { public void setUp() throws Exception { Configuration conf = new Configuration(); conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 1024); - conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 5 * 1024 * 1024); + conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024); + conf.setInt(IO_CHUNK_BUFFER_SIZE, + conf.getInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 0)); + conf.setInt(Constants.UPLOAD_ACTIVE_BLOCKS_KEY, 20); fs = AliyunOSSTestUtils.createTestFileSystem(conf); } @@ -85,6 +89,12 @@ public class TestAliyunOSSBlockOutputStream { } @Test + public void testMultiPartUploadConcurrent() throws IOException { + ContractTestUtils.createAndVerifyFile(fs, getTestPath(), + 50 * 1024 * 1024 - 1); + } + + @Test public void testHugeUpload() throws IOException { ContractTestUtils.createAndVerifyFile(fs, getTestPath(), MULTIPART_UPLOAD_PART_SIZE_DEFAULT - 1); --------------------------------------------------------------------- To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-commits-h...@hadoop.apache.org