[ https://issues.apache.org/jira/browse/HADOOP-18458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17703869#comment-17703869 ]
ASF GitHub Bot commented on HADOOP-18458:
-----------------------------------------

ChenSammi commented on code in PR #4912:
URL: https://github.com/apache/hadoop/pull/4912#discussion_r1145662429


##########
hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java:
##########
@@ -49,71 +48,103 @@ public class AliyunOSSBlockOutputStream extends OutputStream {
       LoggerFactory.getLogger(AliyunOSSBlockOutputStream.class);
   private AliyunOSSFileSystemStore store;
   private Configuration conf;
-  private boolean closed;
+  private final AtomicBoolean closed = new AtomicBoolean(false);
   private String key;
-  private File blockFile;
-  private Map<Integer, File> blockFiles = new HashMap<>();
-  private long blockSize;
+  private int blockSize;
   private int blockId = 0;
   private long blockWritten = 0L;
   private String uploadId = null;
   private final List<ListenableFuture<PartETag>> partETagsFutures;
+  private final OSSDataBlocks.BlockFactory blockFactory;
+  private final BlockOutputStreamStatistics statistics;
+  private OSSDataBlocks.DataBlock activeBlock;
   private final ListeningExecutorService executorService;
-  private OutputStream blockStream;
   private final byte[] singleByte = new byte[1];
 
   public AliyunOSSBlockOutputStream(Configuration conf,
       AliyunOSSFileSystemStore store,
       String key,
-      Long blockSize,
+      int blockSize,
+      OSSDataBlocks.BlockFactory blockFactory,
+      BlockOutputStreamStatistics statistics,
       ExecutorService executorService) throws IOException {
     this.store = store;
     this.conf = conf;
     this.key = key;
     this.blockSize = blockSize;
-    this.blockFile = newBlockFile();
-    this.blockStream =
-        new BufferedOutputStream(new FileOutputStream(blockFile));
+    this.blockFactory = blockFactory;
+    this.statistics = statistics;
     this.partETagsFutures = new ArrayList<>(2);
     this.executorService = MoreExecutors.listeningDecorator(executorService);
   }
 
-  private File newBlockFile() throws IOException {
-    return AliyunOSSUtils.createTmpFileForWrite(
-        String.format("oss-block-%04d-", blockId), blockSize, conf);
+  /**
+   * Demand create a destination block.
+   * @return the active block; null if there isn't one.
+   * @throws IOException on any failure to create
+   */
+  private synchronized OSSDataBlocks.DataBlock createBlockIfNeeded()
+      throws IOException {
+    if (activeBlock == null) {
+      blockId++;
+      activeBlock = blockFactory.create(blockId, blockSize, statistics);
+    }
+    return activeBlock;
   }
 
+  /**
+   * Check for the filesystem being open.
+   * @throws IOException if the filesystem is closed.
+   */
+  void checkOpen() throws IOException {
+    if (closed.get()) {
+      throw new IOException("Stream closed.");
+    }
+  }
+
+  /**
+   * The flush operation does not trigger an upload; that awaits
+   * the next block being full. What it does do is call {@code flush() }
+   * on the current block, leaving it to choose how to react.
+   * @throws IOException Any IO problem.
+   */
   @Override
   public synchronized void flush() throws IOException {
-    blockStream.flush();
+    checkOpen();
+
+    OSSDataBlocks.DataBlock dataBlock = getActiveBlock();
+    if (dataBlock != null) {
+      dataBlock.flush();
+    }
   }
 
   @Override
   public synchronized void close() throws IOException {
-    if (closed) {
+    if (closed.getAndSet(true)) {

Review Comment:
   The point is that "closed" should be set to true in the "finally" block, after all resources have been released. It should not be set to true at the beginning of close().
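The ordering the reviewer asks for can be sketched as follows: return early if the stream is already closed, do the remaining work inside try, release every buffer in finally, and only then set the flag. This is a minimal, self-contained sketch, not the code in PR #4912; SketchBlockOutputStream, FakeBlock, CloseOrderingDemo and the failUpload switch are hypothetical names used only to simulate a successful and a failed final upload.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for a buffered data block; it only records release. */
final class FakeBlock implements AutoCloseable {
  boolean released = false;

  @Override
  public void close() {
    released = true;
  }
}

/** Hypothetical stream used only to show the close() ordering from the review. */
class SketchBlockOutputStream extends OutputStream {
  private final AtomicBoolean closed = new AtomicBoolean(false);
  private final List<FakeBlock> blocks = new ArrayList<>();
  private FakeBlock activeBlock;
  private final boolean failUpload; // hypothetical switch that simulates an upload error

  SketchBlockOutputStream(boolean failUpload) {
    this.failUpload = failUpload;
  }

  @Override
  public synchronized void write(int b) throws IOException {
    if (closed.get()) {
      throw new IOException("Stream closed.");
    }
    if (activeBlock == null) { // create the block lazily, on first write
      activeBlock = new FakeBlock();
      blocks.add(activeBlock);
    }
    // A real stream would append b to activeBlock here.
  }

  @Override
  public synchronized void close() throws IOException {
    if (closed.get()) {
      return; // already closed: a second close() is a no-op
    }
    try {
      if (failUpload) {
        throw new IOException("simulated upload failure");
      }
      // A real stream would upload the last block and complete the upload here.
    } finally {
      // Release every buffer whether or not the upload succeeded...
      for (FakeBlock block : blocks) {
        block.close();
      }
      activeBlock = null;
      // ...and only then mark the stream closed.
      closed.set(true);
    }
  }

  boolean isClosed() {
    return closed.get();
  }

  boolean allReleased() {
    return blocks.stream().allMatch(block -> block.released);
  }
}

/** Runs one scenario and reports whether close() behaved as the review asks. */
final class CloseOrderingDemo {
  private CloseOrderingDemo() {}

  static boolean closeReleasesBuffers(boolean failUpload) {
    SketchBlockOutputStream out = new SketchBlockOutputStream(failUpload);
    boolean closeFailed = false;
    try {
      out.write(42);
      out.close();
    } catch (IOException e) {
      closeFailed = true;
    }
    boolean secondCloseOk;
    try {
      out.close(); // must not throw once the stream is closed
      secondCloseOk = true;
    } catch (IOException e) {
      secondCloseOk = false;
    }
    return closeFailed == failUpload && secondCloseOk
        && out.isClosed() && out.allReleased();
  }
}
```

Because close() is synchronized, a second thread calling close() waits for the first one and then sees closed == true. The trade-off compared with getAndSet(true) at the top is that code running inside close() on the same thread (for example a helper that calls checkOpen()) still sees the stream as open until the finally block has run.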
> AliyunOSS: AliyunOSSBlockOutputStream to support heap/off-heap buffer before uploading data to OSS
> --------------------------------------------------------------------------------------------------
>
>                 Key: HADOOP-18458
>                 URL: https://issues.apache.org/jira/browse/HADOOP-18458
>             Project: Hadoop Common
>          Issue Type: Improvement
>          Components: fs/oss
>    Affects Versions: 3.0.3, 3.1.4, 2.10.2, 3.2.4, 3.3.4
>            Reporter: wujinhu
>            Assignee: wujinhu
>            Priority: Major
>              Labels: pull-request-available
>
> Recently, our customers raised a requirement: AliyunOSSBlockOutputStream
> should support heap/off-heap buffers before uploading data to OSS.
> Currently, AliyunOSSBlockOutputStream buffers data in a local directory before
> uploading it to OSS, which is less efficient than buffering in memory.
> Changes:
> # Adds heap/off-heap buffers
> # Adds a limit on the memory used, with fallback to disk
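The two listed changes (in-memory heap or off-heap buffers, plus a cap on buffer memory with fallback to disk) could fit together roughly as below. This is a minimal sketch under the assumption of a single shared byte budget per factory; BlockKind, SketchBlock, MemoryLimitedBlockFactory and the reserve-or-fall-back policy are hypothetical and are not the OSSDataBlocks classes introduced in PR #4912.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;

/** Where a block's bytes are buffered (hypothetical, not a Hadoop type). */
enum BlockKind { HEAP, OFF_HEAP, DISK }

/** Minimal block handle; a real block would also accept writes and be uploaded. */
final class SketchBlock implements AutoCloseable {
  final BlockKind kind;
  final ByteBuffer buffer; // null for DISK blocks
  final File file;         // null for memory blocks
  private final Runnable onRelease;
  private boolean released = false;

  SketchBlock(BlockKind kind, ByteBuffer buffer, File file, Runnable onRelease) {
    this.kind = kind;
    this.buffer = buffer;
    this.file = file;
    this.onRelease = onRelease;
  }

  @Override
  public synchronized void close() {
    if (released) {
      return; // releasing twice must not return the budget twice
    }
    released = true;
    if (file != null && !file.delete()) {
      file.deleteOnExit(); // best effort if the temp file could not be deleted now
    }
    onRelease.run();
  }
}

/**
 * Hands out memory blocks while a shared byte budget allows it and
 * falls back to temporary files once the budget is exhausted.
 */
final class MemoryLimitedBlockFactory {
  private final boolean offHeap;  // direct ByteBuffers instead of heap arrays
  private final long memoryLimit; // max bytes held in memory blocks at once
  private final AtomicLong memoryInUse = new AtomicLong();

  MemoryLimitedBlockFactory(boolean offHeap, long memoryLimit) {
    this.offHeap = offHeap;
    this.memoryLimit = memoryLimit;
  }

  SketchBlock create(int blockSize) {
    if (tryReserve(blockSize)) {
      ByteBuffer buffer = offHeap
          ? ByteBuffer.allocateDirect(blockSize)
          : ByteBuffer.allocate(blockSize);
      return new SketchBlock(offHeap ? BlockKind.OFF_HEAP : BlockKind.HEAP,
          buffer, null, () -> memoryInUse.addAndGet(-blockSize));
    }
    try {
      File tmp = File.createTempFile("oss-block-", ".tmp");
      return new SketchBlock(BlockKind.DISK, null, tmp, () -> { });
    } catch (IOException e) {
      throw new UncheckedIOException("cannot create disk buffer", e);
    }
  }

  /** Atomically reserves size bytes unless that would exceed the limit. */
  private boolean tryReserve(long size) {
    while (true) {
      long current = memoryInUse.get();
      if (current + size > memoryLimit) {
        return false;
      }
      if (memoryInUse.compareAndSet(current, current + size)) {
        return true;
      }
    }
  }

  long memoryInUse() {
    return memoryInUse.get();
  }
}
```

With a 10-byte limit and 4-byte blocks, the first two blocks are buffered in memory and the third falls back to a temporary file; closing a memory block returns its 4 bytes to the budget. Reserving with compareAndSet keeps the accounting correct when several output streams share one factory. Note that releasing a direct buffer only returns its budget here; the native memory itself is freed when the ByteBuffer is garbage-collected.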