Copilot commented on code in PR #9524:
URL: https://github.com/apache/seatunnel/pull/9524#discussion_r2178852988


##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageTest.java:
##########
@@ -90,17 +88,16 @@ void testAll() {
                 STORAGE.store(key2Index, keyValue);
                 keys.add(key2Index);
                 value = keyValue;
-                new Thread(() -> dataSize.set(STORAGE.loadAll().size())).start();
             }
             STORAGE.store(key, value);
             keys.add(key);
             STORAGE.delete(key1Index);
             keys.remove(key1Index);
         }
 
-        await().atMost(1, TimeUnit.SECONDS).until(dataSize::get, size -> size > 0);
+        await().atMost(5, TimeUnit.SECONDS);

Review Comment:
   The `await()` call has no `until(...)` condition, so it returns immediately. 
Add an `until` clause (e.g., polling `STORAGE.loadAll().size()`) or remove the 
unused await to ensure the test actually waits.
   ```suggestion
        await().atMost(5, TimeUnit.SECONDS).until(() -> STORAGE.loadAll().size() == 99);
   ```
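
   The distinction matters because Awaitility's `atMost(...)` alone only configures a timeout on the returned condition factory; nothing blocks until a terminal method such as `until(...)` is invoked. A minimal sketch of the polling behavior `until` supplies, written with plain Java instead of Awaitility (the class and helper names here are illustrative, not part of the test under review):

   ```java
   import java.time.Duration;
   import java.util.concurrent.atomic.AtomicInteger;
   import java.util.function.Supplier;

   public class AwaitSketch {
       // Poll the condition until it holds or the timeout elapses,
       // roughly what Awaitility's until(...) does under the hood.
       static void awaitUntil(Supplier<Boolean> condition, Duration timeout)
               throws InterruptedException {
           long deadline = System.nanoTime() + timeout.toNanos();
           while (!condition.get()) {
               if (System.nanoTime() > deadline) {
                   throw new IllegalStateException("condition not met within " + timeout);
               }
               Thread.sleep(10); // poll interval
           }
       }

       public static void main(String[] args) throws Exception {
           AtomicInteger dataSize = new AtomicInteger(0);
           // Simulate the background thread from the test populating the size.
           new Thread(() -> {
               try {
                   Thread.sleep(100);
               } catch (InterruptedException ignored) {
               }
               dataSize.set(42);
           }).start();

           // Without a condition this would return immediately; with one it blocks.
           awaitUntil(() -> dataSize.get() > 0, Duration.ofSeconds(5));
           System.out.println("observed size: " + dataSize.get());
       }
   }
   ```

   With no condition attached, the waiting thread would race past the background update and the assertion that follows would see stale state.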



##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java:
##########
@@ -75,49 +68,120 @@ public void setBlockSize(Long blockSize) {
 
     // TODO Synchronous write, asynchronous write can be added in the future
     @Override
-    public void write(IMapFileData data) throws IOException {
-        byte[] bytes = serializer.serialize(data);
-        this.write(bytes);
+    public synchronized void write(IMapFileData data) throws IOException {
+        List<String> dataFiles = WALDataUtils.getDataFiles(fs, parentPath, FILE_NAME);
+        SequenceInputStream stream = WALDataUtils.getComposedInputStream(fs, dataFiles);
+        // reset block info
+        reset();
+
+        // write data
+        writeEntry(serializer.serialize(data));
+        byte[] bytes;
+        boolean encountered = false;
+        while ((bytes = WALDataUtils.readNextData(stream)) != null) {
+            IMapFileData diskData = serializer.deserialize(bytes, IMapFileData.class);
+
+            if (encountered) writeEntry(serializer.serialize(diskData));
+            else if (isKeyEquals(data, diskData))
+                encountered = true; // if current data is the entry which have be updated
+            else writeEntry(serializer.serialize(diskData));
+        }
+        stream.close();
+        commit(dataFiles);

Review Comment:
   Similar to `HdfsWriter`, this method reconstructs the entire WAL for each 
write, which can severely impact performance. An append-only or differential 
update mechanism could dramatically reduce write amplification.
   ```suggestion
           // Append the new data entry to the WAL
           byte[] serializedData = serializer.serialize(data);
           writeEntry(serializedData);
   
           // Mark outdated entries as invalid (if applicable)
        List<String> dataFiles = WALDataUtils.getDataFiles(fs, parentPath, FILE_NAME);
           for (String file : dataFiles) {
            try (SequenceInputStream stream = WALDataUtils.getComposedInputStream(fs, List.of(file))) {
                   byte[] bytes;
                   while ((bytes = WALDataUtils.readNextData(stream)) != null) {
                    IMapFileData diskData = serializer.deserialize(bytes, IMapFileData.class);
                       if (isKeyEquals(data, diskData)) {
                           markAsInvalid(file, diskData);
                           break;
                       }
                   }
               }
           }
   ```
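
   One way to realize the append-only idea is a log with a key-to-latest-offset index, so an update appends a new entry and a delete appends a tombstone, while replay takes only the newest entry per key; a periodic compaction rewrites the log once instead of on every write. The sketch below uses in-memory stand-ins for the file system and serializer and is not the SeaTunnel API (all names are illustrative):

   ```java
   import java.util.ArrayList;
   import java.util.LinkedHashMap;
   import java.util.List;
   import java.util.Map;

   public class AppendOnlyWalSketch {
       record Entry(String key, String value, boolean deleted) {}

       private final List<Entry> log = new ArrayList<>();               // stand-in for the WAL file
       private final Map<String, Integer> index = new LinkedHashMap<>(); // key -> latest offset

       // Updates append; no earlier entry is rewritten.
       void put(String key, String value) {
           log.add(new Entry(key, value, false));
           index.put(key, log.size() - 1);
       }

       void delete(String key) {
           log.add(new Entry(key, null, true)); // tombstone instead of rewriting
           index.put(key, log.size() - 1);
       }

       // Replay: the index already points at the newest entry for each key.
       Map<String, String> loadAll() {
           Map<String, String> result = new LinkedHashMap<>();
           for (int offset : index.values()) {
               Entry latest = log.get(offset);
               if (!latest.deleted()) result.put(latest.key(), latest.value());
           }
           return result;
       }

       // Compaction rewrites the log once, dropping shadowed entries and tombstones.
       void compact() {
           List<Entry> compacted = new ArrayList<>();
           Map<String, Integer> newIndex = new LinkedHashMap<>();
           for (int offset : index.values()) {
               Entry latest = log.get(offset);
               if (!latest.deleted()) {
                   compacted.add(latest);
                   newIndex.put(latest.key(), compacted.size() - 1);
               }
           }
           log.clear();
           log.addAll(compacted);
           index.clear();
           index.putAll(newIndex);
       }

       int logSize() {
           return log.size();
       }

       public static void main(String[] args) {
           AppendOnlyWalSketch wal = new AppendOnlyWalSketch();
           wal.put("k1", "v1");
           wal.put("k1", "v2"); // update appends, does not rewrite
           wal.put("k2", "x");
           wal.delete("k2");
           System.out.println("loadAll: " + wal.loadAll());
           wal.compact();
           System.out.println("entries after compaction: " + wal.logSize());
       }
   }
   ```

   The trade-off is replay cost and log growth between compactions, but each `write` call drops from O(n) I/O to a single append.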



##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/HdfsWriter.java:
##########
@@ -47,41 +61,132 @@ public String identifier() {
     @Override
     public void initialize(FileSystem fs, Path parentPath, Serializer serializer)
             throws IOException {
-        Path path = new Path(parentPath, FILE_NAME);
-        this.out = fs.create(path);
+        this.fs = fs;
         this.serializer = serializer;
+        this.parentPath = parentPath;
     }
 
     @Override
-    public void write(IMapFileData data) throws IOException {
-        byte[] bytes = serializer.serialize(data);
-        this.write(bytes);
+    public void setBlockSize(Long blockSize) {
+        if (blockSize != null && blockSize > DEFAULT_BLOCK_SIZE) {
+            this.blockSize = blockSize;
+        }
     }
 
-    public void flush() throws IOException {
-        // hsync to flag
-        if (out instanceof HdfsDataOutputStream) {
-            ((HdfsDataOutputStream) out)
-                    .hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+    @Override
+    public synchronized void write(IMapFileData data) throws IOException {
+        List<String> dataFiles = WALDataUtils.getDataFiles(fs, parentPath, FILE_NAME);
+        SequenceInputStream stream = WALDataUtils.getComposedInputStream(fs, dataFiles);
+        // reset block info
+        reset();
+
+        // write data
+        writeEntry(serializer.serialize(data));
+        byte[] bytes;
+        boolean encountered = false;
+        while ((bytes = WALDataUtils.readNextData(stream)) != null) {
+            IMapFileData diskData = serializer.deserialize(bytes, IMapFileData.class);
+
+            if (encountered) writeEntry(serializer.serialize(diskData));
+            else if (isKeyEquals(data, diskData))
+                encountered = true; // if current data is the entry which have be updated
+            else writeEntry(serializer.serialize(diskData));
         }
-        if (out.getWrappedStream() instanceof DFSOutputStream) {
-            ((DFSOutputStream) out.getWrappedStream())
-                    .hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
-        } else {
-            out.hsync();
+        stream.close();
+        commit(dataFiles);

Review Comment:
   This implementation reads and rewrites the entire WAL on each write, 
resulting in O(n²) I/O for large datasets. Consider using an append-only 
strategy or an indexed update approach to avoid reprocessing all entries on 
every write.
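
   The O(n²) figure follows directly from re-serializing every prior entry on each write: after n writes of entries averaging e bytes, total bytes written are roughly e·(1 + 2 + … + n) = e·n(n+1)/2, versus e·n for pure appends. A small calculation makes the gap concrete (entry count and size are assumed constants, chosen only for illustration):

   ```java
   public class WriteAmplification {
       public static void main(String[] args) {
           long entries = 100_000; // assumed number of writes
           long entryBytes = 128;  // assumed serialized entry size

           // Rewrite-everything: the i-th write re-serializes all i entries.
           long rewriteAll = entryBytes * entries * (entries + 1) / 2;
           // Append-only: each write adds exactly one entry.
           long appendOnly = entryBytes * entries;

           System.out.println("rewrite-all bytes: " + rewriteAll);
           System.out.println("append-only bytes: " + appendOnly);
           System.out.println("amplification factor: " + rewriteAll / appendOnly);
       }
   }
   ```

   At these assumed sizes the full-rewrite strategy writes tens of thousands of times more bytes than an append-only log, before even counting the read side of each rewrite pass.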



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to