prclin commented on code in PR #9524:
URL: https://github.com/apache/seatunnel/pull/9524#discussion_r2194492619


##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java:
##########
@@ -76,48 +77,140 @@ public void setBlockSize(Long blockSize) {
     // TODO Synchronous write, asynchronous write can be added in the future
     @Override
     public void write(IMapFileData data) throws IOException {
-        byte[] bytes = serializer.serialize(data);
-        this.write(bytes);
+        ReentrantLock lock = LOCK_MAP.computeIfAbsent(parentPath, path -> new ReentrantLock());
+        try {
+            lock.lock();
+            writeInternal(data);
+        } finally {
+            lock.unlock();
+        }
     }
 
-    private void write(byte[] bytes) {
-        try (FSDataOutputStream out = fs.create(path, true)) {
-            // Write to bytebuffer
-            byte[] data = WALDataUtils.wrapperBytes(bytes);
-            bf.writeBytes(data);
-
-            // Read all bytes
-            byte[] allBytes = new byte[bf.readableBytes()];
-            bf.readBytes(allBytes);
-
-            // write filesystem
-            out.write(allBytes);
+    public void writeInternal(IMapFileData data) throws IOException {
+        List<String> dataFiles = WALDataUtils.getDataFiles(fs, parentPath, FILE_NAME);
+        SequenceInputStream stream = WALDataUtils.getComposedInputStream(fs, dataFiles);
+        // reset block info
+        reset();
+
+        // write data
+        writeEntry(serializer.serialize(data));
+        byte[] bytes;
+        boolean encountered = false;
+        while ((bytes = WALDataUtils.readNextData(stream)) != null) {
+            IMapFileData diskData = serializer.deserialize(bytes, IMapFileData.class);
+
+            if (encountered) writeEntry(serializer.serialize(diskData));
+            else if (isKeyEquals(data, diskData))
+                encountered = true; // if current data is the entry which have be updated
+            else writeEntry(serializer.serialize(diskData));
+        }
+        stream.close();
+        commit(dataFiles);

Review Comment:
   To replace the old value of a key, we have to traverse the previous file; this approach follows the existing design. If entries are never replaced and only appended, the IMap file will grow without bound. Another option is to keep one file per key-value pair; then, when storing, we would only need to find the corresponding file and overwrite it.
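
To make the trade-off concrete, here is a minimal sketch of the replace-on-write loop from `writeInternal` next to a plain append-only log. It is not the PR's code: an in-memory `List` stands in for the WAL data files, and the hypothetical `Entry` record stands in for `IMapFileData`, with key equality playing the role of `isKeyEquals`. The new entry is written first, then the old entries are copied, skipping the first one whose key matches, as the loop in the diff does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

public class ReplaceOnWriteSketch {

    // Hypothetical stand-in for IMapFileData; only the key decides identity here.
    record Entry(String key, String value) {}

    // Mirrors the loop in writeInternal: new entry first, then the old entries,
    // skipping the first old entry whose key equals the new entry's key.
    static List<Entry> rewrite(List<Entry> onDisk, Entry update) {
        List<Entry> out = new ArrayList<>(onDisk.size() + 1);
        out.add(update);
        boolean encountered = false;
        for (Entry e : onDisk) {
            if (!encountered && Objects.equals(e.key(), update.key())) {
                encountered = true; // stale version of the key being updated: drop it
                continue;
            }
            out.add(e);
        }
        return out;
    }

    // Append-only alternative: every version is kept, so the log only grows.
    static List<Entry> append(List<Entry> onDisk, Entry update) {
        List<Entry> out = new ArrayList<>(onDisk);
        out.add(update);
        return out;
    }

    public static void main(String[] args) {
        List<Entry> rewritten = new ArrayList<>();
        List<Entry> appended = new ArrayList<>();
        for (int i = 0; i < 5; i++) {
            Entry update = new Entry("job-1", "state-" + i);
            rewritten = rewrite(rewritten, update);
            appended = append(appended, update);
        }
        System.out.println(rewritten.size() + " vs " + appended.size()); // 1 vs 5
    }
}
```

Five updates to one key leave one entry under replace-on-write and five under append-only; the price of the former is reading and rewriting every existing entry on each write.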

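The one-file-per-key alternative could look roughly like the sketch below. It is an assumption-laden illustration, not a proposal from the PR: it uses local `java.nio.file` in place of the `fs` file system handle the writer uses, and the class name `PerKeyFileStore` and the Base64 file-naming scheme are hypothetical. An update touches only that key's file, written to a temp file and then moved over the target.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Base64;
import java.util.Optional;

public class PerKeyFileStore {
    private final Path dir;

    PerKeyFileStore(Path dir) throws IOException {
        this.dir = Files.createDirectories(dir);
    }

    // Hypothetical naming scheme: URL-safe Base64 of the key keeps file names valid.
    Path fileFor(String key) {
        String name = Base64.getUrlEncoder().withoutPadding()
                .encodeToString(key.getBytes(StandardCharsets.UTF_8));
        return dir.resolve(name + ".dat");
    }

    // Overwrite only this key's file: write a temp file in the same directory,
    // then move it over the target. On POSIX file systems ATOMIC_MOVE is a rename
    // that replaces the target; the Files.move Javadoc leaves replacement of an
    // existing target implementation-specific when ATOMIC_MOVE is used.
    void put(String key, byte[] value) throws IOException {
        Path tmp = Files.createTempFile(dir, "tmp-", ".part");
        Files.write(tmp, value);
        Files.move(tmp, fileFor(key), StandardCopyOption.ATOMIC_MOVE);
    }

    Optional<byte[]> get(String key) throws IOException {
        Path p = fileFor(key);
        return Files.exists(p) ? Optional.of(Files.readAllBytes(p)) : Optional.empty();
    }

    public static void main(String[] args) throws IOException {
        PerKeyFileStore store = new PerKeyFileStore(Files.createTempDirectory("imap"));
        store.put("job-1", "v1".getBytes(StandardCharsets.UTF_8));
        store.put("job-1", "v2".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(store.get("job-1").orElseThrow(), StandardCharsets.UTF_8)); // v2
    }
}
```

Writes no longer scale with the total number of entries, but the directory holds one file per key, which many small files may make expensive on object stores.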

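Because every write now reads and rewrites all entries under `parentPath`, writers to the same path must be serialized, which is what `LOCK_MAP` in `write` is for. A minimal sketch of that per-path locking follows, assuming `LOCK_MAP` is a `ConcurrentHashMap<String, ReentrantLock>` (the diff does not show its declaration); `withPathLock` is a hypothetical helper. It takes the lock before entering `try`, the idiom shown in the `ReentrantLock` Javadoc, so `unlock()` never runs on a lock that was not acquired. A JVM-local lock like this only serializes threads within one process.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

public class PathLockSketch {
    // One lock per parent path, created lazily and shared by all writers in this JVM.
    static final ConcurrentMap<String, ReentrantLock> LOCK_MAP = new ConcurrentHashMap<>();

    static <T> T withPathLock(String parentPath, Supplier<T> action) {
        ReentrantLock lock = LOCK_MAP.computeIfAbsent(parentPath, p -> new ReentrantLock());
        lock.lock(); // acquire before try so unlock() never runs on an unheld lock
        try {
            return action.get();
        } finally {
            lock.unlock();
        }
    }

    static int counter = 0; // deliberately unsynchronized; guarded only by the path lock

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(8);
        for (int i = 0; i < 10_000; i++) {
            pool.submit(() -> withPathLock("/imap/ns/cluster", () -> ++counter));
        }
        pool.shutdown();
        pool.awaitTermination(1, TimeUnit.MINUTES);
        // Read under the same lock so the final value is guaranteed to be visible.
        System.out.println(withPathLock("/imap/ns/cluster", () -> counter)); // 10000
    }
}
```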
