Hisoka-X commented on code in PR #9524:
URL: https://github.com/apache/seatunnel/pull/9524#discussion_r2182689749
##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java:
##########
@@ -76,48 +77,140 @@ public void setBlockSize(Long blockSize) {
// TODO Synchronous write, asynchronous write can be added in the future
@Override
public void write(IMapFileData data) throws IOException {
- byte[] bytes = serializer.serialize(data);
- this.write(bytes);
+ ReentrantLock lock = LOCK_MAP.computeIfAbsent(parentPath, path -> new
ReentrantLock());
Review Comment:
why not use `synchronized`?
##########
seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/reader/DefaultReader.java:
##########
@@ -55,60 +60,47 @@ public void initialize(FileSystem fs, Serializer
serializer) throws IOException
@Override
public List<IMapFileData> readAllData(Path parentPath) throws IOException {
- List<String> fileNames = getFileNames(parentPath);
+ List<String> fileNames = WALDataUtils.getDataFiles(fs, parentPath,
FILE_NAME);
if (CollectionUtils.isEmpty(fileNames)) {
return new ArrayList<>();
}
- List<IMapFileData> result = new ArrayList<>(DEFAULT_QUERY_LIST_SIZE);
- for (String fileName : fileNames) {
- result.addAll(readData(new Path(parentPath, fileName)));
- }
- return result;
- }
- private List<String> getFileNames(Path parentPath) {
- try {
- if (!fs.exists(parentPath)) {
- return new ArrayList<>();
- }
- RemoteIterator<LocatedFileStatus> fileStatusRemoteIterator =
- fs.listFiles(parentPath, true);
- List<String> fileNames = new ArrayList<>();
- while (fileStatusRemoteIterator.hasNext()) {
- LocatedFileStatus fileStatus = fileStatusRemoteIterator.next();
- if (fileStatus.getPath().getName().endsWith("wal.txt")) {
- fileNames.add(fileStatus.getPath().toString());
- }
- }
- return fileNames;
- } catch (IOException e) {
- throw new IMapStorageException(e, "get file names error,path is
s%", parentPath);
- }
+ List<Path> paths =
+ fileNames.stream()
+ .map(filename -> new Path(parentPath, filename))
+ .collect(Collectors.toList());
+ return readData(paths);
}
- private List<IMapFileData> readData(Path path) throws IOException {
+ public List<IMapFileData> readData(List<Path> paths) throws IOException {
Review Comment:
```suggestion
private List<IMapFileData> readData(List<Path> paths) throws IOException
{
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]