HADOOP-15607. AliyunOSS: fix duplicated partNumber issue in AliyunOSSBlockOutputStream. Contributed by Jinhu Wu.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0857f116
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0857f116
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0857f116

Branch: refs/heads/YARN-7402
Commit: 0857f116b754d83d3c540cd6f989087af24fef27
Parents: 007e6f5
Author: Sammi Chen <sammi.c...@intel.com>
Authored: Mon Jul 30 10:53:44 2018 +0800
Committer: Sammi Chen <sammi.c...@intel.com>
Committed: Mon Jul 30 10:53:44 2018 +0800

----------------------------------------------------------------------
 .../aliyun/oss/AliyunOSSBlockOutputStream.java  | 59 ++++++++++++--------
 .../fs/aliyun/oss/AliyunOSSFileSystemStore.java |  2 +
 .../oss/TestAliyunOSSBlockOutputStream.java     | 12 +++-
 3 files changed, 49 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0857f116/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java
index 12d551b..0a833b2 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java
@@ -33,7 +33,9 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 
@@ -50,7 +52,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
   private boolean closed;
   private String key;
   private File blockFile;
-  private List<File> blockFiles = new ArrayList<>();
+  private Map<Integer, File> blockFiles = new HashMap<>();
   private long blockSize;
   private int blockId = 0;
   private long blockWritten = 0L;
@@ -94,8 +96,9 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
 
     blockStream.flush();
     blockStream.close();
-    if (!blockFiles.contains(blockFile)) {
-      blockFiles.add(blockFile);
+    if (!blockFiles.values().contains(blockFile)) {
+      blockId++;
+      blockFiles.put(blockId, blockFile);
     }
 
     try {
@@ -107,7 +110,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
           ListenableFuture<PartETag> partETagFuture =
               executorService.submit(() -> {
                 PartETag partETag = store.uploadPart(blockFile, key, uploadId,
-                    blockId + 1);
+                    blockId);
                 return partETag;
               });
           partETagsFutures.add(partETagFuture);
@@ -120,11 +123,7 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
         store.completeMultipartUpload(key, uploadId, partETags);
       }
     } finally {
-      for (File tFile: blockFiles) {
-        if (tFile.exists() && !tFile.delete()) {
-          LOG.warn("Failed to delete temporary file {}", tFile);
-        }
-      }
+      removePartFiles();
       closed = true;
     }
   }
@@ -141,38 +140,52 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
     if (closed) {
       throw new IOException("Stream closed.");
     }
-    try {
-      blockStream.write(b, off, len);
-      blockWritten += len;
-      if (blockWritten >= blockSize) {
-        uploadCurrentPart();
-        blockWritten = 0L;
+    blockStream.write(b, off, len);
+    blockWritten += len;
+    if (blockWritten >= blockSize) {
+      uploadCurrentPart();
+      blockWritten = 0L;
+    }
+  }
+
+  private void removePartFiles() throws IOException {
+    for (ListenableFuture<PartETag> partETagFuture : partETagsFutures) {
+      if (!partETagFuture.isDone()) {
+        continue;
       }
-    } finally {
-      for (File tFile: blockFiles) {
-        if (tFile.exists() && !tFile.delete()) {
-          LOG.warn("Failed to delete temporary file {}", tFile);
+
+      try {
+        File blockFile = blockFiles.get(partETagFuture.get().getPartNumber());
+        if (blockFile != null && blockFile.exists() && !blockFile.delete()) {
+          LOG.warn("Failed to delete temporary file {}", blockFile);
         }
+      } catch (InterruptedException | ExecutionException e) {
+        throw new IOException(e);
       }
     }
   }
 
   private void uploadCurrentPart() throws IOException {
-    blockFiles.add(blockFile);
     blockStream.flush();
     blockStream.close();
     if (blockId == 0) {
       uploadId = store.getUploadId(key);
     }
+
+    blockId++;
+    blockFiles.put(blockId, blockFile);
+
+    File currentFile = blockFile;
+    int currentBlockId = blockId;
     ListenableFuture<PartETag> partETagFuture =
         executorService.submit(() -> {
-          PartETag partETag = store.uploadPart(blockFile, key, uploadId,
-              blockId + 1);
+          PartETag partETag = store.uploadPart(currentFile, key, uploadId,
+              currentBlockId);
           return partETag;
         });
     partETagsFutures.add(partETagFuture);
+    removePartFiles();
     blockFile = newBlockFile();
-    blockId++;
     blockStream = new BufferedOutputStream(new FileOutputStream(blockFile));
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0857f116/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
index 5e21759..dc5f99ee 100644
--- a/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
+++ b/hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSFileSystemStore.java
@@ -450,6 +450,8 @@ public class AliyunOSSFileSystemStore {
       request.setRange(byteStart, byteEnd);
       return ossClient.getObject(request).getObjectContent();
     } catch (OSSException | ClientException e) {
+      LOG.error("Exception thrown when store retrieves key: "
+              + key + ", exception: " + e);
       return null;
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0857f116/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
index 365d931..6fe6f03 100644
--- a/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
+++ b/hadoop-tools/hadoop-aliyun/src/test/java/org/apache/hadoop/fs/aliyun/oss/TestAliyunOSSBlockOutputStream.java
@@ -31,6 +31,7 @@ import org.junit.rules.Timeout;
 import java.io.IOException;
 
 import static 
org.apache.hadoop.fs.aliyun.oss.Constants.MULTIPART_UPLOAD_PART_SIZE_DEFAULT;
+import static 
org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
 
 /**
  * Tests regular and multi-part upload functionality for
@@ -48,7 +49,10 @@ public class TestAliyunOSSBlockOutputStream {
   public void setUp() throws Exception {
     Configuration conf = new Configuration();
     conf.setLong(Constants.MIN_MULTIPART_UPLOAD_THRESHOLD_KEY, 5 * 1024 * 
1024);
-    conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 5 * 1024 * 1024);
+    conf.setInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 1024 * 1024);
+    conf.setInt(IO_CHUNK_BUFFER_SIZE,
+        conf.getInt(Constants.MULTIPART_UPLOAD_PART_SIZE_KEY, 0));
+    conf.setInt(Constants.UPLOAD_ACTIVE_BLOCKS_KEY, 20);
     fs = AliyunOSSTestUtils.createTestFileSystem(conf);
   }
 
@@ -85,6 +89,12 @@ public class TestAliyunOSSBlockOutputStream {
   }
 
   @Test
+  public void testMultiPartUploadConcurrent() throws IOException {
+    ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
+        50 * 1024 * 1024 - 1);
+  }
+
+  @Test
   public void testHugeUpload() throws IOException {
     ContractTestUtils.createAndVerifyFile(fs, getTestPath(),
         MULTIPART_UPLOAD_PART_SIZE_DEFAULT - 1);


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to