Hisoka-X commented on code in PR #9524:
URL: https://github.com/apache/seatunnel/pull/9524#discussion_r2194079651
##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java:
##########
@@ -76,48 +77,140 @@ public void setBlockSize(Long blockSize) {
// TODO Synchronous write, asynchronous write can be added in the future
@Override
public void write(IMapFileData data) throws IOException {
- byte[] bytes = serializer.serialize(data);
- this.write(bytes);
+ ReentrantLock lock = LOCK_MAP.computeIfAbsent(parentPath, path -> new
ReentrantLock());
+ try {
+ lock.lock();
+ writeInternal(data);
+ } finally {
+ lock.unlock();
+ }
}
- private void write(byte[] bytes) {
- try (FSDataOutputStream out = fs.create(path, true)) {
- // Write to bytebuffer
- byte[] data = WALDataUtils.wrapperBytes(bytes);
- bf.writeBytes(data);
-
- // Read all bytes
- byte[] allBytes = new byte[bf.readableBytes()];
- bf.readBytes(allBytes);
-
- // write filesystem
- out.write(allBytes);
+ public void writeInternal(IMapFileData data) throws IOException {
+ List<String> dataFiles = WALDataUtils.getDataFiles(fs, parentPath,
FILE_NAME);
+ SequenceInputStream stream = WALDataUtils.getComposedInputStream(fs,
dataFiles);
+ // reset block info
+ reset();
+
+ // write data
+ writeEntry(serializer.serialize(data));
+ byte[] bytes;
+ boolean encountered = false;
+ while ((bytes = WALDataUtils.readNextData(stream)) != null) {
+ IMapFileData diskData = serializer.deserialize(bytes,
IMapFileData.class);
+
+ if (encountered) writeEntry(serializer.serialize(diskData));
+ else if (isKeyEquals(data, diskData))
+ encountered = true; // if current data is the entry which have
be updated
+ else writeEntry(serializer.serialize(diskData));
+ }
+ stream.close();
+ commit(dataFiles);
Review Comment:
Do we need to traverse all of the data on every write? This logic has a
serious performance problem.
##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/HdfsWriter.java:
##########
@@ -47,41 +69,153 @@ public String identifier() {
@Override
public void initialize(FileSystem fs, Path parentPath, Serializer
serializer)
throws IOException {
- Path path = new Path(parentPath, FILE_NAME);
- this.out = fs.create(path);
+ this.fs = fs;
this.serializer = serializer;
+ this.parentPath = parentPath;
+ }
+
+ @Override
+ public void setBlockSize(Long blockSize) {
+ if (blockSize != null && blockSize > DEFAULT_BLOCK_SIZE) {
+ this.blockSize = blockSize;
+ }
}
@Override
public void write(IMapFileData data) throws IOException {
- byte[] bytes = serializer.serialize(data);
- this.write(bytes);
+ ReentrantLock lock = LOCK_MAP.computeIfAbsent(parentPath, path -> new
ReentrantLock());
+ try {
+ lock.lock();
+ writeInternal(data);
+ } finally {
+ lock.unlock();
+ }
}
- public void flush() throws IOException {
- // hsync to flag
- if (out instanceof HdfsDataOutputStream) {
- ((HdfsDataOutputStream) out)
-
.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+ public void writeInternal(IMapFileData data) throws IOException {
+ List<String> dataFiles = WALDataUtils.getDataFiles(fs, parentPath,
FILE_NAME);
+ SequenceInputStream stream = WALDataUtils.getComposedInputStream(fs,
dataFiles);
+ // reset block info
+ reset();
+
+ // write data
+ writeEntry(serializer.serialize(data));
+ byte[] bytes;
+ boolean encountered = false;
+ while ((bytes = WALDataUtils.readNextData(stream)) != null) {
+ IMapFileData diskData = serializer.deserialize(bytes,
IMapFileData.class);
+
+ if (encountered) writeEntry(serializer.serialize(diskData));
+ else if (isKeyEquals(data, diskData))
+ encountered = true; // if current data is the entry which have
be updated
+ else writeEntry(serializer.serialize(diskData));
}
- if (out.getWrappedStream() instanceof DFSOutputStream) {
- ((DFSOutputStream) out.getWrappedStream())
-
.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
- } else {
- out.hsync();
+ stream.close();
+ commit(dataFiles);
+ }
+
+ public void commit(List<String> filenames) throws IOException {
Review Comment:
Please do some refactoring — I found duplicate code shared between CloudWriter
and HdfsWriter.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]