This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 0c29fe5a0c0 Revert "Load: Parallelly load files into different target data partitions (#13893)" (#13905)
0c29fe5a0c0 is described below

commit 0c29fe5a0c063578f6940db3bbf7dc9f056a3eb6
Author: Steve Yurong Su <[email protected]>
AuthorDate: Fri Oct 25 10:26:59 2024 +0800

    Revert "Load: Parallelly load files into different target data partitions (#13893)" (#13905)
    
    This reverts commit dbb99bc88dea50c6effffae4081abbba1ea5f76a.
---
 .../db/storageengine/load/LoadTsFileManager.java   | 57 +++++++---------------
 1 file changed, 17 insertions(+), 40 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
index f785d0992eb..775dad1df46 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/LoadTsFileManager.java
@@ -61,11 +61,8 @@ import java.io.IOException;
 import java.nio.file.DirectoryNotEmptyException;
 import java.nio.file.Files;
 import java.nio.file.Path;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
@@ -455,47 +452,27 @@ public class LoadTsFileManager {
       if (isClosed) {
         throw new IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
       }
-
       for (Map.Entry<DataPartitionInfo, ModificationFile> entry :
           dataPartition2ModificationFile.entrySet()) {
         entry.getValue().close();
       }
-
-      final List<Map.Entry<DataPartitionInfo, TsFileIOWriter>> dataPartition2WriterList =
-          new ArrayList<>(dataPartition2Writer.entrySet());
-      Collections.shuffle(dataPartition2WriterList);
-
-      final AtomicReference<Exception> exception = new AtomicReference<>();
-      dataPartition2WriterList.parallelStream()
-          .forEach(
-              entry -> {
-                try {
-                  final TsFileIOWriter writer = entry.getValue();
-                  if (writer.isWritingChunkGroup()) {
-                    writer.endChunkGroup();
-                  }
-                  writer.endFile();
-
-                  final DataRegion dataRegion = entry.getKey().getDataRegion();
-                  dataRegion.loadNewTsFile(
-                      generateResource(writer, progressIndex), true, 
isGeneratedByPipe);
-
-                  // Metrics
-                  dataRegion
-                      .getNonSystemDatabaseName()
-                      .ifPresent(
-                          databaseName ->
-                              updateWritePointCountMetrics(
-                                  dataRegion,
-                                  databaseName,
-                                  getTsFileWritePointCount(writer),
-                                  false));
-                } catch (final Exception e) {
-                  exception.set(e);
-                }
-              });
-      if (exception.get() != null) {
-        throw new LoadFileException(exception.get());
+      for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : dataPartition2Writer.entrySet()) {
+        TsFileIOWriter writer = entry.getValue();
+        if (writer.isWritingChunkGroup()) {
+          writer.endChunkGroup();
+        }
+        writer.endFile();
+
+        DataRegion dataRegion = entry.getKey().getDataRegion();
+        dataRegion.loadNewTsFile(generateResource(writer, progressIndex), true, isGeneratedByPipe);
+
+        // Metrics
+        dataRegion
+            .getNonSystemDatabaseName()
+            .ifPresent(
+                databaseName ->
+                    updateWritePointCountMetrics(
+                        dataRegion, databaseName, getTsFileWritePointCount(writer), false));
       }
     }
 

Reply via email to