[ https://issues.apache.org/jira/browse/HADOOP-18458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703869#comment-17703869 ]

ASF GitHub Bot commented on HADOOP-18458:
-----------------------------------------

ChenSammi commented on code in PR #4912:
URL: https://github.com/apache/hadoop/pull/4912#discussion_r1145662429


##########
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java:
##########
@@ -49,71 +48,103 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
       LoggerFactory.getLogger(AliyunOSSBlockOutputStream.class);
   private AliyunOSSFileSystemStore store;
   private Configuration conf;
-  private boolean closed;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
   private String key;
-  private File blockFile;
-  private Map<Integer, File> blockFiles = new HashMap<>();
-  private long blockSize;
+  private int blockSize;
   private int blockId = 0;
   private long blockWritten = 0L;
   private String uploadId = null;
   private final List<ListenableFuture<PartETag>> partETagsFutures;
+  private final OSSDataBlocks.BlockFactory blockFactory;
+  private final BlockOutputStreamStatistics statistics;
+  private OSSDataBlocks.DataBlock activeBlock;
   private final ListeningExecutorService executorService;
-  private OutputStream blockStream;
   private final byte[] singleByte = new byte[1];
 
   public AliyunOSSBlockOutputStream(Configuration conf,
       AliyunOSSFileSystemStore store,
       String key,
-      Long blockSize,
+      int blockSize,
+      OSSDataBlocks.BlockFactory blockFactory,
+      BlockOutputStreamStatistics statistics,
       ExecutorService executorService) throws IOException {
     this.store = store;
     this.conf = conf;
     this.key = key;
     this.blockSize = blockSize;
-    this.blockFile = newBlockFile();
-    this.blockStream =
-        new BufferedOutputStream(new FileOutputStream(blockFile));
+    this.blockFactory = blockFactory;
+    this.statistics = statistics;
     this.partETagsFutures = new ArrayList<>(2);
     this.executorService = MoreExecutors.listeningDecorator(executorService);
   }
 
-  private File newBlockFile() throws IOException {
-    return AliyunOSSUtils.createTmpFileForWrite(
-        String.format("oss-block-%04d-", blockId), blockSize, conf);
+  /**
+   * Demand create a destination block.
+   * @return the active block; null if there isn't one.
+   * @throws IOException on any failure to create
+   */
+  private synchronized OSSDataBlocks.DataBlock createBlockIfNeeded()
+      throws IOException {
+    if (activeBlock == null) {
+      blockId++;
+      activeBlock = blockFactory.create(blockId, blockSize, statistics);
+    }
+    return activeBlock;
   }
 
+  /**
+   * Check for the filesystem being open.
+   * @throws IOException if the filesystem is closed.
+   */
+  void checkOpen() throws IOException {
+    if (closed.get()) {
+      throw new IOException("Stream closed.");
+    }
+  }
+
+  /**
+   * The flush operation does not trigger an upload; that awaits
+   * the next block being full. What it does do is call {@code flush() }
+   * on the current block, leaving it to choose how to react.
+   * @throws IOException Any IO problem.
+   */
   @Override
   public synchronized void flush() throws IOException {
-    blockStream.flush();
+    checkOpen();
+
+    OSSDataBlocks.DataBlock dataBlock = getActiveBlock();
+    if (dataBlock != null) {
+      dataBlock.flush();
+    }
   }
 
   @Override
   public synchronized void close() throws IOException {
-    if (closed) {
+    if (closed.getAndSet(true)) {

Review Comment:
   The point is that "closed" should be set to true in the "finally" block, after all resources have been closed. It should not be set to true at the beginning.
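A minimal sketch of the ordering the reviewer asks for, under simplifying assumptions: `CloseOrderingSketch` is a hypothetical stand-in, not the PR's class, and the "upload" and "release" steps are placeholders for uploading the last block and freeing its buffer. `close()` only *reads* the flag on entry (so a repeated close is a no-op), and the flag is set in `finally` once cleanup has run, even if the upload step throws. `checkOpen()` mirrors the method in the diff.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stream that records the steps close() performs, in order. */
class CloseOrderingSketch extends OutputStream {
  private final AtomicBoolean closed = new AtomicBoolean(false);
  // Ordered log of the actions taken, so the sequencing can be inspected.
  final List<String> steps = new ArrayList<>();
  // When true, the placeholder upload step fails, to exercise the finally path.
  private final boolean failOnUpload;

  CloseOrderingSketch(boolean failOnUpload) {
    this.failOnUpload = failOnUpload;
  }

  /** Same shape as checkOpen() in the diff: reject use after close. */
  void checkOpen() throws IOException {
    if (closed.get()) {
      throw new IOException("Stream closed.");
    }
  }

  @Override
  public synchronized void write(int b) throws IOException {
    checkOpen();
    steps.add("write");
  }

  @Override
  public synchronized void close() throws IOException {
    // A second close() returns early; the flag is only read here, not set.
    if (closed.get()) {
      return;
    }
    try {
      // Placeholder for uploading the final block (hypothetical step).
      steps.add("upload");
      if (failOnUpload) {
        throw new IOException("simulated upload failure");
      }
    } finally {
      // Placeholder for releasing buffers; runs whether or not upload failed.
      steps.add("release");
      // Mark the stream closed only after cleanup, as the review suggests.
      closed.set(true);
      steps.add("closed");
    }
  }

  boolean isClosed() {
    return closed.get();
  }
}
```

Because `close()` is `synchronized`, other synchronized methods cannot observe the stream mid-close, so setting the flag last does not open a window for concurrent writes.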





> AliyunOSS: AliyunOSSBlockOutputStream to support heap/off-heap buffer before uploading data to OSS
> --------------------------------------------------------------------------------------------------
>
>                 Key: HADOOP-18458
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18458
>             Project: Hadoop Common
>          Issue Type: Improvement
>          Components: fs/oss
>    Affects Versions: 3.0.3, 3.1.4, 2.10.2, 3.2.4, 3.3.4
>            Reporter: wujinhu
>            Assignee: wujinhu
>            Priority: Major
>              Labels: pull-request-available
>
> Recently, our customers raised a requirement: AliyunOSSBlockOutputStream
> should support heap/off-heap buffers before uploading data to OSS.
> Currently, AliyunOSSBlockOutputStream buffers data in a local directory before
> uploading it to OSS, which is less efficient than buffering in memory.
> Changes:
>  # Adds heap/off-heap buffers
>  # Adds a limit on memory usage, with fallback to disk
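The two listed changes (memory buffers plus a memory cap with disk fallback) can be sketched as a buffer-selection policy. This is a hypothetical illustration, not the `OSSDataBlocks.BlockFactory` API from the PR: `BoundedBufferFactory`, `Buffer`, and `create` are invented names. It assumes a fixed byte budget reserved atomically per block; when a block would exceed the budget, it falls back to a temporary file. Only heap buffers are modeled; an off-heap variant would allocate with `ByteBuffer.allocateDirect` instead.

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical factory: memory buffers up to a byte budget, then disk. */
class BoundedBufferFactory {
  private final long memoryLimit;
  // Bytes currently reserved by live in-memory buffers.
  private final AtomicLong memoryInUse = new AtomicLong();

  BoundedBufferFactory(long memoryLimit) {
    this.memoryLimit = memoryLimit;
  }

  /** A block buffer that knows whether it lives in memory or on disk. */
  static final class Buffer implements AutoCloseable {
    final boolean inMemory;
    final OutputStream out;
    final File file; // null for in-memory buffers
    private final BoundedBufferFactory owner;
    private final long reserved; // budget to return on close

    private Buffer(BoundedBufferFactory owner, boolean inMemory,
        OutputStream out, File file, long reserved) {
      this.owner = owner;
      this.inMemory = inMemory;
      this.out = out;
      this.file = file;
      this.reserved = reserved;
    }

    @Override
    public void close() throws IOException {
      out.close();
      if (file != null) {
        Files.deleteIfExists(file.toPath()); // remove the temp file
      }
      owner.memoryInUse.addAndGet(-reserved); // give the budget back
    }
  }

  Buffer create(int blockSize) throws IOException {
    // Reserve blockSize bytes with a CAS loop so concurrent callers
    // never push the total past memoryLimit.
    while (true) {
      long current = memoryInUse.get();
      if (current + blockSize > memoryLimit) {
        break; // budget exhausted
      }
      if (memoryInUse.compareAndSet(current, current + blockSize)) {
        return new Buffer(this, true,
            new ByteArrayOutputStream(blockSize), null, blockSize);
      }
    }
    // Fallback: buffer this block in a temporary file on local disk.
    File f = File.createTempFile("oss-block-", ".tmp");
    return new Buffer(this, false, new FileOutputStream(f), f, 0L);
  }

  long reservedBytes() {
    return memoryInUse.get();
  }
}
```

With a 10-byte budget, a first 6-byte block is buffered in memory and a second concurrent 6-byte block goes to disk; once the first is closed, the next block fits in memory again.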



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
