prclin commented on code in PR #9524:
URL: https://github.com/apache/seatunnel/pull/9524#discussion_r2194482125


##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/HdfsWriter.java:
##########
@@ -47,41 +69,153 @@ public String identifier() {
     @Override
     public void initialize(FileSystem fs, Path parentPath, Serializer 
serializer)
             throws IOException {
-        Path path = new Path(parentPath, FILE_NAME);
-        this.out = fs.create(path);
+        this.fs = fs;
         this.serializer = serializer;
+        this.parentPath = parentPath;
+    }
+
+    @Override
+    public void setBlockSize(Long blockSize) {
+        if (blockSize != null && blockSize > DEFAULT_BLOCK_SIZE) {
+            this.blockSize = blockSize;
+        }
     }
 
     @Override
     public void write(IMapFileData data) throws IOException {
-        byte[] bytes = serializer.serialize(data);
-        this.write(bytes);
+        ReentrantLock lock = LOCK_MAP.computeIfAbsent(parentPath, path -> new 
ReentrantLock());
+        try {
+            lock.lock();
+            writeInternal(data);
+        } finally {
+            lock.unlock();
+        }
     }
 
-    public void flush() throws IOException {
-        // hsync to flag
-        if (out instanceof HdfsDataOutputStream) {
-            ((HdfsDataOutputStream) out)
-                    
.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+    public void writeInternal(IMapFileData data) throws IOException {
+        List<String> dataFiles = WALDataUtils.getDataFiles(fs, parentPath, 
FILE_NAME);
+        SequenceInputStream stream = WALDataUtils.getComposedInputStream(fs, 
dataFiles);
+        // reset block info
+        reset();
+
+        // write data
+        writeEntry(serializer.serialize(data));
+        byte[] bytes;
+        boolean encountered = false;
+        while ((bytes = WALDataUtils.readNextData(stream)) != null) {
+            IMapFileData diskData = serializer.deserialize(bytes, 
IMapFileData.class);
+
+            if (encountered) writeEntry(serializer.serialize(diskData));
+            else if (isKeyEquals(data, diskData))
+                encountered = true; // if current data is the entry which have 
be updated
+            else writeEntry(serializer.serialize(diskData));
         }
-        if (out.getWrappedStream() instanceof DFSOutputStream) {
-            ((DFSOutputStream) out.getWrappedStream())
-                    
.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
-        } else {
-            out.hsync();
+        stream.close();
+        commit(dataFiles);
+    }
+
+    public void commit(List<String> filenames) throws IOException {

Review Comment:
   ok. The logic is the same in the cloud and HDFS writers; I deliberately wrote 
two copies so that if errors show up in the integration tests, each writer can 
be modified independently.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to