Copilot commented on code in PR #9524:
URL: https://github.com/apache/seatunnel/pull/9524#discussion_r2178852988
##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorageTest.java:
##########
@@ -90,17 +88,16 @@ void testAll() {
STORAGE.store(key2Index, keyValue);
keys.add(key2Index);
value = keyValue;
- new Thread(() -> dataSize.set(STORAGE.loadAll().size())).start();
}
STORAGE.store(key, value);
keys.add(key);
STORAGE.delete(key1Index);
keys.remove(key1Index);
}
- await().atMost(1, TimeUnit.SECONDS).until(dataSize::get, size -> size > 0);
+ await().atMost(5, TimeUnit.SECONDS);
Review Comment:
The `await()` call has no `until(...)` condition, so the statement only builds a
condition factory and never blocks: it is effectively a no-op. Add an `until`
clause (e.g., polling `STORAGE.loadAll().size()`) or remove the unused `await()`
so the test actually waits for the background thread.
```suggestion
await().atMost(5, TimeUnit.SECONDS).until(() -> STORAGE.loadAll().size() == 99);
```
##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java:
##########
@@ -75,49 +68,120 @@ public void setBlockSize(Long blockSize) {
// TODO Synchronous write, asynchronous write can be added in the future
@Override
- public void write(IMapFileData data) throws IOException {
- byte[] bytes = serializer.serialize(data);
- this.write(bytes);
+ public synchronized void write(IMapFileData data) throws IOException {
+ List<String> dataFiles = WALDataUtils.getDataFiles(fs, parentPath, FILE_NAME);
+ SequenceInputStream stream = WALDataUtils.getComposedInputStream(fs, dataFiles);
+ // reset block info
+ reset();
+
+ // write data
+ writeEntry(serializer.serialize(data));
+ byte[] bytes;
+ boolean encountered = false;
+ while ((bytes = WALDataUtils.readNextData(stream)) != null) {
+ IMapFileData diskData = serializer.deserialize(bytes, IMapFileData.class);
+
+ if (encountered) writeEntry(serializer.serialize(diskData));
+ else if (isKeyEquals(data, diskData))
+ encountered = true; // current data is the entry that has been updated
+ else writeEntry(serializer.serialize(diskData));
+ }
+ stream.close();
+ commit(dataFiles);
Review Comment:
Similar to `HdfsWriter`, this method reconstructs the entire WAL for each
write, which can severely impact performance. An append-only or differential
update mechanism could dramatically reduce write amplification.
```suggestion
// Append the new data entry to the WAL
byte[] serializedData = serializer.serialize(data);
writeEntry(serializedData);
// Mark outdated entries as invalid (if applicable)
List<String> dataFiles = WALDataUtils.getDataFiles(fs, parentPath, FILE_NAME);
for (String file : dataFiles) {
try (SequenceInputStream stream = WALDataUtils.getComposedInputStream(fs, List.of(file))) {
byte[] bytes;
while ((bytes = WALDataUtils.readNextData(stream)) != null) {
IMapFileData diskData = serializer.deserialize(bytes, IMapFileData.class);
if (isKeyEquals(data, diskData)) {
markAsInvalid(file, diskData);
break;
}
}
}
}
```
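One way to make the "differential update" idea concrete without rewriting the log on every write is threshold-triggered compaction: append every write, track how many entries are superseded, and rewrite the log only once the garbage ratio crosses a threshold. Below is a minimal, hedged pure-Java sketch; `CompactingLog` and its methods are illustrative names, not part of the SeaTunnel codebase.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch: append-only log with threshold-triggered compaction.
// Each write is O(1); the full rewrite runs only when more than half of the
// log entries are stale, so the rewrite cost is amortized across many writes.
public class CompactingLog {
    private static final double GARBAGE_THRESHOLD = 0.5;

    private final List<Map.Entry<String, String>> log = new ArrayList<>();
    private final Map<String, Integer> latest = new HashMap<>(); // key -> newest index

    public synchronized void write(String key, String value) {
        log.add(new AbstractMap.SimpleEntry<>(key, value));
        latest.put(key, log.size() - 1);
        double garbage = 1.0 - (double) latest.size() / log.size();
        if (garbage > GARBAGE_THRESHOLD) {
            compact();
        }
    }

    public synchronized String read(String key) {
        Integer i = latest.get(key);
        return i == null ? null : log.get(i).getValue();
    }

    public synchronized int logSize() {
        return log.size();
    }

    // Rewrite the log, keeping only the newest entry per key.
    private void compact() {
        Map<String, String> snapshot = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : log) {
            snapshot.put(e.getKey(), e.getValue());
        }
        log.clear();
        latest.clear();
        for (Map.Entry<String, String> e : snapshot.entrySet()) {
            log.add(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()));
            latest.put(e.getKey(), log.size() - 1);
        }
    }
}
```

Applied to `CloudWriter`, the same idea would mean appending serialized entries to the current WAL file and deferring the read-and-rewrite pass to a compaction step, rather than performing it inside every `write(IMapFileData)` call.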
##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/HdfsWriter.java:
##########
@@ -47,41 +61,132 @@ public String identifier() {
@Override
public void initialize(FileSystem fs, Path parentPath, Serializer serializer)
throws IOException {
- Path path = new Path(parentPath, FILE_NAME);
- this.out = fs.create(path);
+ this.fs = fs;
this.serializer = serializer;
+ this.parentPath = parentPath;
}
@Override
- public void write(IMapFileData data) throws IOException {
- byte[] bytes = serializer.serialize(data);
- this.write(bytes);
+ public void setBlockSize(Long blockSize) {
+ if (blockSize != null && blockSize > DEFAULT_BLOCK_SIZE) {
+ this.blockSize = blockSize;
+ }
}
- public void flush() throws IOException {
- // hsync to flag
- if (out instanceof HdfsDataOutputStream) {
- ((HdfsDataOutputStream) out)
- .hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+ @Override
+ public synchronized void write(IMapFileData data) throws IOException {
+ List<String> dataFiles = WALDataUtils.getDataFiles(fs, parentPath, FILE_NAME);
+ SequenceInputStream stream = WALDataUtils.getComposedInputStream(fs, dataFiles);
+ // reset block info
+ reset();
+
+ // write data
+ writeEntry(serializer.serialize(data));
+ byte[] bytes;
+ boolean encountered = false;
+ while ((bytes = WALDataUtils.readNextData(stream)) != null) {
+ IMapFileData diskData = serializer.deserialize(bytes, IMapFileData.class);
+
+ if (encountered) writeEntry(serializer.serialize(diskData));
+ else if (isKeyEquals(data, diskData))
+ encountered = true; // current data is the entry that has been updated
+ else writeEntry(serializer.serialize(diskData));
}
- if (out.getWrappedStream() instanceof DFSOutputStream) {
- ((DFSOutputStream) out.getWrappedStream())
- .hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
- } else {
- out.hsync();
+ stream.close();
+ commit(dataFiles);
Review Comment:
This implementation reads and rewrites the entire WAL on each write, so n
writes cost O(n²) total I/O for large datasets. Consider an append-only
strategy or an indexed update approach so that a single write does not have to
reprocess every existing entry.
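To make the append-only alternative concrete, here is a hedged pure-Java sketch; `AppendOnlyWal` and its methods are illustrative names under assumed semantics, not SeaTunnel's API. Every write and delete appends a single entry, and recovery replays the log with last-write-wins semantics, so no write ever re-reads the WAL.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch of an append-only WAL: writes never re-read or
// rewrite existing entries, so each write is O(1) instead of O(n).
public class AppendOnlyWal {
    private final List<Map.Entry<String, String>> log = new ArrayList<>();

    public synchronized void write(String key, String value) {
        log.add(new AbstractMap.SimpleEntry<>(key, value)); // append only
    }

    public synchronized void delete(String key) {
        log.add(new AbstractMap.SimpleEntry<>(key, null)); // null = tombstone
    }

    // Recovery replays the whole log once; the newest entry per key wins
    // and tombstones remove earlier values.
    public synchronized Map<String, String> loadAll() {
        Map<String, String> snapshot = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : log) {
            if (e.getValue() == null) {
                snapshot.remove(e.getKey());
            } else {
                snapshot.put(e.getKey(), e.getValue());
            }
        }
        return snapshot;
    }
}
```

The trade-off is that the log grows with every update until a separate compaction pass (run at checkpoint time or when a garbage threshold is exceeded) rewrites it once, amortizing the rewrite cost across many writes instead of paying it on each one.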
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]