This is an automated email from the ASF dual-hosted git repository.

jiangtian pushed a commit to branch load_v2
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/load_v2 by this push:
     new 445fe8796cc add LoadTsFileManagerTest
445fe8796cc is described below

commit 445fe8796cca6c9f2ba801890ac9f9f6b1a9d2b1
Author: Tian Jiang <[email protected]>
AuthorDate: Wed Sep 27 10:41:33 2023 +0800

    add LoadTsFileManagerTest
---
 .../execution/load/LoadTsFileManager.java          |  16 +-
 .../execution/load/LoadTsFileManagerTest.java      | 196 +++++++++++++++++++++
 .../db/queryengine/execution/load/TestBase.java    |  52 ++++--
 .../execution/load/TsFileSplitSenderTest.java      |   1 +
 4 files changed, 248 insertions(+), 17 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
index 97542ceab24..3768da85af1 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManager.java
@@ -19,6 +19,8 @@
 
 package org.apache.iotdb.db.queryengine.execution.load;
 
+import java.util.HashSet;
+import java.util.Set;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.conf.IoTDBConstant;
@@ -195,12 +197,14 @@ public class LoadTsFileManager {
     private Map<DataPartitionInfo, TsFileIOWriter> dataPartition2Writer;
     private Map<DataPartitionInfo, String> dataPartition2LastDevice;
     private boolean isClosed;
+    private Set<Integer> receivedSplitIds;
 
     private TsFileWriterManager(File taskDir) {
       this.taskDir = taskDir;
       this.dataPartition2Writer = new HashMap<>();
       this.dataPartition2LastDevice = new HashMap<>();
       this.isClosed = false;
+      this.receivedSplitIds = new HashSet<>();
 
       clearDir(taskDir);
     }
@@ -215,7 +219,10 @@ public class LoadTsFileManager {
     }
 
     @SuppressWarnings("squid:S3824")
-    private void write(DataPartitionInfo partitionInfo, ChunkData chunkData) 
throws IOException {
+    private synchronized void write(DataPartitionInfo partitionInfo, ChunkData 
chunkData) throws IOException {
+      if (receivedSplitIds.contains(chunkData.getSplitId())) {
+        return;
+      }
       if (isClosed) {
         throw new 
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
       }
@@ -239,15 +246,20 @@ public class LoadTsFileManager {
         dataPartition2LastDevice.put(partitionInfo, chunkData.getDevice());
       }
       chunkData.writeToFileWriter(writer);
+      receivedSplitIds.add(chunkData.getSplitId());
     }
 
-    private void writeDeletion(TsFileData deletionData) throws IOException {
+    private synchronized void writeDeletion(TsFileData deletionData) throws 
IOException {
+      if (receivedSplitIds.contains(deletionData.getSplitId())) {
+        return;
+      }
       if (isClosed) {
         throw new 
IOException(String.format(MESSAGE_WRITER_MANAGER_HAS_BEEN_CLOSED, taskDir));
       }
       for (Map.Entry<DataPartitionInfo, TsFileIOWriter> entry : 
dataPartition2Writer.entrySet()) {
         deletionData.writeToFileWriter(entry.getValue());
       }
+      receivedSplitIds.add(deletionData.getSplitId());
     }
 
     private void loadAll(boolean isGeneratedByPipe) throws IOException, 
LoadFileException {
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
new file mode 100644
index 00000000000..97e98c0ce4e
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/LoadTsFileManagerTest.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.load;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Collections;
+import java.util.List;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
+import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId;
+import org.apache.iotdb.commons.consensus.ConsensusGroupId.Factory;
+import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.LoadFileException;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeType;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFileNode;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.load.LoadTsFilePieceNode;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import org.apache.iotdb.db.utils.TimePartitionUtils;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadCommandReq;
+import org.apache.iotdb.mpp.rpc.thrift.TLoadResp;
+import org.apache.iotdb.mpp.rpc.thrift.TTsFilePieceReq;
+import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.compress.IUnCompressor;
+import org.apache.iotdb.tsfile.exception.write.WriteProcessException;
+import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
+import org.apache.iotdb.tsfile.read.TsFileReader;
+import org.apache.iotdb.tsfile.read.TsFileSequenceReader;
+import org.apache.iotdb.tsfile.read.common.Path;
+import org.apache.iotdb.tsfile.read.common.RowRecord;
+import org.apache.iotdb.tsfile.read.expression.QueryExpression;
+import org.apache.iotdb.tsfile.read.query.dataset.QueryDataSet;
+import org.apache.thrift.TException;
+import org.junit.Test;
+
+public class LoadTsFileManagerTest extends TestBase {
+
+  private LoadTsFileManager loadTsFileManager = new LoadTsFileManager();
+  private long maxSplitSize = 128 * 1024 * 1024;
+
+  @Override
+  public void setup() throws IOException, WriteProcessException, 
DataRegionException {
+    fileNum = 10;
+    seriesNum = 100;
+    deviceNum = 100;
+    super.setup();
+  }
+
+  @Test
+  public void test() throws IOException {
+
+    LoadTsFileNode loadTsFileNode =
+        new LoadTsFileNode(new PlanNodeId("testPlanNode"), tsFileResources);
+    DataPartitionBatchFetcher partitionBatchFetcher = 
dummyDataPartitionBatchFetcher();
+    TsFileSplitSender splitSender =
+        new TsFileSplitSender(
+            loadTsFileNode,
+            partitionBatchFetcher,
+            TimePartitionUtils.getTimePartitionInterval(),
+            internalServiceClientManager,
+            false,
+            maxSplitSize,
+            100);
+    long start = System.currentTimeMillis();
+    splitSender.start();
+    long timeConsumption = System.currentTimeMillis() - start;
+
+    System.out.printf("Split ends after %dms\n", timeConsumption);
+
+    ConsensusGroupId d1GroupId = 
Factory.create(TConsensusGroupType.DataRegion.getValue(), 0);
+    DataRegion dataRegion = 
dataRegionMap.get(d1GroupId.convertToTConsensusGroupId());
+    List<TsFileResource> tsFileList = 
dataRegion.getTsFileManager().getTsFileList(false);
+    System.out.printf("Loaded TsFiles: %s\n", tsFileList);
+    assertEquals(1, tsFileList.size());
+
+    long timePartitionInterval = TimePartitionUtils.getTimePartitionInterval();
+    long chunkTimeRange = (long) (timePartitionInterval * chunkTimeRangeRatio);
+    int chunkPointNum = (int) (chunkTimeRange / pointInterval);
+    long endTime = chunkTimeRange * (fileNum - 1) + pointInterval * 
(chunkPointNum - 1);
+
+    TsFileResource tsFileResource = tsFileList.get(0);
+    for (int i = 0; i < deviceNum; i++) {
+      assertEquals(0, tsFileResource.getStartTime("d" + i));
+      assertEquals(endTime, tsFileResource.getEndTime("d" + i));
+    }
+
+    try (TsFileReader reader = new TsFileReader(
+        new TsFileSequenceReader(tsFileResource.getTsFile().getPath()))) {
+      for (int dn = 0; dn < deviceNum; dn++) {
+        QueryExpression queryExpression = QueryExpression.create(
+            Collections.singletonList(new Path("d" + dn, "Simple_22", false)), 
null);
+        QueryDataSet dataSet = reader.query(queryExpression);
+        int i = 0;
+        while (dataSet.hasNext()) {
+          RowRecord record = dataSet.next();
+          long currTime =
+              chunkTimeRange * (i / chunkPointNum) + pointInterval * (i % 
chunkPointNum);
+          assertEquals(currTime, record.getTimestamp());
+          assertEquals(1.0 * (i % chunkPointNum), 
record.getFields().get(0).getDoubleV(), 0.0001);
+          i++;
+        }
+        assertEquals(chunkPointNum * fileNum, i);
+      }
+    }
+  }
+
+  public TLoadResp handleTsFilePieceNode(TTsFilePieceReq req, TEndPoint 
tEndpoint)
+      throws TException, IOException {
+    ConsensusGroupId groupId =
+        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
+    ByteBuffer buf = req.body.slice();
+    if (req.isSetCompressionType()) {
+      CompressionType compressionType = 
CompressionType.deserialize(req.compressionType);
+      IUnCompressor unCompressor = 
IUnCompressor.getUnCompressor(compressionType);
+      int uncompressedLength = req.getUncompressedLength();
+      ByteBuffer allocate = ByteBuffer.allocate(uncompressedLength);
+      unCompressor.uncompress(buf.array(), buf.arrayOffset() + buf.position(), 
buf.remaining(),
+          allocate.array(), 0);
+      allocate.limit(uncompressedLength);
+      buf = allocate;
+    }
+
+    LoadTsFilePieceNode pieceNode = (LoadTsFilePieceNode) 
PlanNodeType.deserialize(buf);
+    
loadTsFileManager.writeToDataRegion(dataRegionMap.get(req.consensusGroupId), 
pieceNode,
+        req.uuid);
+
+    // forward to other replicas in the group
+    if (req.isRelay) {
+      req.isRelay = false;
+      TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
+      
regionReplicaSet.getDataNodeLocations().stream().parallel().forEach(dataNodeLocation
 -> {
+        TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
+        if (!otherPoint.equals(tEndpoint)) {
+          try {
+            handleTsFilePieceNode(req, otherPoint);
+          } catch (TException | IOException e) {
+            throw new RuntimeException(e);
+          }
+        }
+      });
+    }
+
+    return new TLoadResp()
+        .setAccepted(true)
+        .setStatus(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+  }
+
+  public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint 
tEndpoint)
+      throws LoadFileException, IOException {
+    ConsensusGroupId groupId =
+        
ConsensusGroupId.Factory.createFromTConsensusGroupId(req.consensusGroupId);
+
+    // forward to other replicas in the group
+    if (req.useConsensus) {
+      req.useConsensus = false;
+      TRegionReplicaSet regionReplicaSet = groupId2ReplicaSetMap.get(groupId);
+      for (TDataNodeLocation dataNodeLocation : 
regionReplicaSet.getDataNodeLocations()) {
+        TEndPoint otherPoint = dataNodeLocation.getInternalEndPoint();
+        if (!otherPoint.equals(tEndpoint)) {
+          handleTsLoadCommand(req, otherPoint);
+        }
+      }
+    }
+
+    loadTsFileManager.loadAll(req.uuid, false);
+
+    return new TLoadResp()
+        .setAccepted(true)
+        .setStatus(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
+  }
+
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
index fab94c1751b..a8e7ee3b2bf 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TestBase.java
@@ -22,7 +22,9 @@ package org.apache.iotdb.db.queryengine.execution.load;
 import static org.apache.iotdb.commons.conf.IoTDBConstant.GB;
 
 import java.util.Comparator;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicLong;
+import org.apache.iotdb.common.rpc.thrift.TConsensusGroupId;
 import org.apache.iotdb.common.rpc.thrift.TConsensusGroupType;
 import org.apache.iotdb.common.rpc.thrift.TDataNodeLocation;
 import org.apache.iotdb.common.rpc.thrift.TEndPoint;
@@ -41,8 +43,15 @@ import 
org.apache.iotdb.commons.partition.DataPartitionQueryParam;
 import org.apache.iotdb.commons.partition.SchemaNodeManagementPartition;
 import org.apache.iotdb.commons.partition.SchemaPartition;
 import org.apache.iotdb.commons.path.PathPatternTree;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.db.exception.DataRegionException;
+import org.apache.iotdb.db.exception.LoadFileException;
 import org.apache.iotdb.db.queryengine.plan.analyze.IPartitionFetcher;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
+import org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy;
+import 
org.apache.iotdb.db.storageengine.dataregion.flush.TsFileFlushPolicy.DirectFlushPolicy;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
+import 
org.apache.iotdb.db.storageengine.dataregion.wal.recover.WALRecoverManager;
 import org.apache.iotdb.db.utils.SequenceUtils.DoubleSequenceGenerator;
 import org.apache.iotdb.db.utils.SequenceUtils.DoubleSequenceGeneratorFactory;
 import org.apache.iotdb.db.utils.SequenceUtils.GaussianDoubleSequenceGenerator;
@@ -91,11 +100,11 @@ import java.util.stream.IntStream;
 public class TestBase {
 
   private static final Logger logger = LoggerFactory.getLogger(TestBase.class);
-  public static final String BASE_OUTPUT_PATH = 
"target".concat(File.separator);
+  public static final String BASE_OUTPUT_PATH = 
"target".concat(File.separator).concat("loadTest");
   public static final String PARTIAL_PATH_STRING =
       "%s" + File.separator + "%d" + File.separator + "%d" + File.separator;
   public static final String TEST_TSFILE_PATH =
-      BASE_OUTPUT_PATH + "testTsFile".concat(File.separator) + 
PARTIAL_PATH_STRING;
+      BASE_OUTPUT_PATH + File.separator + "testTsFile".concat(File.separator) 
+ PARTIAL_PATH_STRING;
 
   protected int fileNum = 100;
   // series number of each file, sn non-aligned series and 1 aligned series 
with sn measurements
@@ -117,6 +126,7 @@ public class TestBase {
   protected IPartitionFetcher partitionFetcher;
   // the key is deviceId, not partitioned by time in the simple test
   protected Map<String, TRegionReplicaSet> partitionTable = new HashMap<>();
+  protected Map<TConsensusGroupId, DataRegion> dataRegionMap = new HashMap<>();
   protected Map<ConsensusGroupId, TRegionReplicaSet> groupId2ReplicaSetMap = 
new HashMap<>();
   protected IClientManager<TEndPoint, SyncDataNodeInternalServiceClient>
       internalServiceClientManager;
@@ -125,7 +135,7 @@ public class TestBase {
   private int groupSizeInByte;
 
   @Before
-  public void setup() throws IOException, WriteProcessException {
+  public void setup() throws IOException, WriteProcessException, 
DataRegionException {
     setupFiles();
     logger.info("{} files set up", files.size());
     partitionFetcher = dummyPartitionFetcher();
@@ -137,9 +147,8 @@ public class TestBase {
 
   @After
   public void cleanup() {
-    for (File file : files) {
-      file.delete();
-    }
+    FileUtils.deleteDirectory(new File(BASE_OUTPUT_PATH));
+    FileUtils.deleteDirectory(new File("target" + File.separator + "data"));
     
TSFileDescriptor.getInstance().getConfig().setGroupSizeInByte(groupSizeInByte);
   }
 
@@ -165,7 +174,8 @@ public class TestBase {
         .setStatus(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
   }
 
-  public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint 
tEndpoint) {
+  public TLoadResp handleTsLoadCommand(TLoadCommandReq req, TEndPoint 
tEndpoint)
+      throws LoadFileException, IOException {
     return new TLoadResp()
         .setAccepted(true)
         .setStatus(new 
TSStatus().setCode(TSStatusCode.SUCCESS_STATUS.getStatusCode()));
@@ -195,8 +205,12 @@ public class TestBase {
                 }
 
                 @Override
-                public TLoadResp sendLoadCommand(TLoadCommandReq req) {
-                  return handleTsLoadCommand(req, getTEndpoint());
+                public TLoadResp sendLoadCommand(TLoadCommandReq req) throws 
TException {
+                  try {
+                    return handleTsLoadCommand(req, getTEndpoint());
+                  } catch (LoadFileException | IOException e) {
+                    throw new TException(e);
+                  }
                 }
 
                 @Override
@@ -218,7 +232,7 @@ public class TestBase {
         };
   }
 
-  public void setupPartitionTable() {
+  public void setupPartitionTable() throws DataRegionException {
     ConsensusGroupId d1GroupId = 
Factory.create(TConsensusGroupType.DataRegion.getValue(), 0);
     TRegionReplicaSet d1Replicas =
         new TRegionReplicaSet(
@@ -233,8 +247,14 @@ public class TestBase {
                 new TDataNodeLocation()
                     .setDataNodeId(2)
                     .setInternalEndPoint(new TEndPoint("localhost", 10002))));
+
+    WALRecoverManager.getInstance()
+        .setAllDataRegionScannedLatch(new CountDownLatch(0));
+    DataRegion dataRegion = new DataRegion(BASE_OUTPUT_PATH, 
d1GroupId.toString(),
+        new DirectFlushPolicy(), "root.loadTest");
     for (int i = 0; i < deviceNum; i++) {
       partitionTable.put("d" + i, d1Replicas);
+      dataRegionMap.put(d1GroupId.convertToTConsensusGroupId(), dataRegion);
     }
 
     groupId2ReplicaSetMap.put(d1GroupId, d1Replicas);
@@ -334,8 +354,9 @@ public class TestBase {
       schemaGeneratorPairs.add(new Pair<>(measurementSchema, 
measurementGeneratorPair.right));
     }
     schemaGeneratorPairs.sort(Comparator.comparing(s -> 
s.left.getMeasurementId()));
-    List<MeasurementSchema> measurementSchemas = 
schemaGeneratorPairs.stream().map(m -> m.left).collect(
-        Collectors.toList());
+    List<MeasurementSchema> measurementSchemas = 
schemaGeneratorPairs.stream().map(m -> m.left)
+        .collect(
+            Collectors.toList());
     IntStream.range(0, fileNum)
         .parallel()
         .forEach(
@@ -352,7 +373,8 @@ public class TestBase {
                   // dd2
                   for (int sn = 0; sn < seriesNum; sn++) {
                     for (int dn = 0; dn < deviceNum; dn++) {
-                      writer.registerTimeseries(new Path("d"+dn), 
schemaGeneratorPairs.get(sn).left);
+                      writer.registerTimeseries(new Path("d" + dn),
+                          schemaGeneratorPairs.get(sn).left);
                     }
                   }
                   writer.registerAlignedTimeseries(new Path("dd1"), 
measurementSchemas);
@@ -387,8 +409,8 @@ public class TestBase {
 
                   writer.flushAllChunkGroups();
                   for (int dn = 0; dn < deviceNum; dn++) {
-                    tsFileResource.updateStartTime("d"+dn, chunkTimeRange * i);
-                    tsFileResource.updateEndTime("d"+dn, chunkTimeRange * (i + 
1));
+                    tsFileResource.updateStartTime("d" + dn, chunkTimeRange * 
i);
+                    tsFileResource.updateEndTime("d" + dn, chunkTimeRange * (i 
+ 1));
                   }
                 }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
index 818e3f80d25..a1509216852 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/load/TsFileSplitSenderTest.java
@@ -120,6 +120,7 @@ public class TsFileSplitSenderTest extends TestBase {
     long start = System.currentTimeMillis();
     splitSender.start();
     long timeConsumption = System.currentTimeMillis() - start;
+    thread.interrupt();
 
     printPhaseResult();
     long transmissionTime = splitSender.getStatistic().compressedSize.get() / 
nodeThroughput;

Reply via email to