This is an automated email from the ASF dual-hosted git repository.

rong pushed a commit to branch cascade-tsfile-trans
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 91eec524ffea0058fb26e93178e2fcc5bfc8ae71
Author: Steve Yurong Su <[email protected]>
AuthorDate: Sun Aug 6 18:30:51 2023 +0800

    cascade
---
 .../planner/plan/node/load/LoadSingleTsFileNode.java   | 11 ++++++++---
 .../iotdb/db/storageengine/dataregion/DataRegion.java  |  5 +++++
 .../org/apache/iotdb/db/utils/FileLoaderUtils.java     |  2 ++
 .../dataregion/TsFileResourceProgressIndexTest.java    | 18 ++++++++++++++++++
 4 files changed, 33 insertions(+), 3 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
index d536a357412..4f3506bcb75 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/load/LoadSingleTsFileNode.java
@@ -24,6 +24,7 @@ import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TRegionReplicaSet;
 import org.apache.iotdb.common.rpc.thrift.TTimePartitionSlot;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.queryengine.plan.analyze.Analysis;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -99,9 +100,13 @@ public class LoadSingleTsFileNode extends WritePlanNode {
       needDecodeTsFile = !isDispatchedToLocal(new 
HashSet<>(partitionFetcher.apply(slotList)));
     }
 
-    if (!needDecodeTsFile && !resource.resourceFileExists()) {
-      resource.serialize();
-    }
+    PipeAgent.runtime().assignRecoverProgressIndexForTsFileRecovery(resource);
+
+    // we serialize the resource file even if the tsfile does not need to be 
decoded
+    // or the resource file is already existed because we need to serialize the
+    // progress index of the tsfile
+    resource.serialize();
+
     return needDecodeTsFile;
   }
 
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index db5d0168d20..0011db21534 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -47,6 +47,7 @@ import 
org.apache.iotdb.db.exception.WriteProcessRejectException;
 import org.apache.iotdb.db.exception.query.OutOfTTLException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.exception.quota.ExceedQuotaException;
+import 
org.apache.iotdb.db.pipe.extractor.realtime.listener.PipeInsertionDataNodeListener;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.queryengine.metric.QueryResourceMetricSet;
 import 
org.apache.iotdb.db.queryengine.plan.analyze.cache.schema.DataNodeSchemaCache;
@@ -2221,6 +2222,9 @@ public class DataRegion implements IDataRegionForQuery {
       }
       loadTsFileToUnSequence(
           tsfileToBeInserted, newTsFileResource, newFilePartitionId, 
deleteOriginFile);
+
+      PipeInsertionDataNodeListener.getInstance().listenToTsFile(dataRegionId, 
newTsFileResource);
+
       FileMetrics.getInstance()
           .addFile(
               newTsFileResource.getTsFile().length(),
@@ -2429,6 +2433,7 @@ public class DataRegion implements IDataRegionForQuery {
       } else {
         Files.copy(resourceFileToLoad.toPath(), targetResourceFile.toPath());
       }
+
     } catch (IOException e) {
       logger.error(
           "File renaming failed when loading .resource file. Origin: {}, 
Target: {}",
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
index 28bb8f82930..2f243b2b508 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/utils/FileLoaderUtils.java
@@ -21,6 +21,7 @@ package org.apache.iotdb.db.utils;
 
 import org.apache.iotdb.commons.path.AlignedPath;
 import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.db.pipe.agent.PipeAgent;
 import org.apache.iotdb.db.queryengine.execution.fragment.QueryContext;
 import org.apache.iotdb.db.queryengine.metric.SeriesScanCostMetricSet;
 import org.apache.iotdb.db.storageengine.buffer.TimeSeriesMetadataCache;
@@ -106,6 +107,7 @@ public class FileLoaderUtils {
       }
     }
     resource.setStatus(TsFileResourceStatus.NORMAL);
+    PipeAgent.runtime().assignRecoverProgressIndexForTsFileRecovery(resource);
     return resource;
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
index c5339c1b738..055d928bcf4 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/dataregion/TsFileResourceProgressIndexTest.java
@@ -21,6 +21,9 @@ package org.apache.iotdb.db.storageengine.dataregion;
 
 import org.apache.iotdb.commons.consensus.index.ProgressIndex;
 import org.apache.iotdb.commons.consensus.index.ProgressIndexType;
+import org.apache.iotdb.commons.consensus.index.impl.HybridProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.RecoverProgressIndex;
+import org.apache.iotdb.commons.consensus.index.impl.SimpleProgressIndex;
 import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
 import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
@@ -87,6 +90,21 @@ public class TsFileResourceProgressIndexTest {
 
   @Test
   public void testProgressIndexRecorder() {
+    HybridProgressIndex hybridProgressIndex = new HybridProgressIndex();
+    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new 
SimpleProgressIndex(3, 4));
+    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(new 
SimpleProgressIndex(6, 6));
+    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
+        new RecoverProgressIndex(1, new SimpleProgressIndex(1, 2)));
+    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
+        new RecoverProgressIndex(1, new SimpleProgressIndex(1, 3)));
+    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
+        new RecoverProgressIndex(2, new SimpleProgressIndex(4, 3)));
+    hybridProgressIndex.updateToMinimumIsAfterProgressIndex(
+        new RecoverProgressIndex(3, new SimpleProgressIndex(5, 3)));
+    Assert.assertTrue(hybridProgressIndex.isAfter(new SimpleProgressIndex(6, 
5)));
+    Assert.assertTrue(
+        hybridProgressIndex.isAfter(new RecoverProgressIndex(3, new 
SimpleProgressIndex(5, 4))));
+
     Assert.assertTrue(
         new 
MockProgressIndex(0).isAfter(tsFileResource.getMaxProgressIndexAfterClose()));
 

Reply via email to