prclin commented on code in PR #9524:
URL: https://github.com/apache/seatunnel/pull/9524#discussion_r2194482125
##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/HdfsWriter.java:
##########
@@ -47,41 +69,153 @@ public String identifier() {
@Override
public void initialize(FileSystem fs, Path parentPath, Serializer
serializer)
throws IOException {
- Path path = new Path(parentPath, FILE_NAME);
- this.out = fs.create(path);
+ this.fs = fs;
this.serializer = serializer;
+ this.parentPath = parentPath;
+ }
+
+ @Override
+ public void setBlockSize(Long blockSize) {
+ if (blockSize != null && blockSize > DEFAULT_BLOCK_SIZE) {
+ this.blockSize = blockSize;
+ }
}
@Override
public void write(IMapFileData data) throws IOException {
- byte[] bytes = serializer.serialize(data);
- this.write(bytes);
+ ReentrantLock lock = LOCK_MAP.computeIfAbsent(parentPath, path -> new
ReentrantLock());
+ try {
+ lock.lock();
+ writeInternal(data);
+ } finally {
+ lock.unlock();
+ }
}
- public void flush() throws IOException {
- // hsync to flag
- if (out instanceof HdfsDataOutputStream) {
- ((HdfsDataOutputStream) out)
-
.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
+ public void writeInternal(IMapFileData data) throws IOException {
+ List<String> dataFiles = WALDataUtils.getDataFiles(fs, parentPath,
FILE_NAME);
+ SequenceInputStream stream = WALDataUtils.getComposedInputStream(fs,
dataFiles);
+ // reset block info
+ reset();
+
+ // write data
+ writeEntry(serializer.serialize(data));
+ byte[] bytes;
+ boolean encountered = false;
+ while ((bytes = WALDataUtils.readNextData(stream)) != null) {
+ IMapFileData diskData = serializer.deserialize(bytes,
IMapFileData.class);
+
+ if (encountered) writeEntry(serializer.serialize(diskData));
+ else if (isKeyEquals(data, diskData))
+ encountered = true; // if current data is the entry which have
be updated
+ else writeEntry(serializer.serialize(diskData));
}
- if (out.getWrappedStream() instanceof DFSOutputStream) {
- ((DFSOutputStream) out.getWrappedStream())
-
.hsync(EnumSet.of(HdfsDataOutputStream.SyncFlag.UPDATE_LENGTH));
- } else {
- out.hsync();
+ stream.close();
+ commit(dataFiles);
+ }
+
+ public void commit(List<String> filenames) throws IOException {
Review Comment:
ok. The logic is the same in the cloud and HDFS writers; I deliberately wrote two
copies so that if errors turn up in the integration tests, each writer can be
modified separately.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]