prclin commented on code in PR #9524:
URL: https://github.com/apache/seatunnel/pull/9524#discussion_r2182808994
##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java:
##########
@@ -76,48 +77,140 @@ public void setBlockSize(Long blockSize) {
// TODO Synchronous write, asynchronous write can be added in the future
@Override
public void write(IMapFileData data) throws IOException {
- byte[] bytes = serializer.serialize(data);
- this.write(bytes);
+ ReentrantLock lock = LOCK_MAP.computeIfAbsent(parentPath, path -> new ReentrantLock());
Review Comment:
When I used `synchronized`, the integration test
`org.apache.seatunnel.engine.e2e.SplitClusterFaultToleranceIT#testStreamJobRestoreInAllNodeDown`
ran concurrent operations on a single IMap, but `synchronized` did not
serialize them, which caused this error:

> This image is from
https://github.com/prclin/seatunnel/actions/runs/16017281670/job/45186339029
At first I thought the cause might be that Disruptor's
`handleEventsWithWorkerPool` consumes events concurrently (meaning the handlers
passed to `handleEventsWithWorkerPool` are only prototypes), but I looked into
the source code and it does not work that way.
So my guess is that the FileMapStore of each IMap may be created more than
once. In that case an instance-level lock does not work as expected, so I use a
class-level lock for each IMap instead.
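
To illustrate the reasoning above, here is a minimal sketch, not the actual `CloudWriter` code. It assumes two writer instances end up pointing at the same parent path, which is only my guess about what happens in the test. `LOCK_MAP` and `parentPath` mirror the names in the diff; `PathLockSketch`, `lockForPath`, and the `StringBuilder` standing in for the WAL file are hypothetical. A `synchronized` instance method would lock on `this`, so two instances would not exclude each other, while a static map keyed by path hands both instances the same `ReentrantLock`.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical stand-in for a writer; only LOCK_MAP and parentPath follow the diff.
final class PathLockSketch {
    // Static, so it is shared by every instance: one lock per parent path.
    private static final Map<String, ReentrantLock> LOCK_MAP = new ConcurrentHashMap<>();

    private final String parentPath;
    private final StringBuilder sink; // stands in for the WAL file (assumption)

    PathLockSketch(String parentPath, StringBuilder sink) {
        this.parentPath = parentPath;
        this.sink = sink;
    }

    // Returns the same lock for every instance that uses the same path.
    ReentrantLock lockForPath() {
        return LOCK_MAP.computeIfAbsent(parentPath, path -> new ReentrantLock());
    }

    void write(String record) {
        ReentrantLock lock = lockForPath();
        lock.lock();
        try {
            sink.append(record); // not thread-safe on its own; guarded by the lock
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        StringBuilder sharedFile = new StringBuilder();
        // Two instances for the same path, as I suspect happens in the test.
        PathLockSketch a = new PathLockSketch("/imap/demo", sharedFile);
        PathLockSketch b = new PathLockSketch("/imap/demo", sharedFile);

        Thread t1 = new Thread(() -> { for (int i = 0; i < 1000; i++) a.write("x"); });
        Thread t2 = new Thread(() -> { for (int i = 0; i < 1000; i++) b.write("y"); });
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        System.out.println("same lock: " + (a.lockForPath() == b.lockForPath()));
        System.out.println("records written: " + sharedFile.length());
    }
}
```

One trade-off of this approach is that entries in the static map are never removed in this sketch, so the map grows with the number of distinct paths; whether that matters depends on how many IMaps exist.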