wujinhu commented on code in PR #4912: URL: https://github.com/apache/hadoop/pull/4912#discussion_r1144328831
########## hadoop-tools/hadoop-aliyun/src/main/java/org/apache/hadoop/fs/aliyun/oss/AliyunOSSBlockOutputStream.java: ########## @@ -49,71 +48,103 @@ public class AliyunOSSBlockOutputStream extends OutputStream { LoggerFactory.getLogger(AliyunOSSBlockOutputStream.class); private AliyunOSSFileSystemStore store; private Configuration conf; - private boolean closed; + private final AtomicBoolean closed = new AtomicBoolean(false); private String key; - private File blockFile; - private Map<Integer, File> blockFiles = new HashMap<>(); - private long blockSize; + private int blockSize; private int blockId = 0; private long blockWritten = 0L; private String uploadId = null; private final List<ListenableFuture<PartETag>> partETagsFutures; + private final OSSDataBlocks.BlockFactory blockFactory; + private final BlockOutputStreamStatistics statistics; + private OSSDataBlocks.DataBlock activeBlock; private final ListeningExecutorService executorService; - private OutputStream blockStream; private final byte[] singleByte = new byte[1]; public AliyunOSSBlockOutputStream(Configuration conf, AliyunOSSFileSystemStore store, String key, - Long blockSize, + int blockSize, + OSSDataBlocks.BlockFactory blockFactory, + BlockOutputStreamStatistics statistics, ExecutorService executorService) throws IOException { this.store = store; this.conf = conf; this.key = key; this.blockSize = blockSize; - this.blockFile = newBlockFile(); - this.blockStream = - new BufferedOutputStream(new FileOutputStream(blockFile)); + this.blockFactory = blockFactory; + this.statistics = statistics; this.partETagsFutures = new ArrayList<>(2); this.executorService = MoreExecutors.listeningDecorator(executorService); } - private File newBlockFile() throws IOException { - return AliyunOSSUtils.createTmpFileForWrite( - String.format("oss-block-%04d-", blockId), blockSize, conf); + /** + * Demand create a destination block. + * @return the active block; null if there isn't one. + * @throws IOException on any failure to create + */ + private synchronized OSSDataBlocks.DataBlock createBlockIfNeeded() + throws IOException { + if (activeBlock == null) { + blockId++; + activeBlock = blockFactory.create(blockId, blockSize, statistics); + } + return activeBlock; } + /** + * Check for the filesystem being open. + * @throws IOException if the filesystem is closed. + */ + void checkOpen() throws IOException { + if (closed.get()) { + throw new IOException("Stream closed."); + } + } + + /** + * The flush operation does not trigger an upload; that awaits + * the next block being full. What it does do is call {@code flush() } + * on the current block, leaving it to choose how to react. + * @throws IOException Any IO problem. + */ @Override public synchronized void flush() throws IOException { - blockStream.flush(); + checkOpen(); + + OSSDataBlocks.DataBlock dataBlock = getActiveBlock(); + if (dataBlock != null) { + dataBlock.flush(); + } } @Override public synchronized void close() throws IOException { - if (closed) { + if (closed.getAndSet(true)) { Review Comment: `closed` is `AtomicBoolean` and`getAndSet` will set `closed` to `true`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: common-issues-unsubscr...@hadoop.apache.org For additional commands, e-mail: common-issues-h...@hadoop.apache.org