This is an automated email from the ASF dual-hosted git repository.

jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cb97fe44ca6 Load: Optimized the downgraded logic for tsFile to insert more data when tsFile corrupted (#17674)
cb97fe44ca6 is described below

commit cb97fe44ca663fd142a0b5a78e4410f18ca724ae
Author: Caideyipi <[email protected]>
AuthorDate: Tue Jun 23 10:13:01 2026 +0800

    Load: Optimized the downgraded logic for tsFile to insert more data when tsFile corrupted (#17674)
    
    * down
    
    * Update LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
    
    * Fix query parser deferred tablet failure handling
    
    * Fix aligned load fallback corruption test
    
    * Address load fallback review comments
---
 .../query/TsFileInsertionEventQueryParser.java     | 108 +++-
 .../scan/TsFileInsertionEventScanParser.java       |  57 ++-
 ...eeStatementDataTypeConvertExecutionVisitor.java |  15 +-
 .../converter/LoadTreeTsFileTabletIterator.java    | 557 +++++++++++++++++++++
 ...atementDataTypeConvertExecutionVisitorTest.java | 380 ++++++++++++++
 5 files changed, 1091 insertions(+), 26 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
index 2656ec7d72d..33652f3f3da 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/query/TsFileInsertionEventQueryParser.java
@@ -60,9 +60,9 @@ import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -81,6 +81,7 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser
   private final Map<IDeviceID, Boolean> deviceIsAlignedMap;
   private final Map<String, TSDataType> measurementDataTypeMap;
   private final TabletStringInternPool tabletStringInternPool = new 
TabletStringInternPool();
+  private RuntimeException deferredException;
 
   @TestOnly
   public TsFileInsertionEventQueryParser(
@@ -116,6 +117,37 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser
         null,
         false,
         null,
+        null,
+        isWithMod);
+  }
+
+  public TsFileInsertionEventQueryParser(
+      final String pipeName,
+      final long creationTime,
+      final File tsFile,
+      final TreePattern pattern,
+      final long startTime,
+      final long endTime,
+      final PipeTaskMeta pipeTaskMeta,
+      final PipeInsertionEvent sourceEvent,
+      final IAuditEntity entity,
+      final boolean skipIfNoPrivileges,
+      final Map<IDeviceID, Boolean> deviceIsAlignedMap,
+      final boolean isWithMod)
+      throws IOException, IllegalPathException {
+    this(
+        pipeName,
+        creationTime,
+        tsFile,
+        pattern,
+        startTime,
+        endTime,
+        pipeTaskMeta,
+        sourceEvent,
+        entity,
+        skipIfNoPrivileges,
+        deviceIsAlignedMap,
+        null,
         isWithMod);
   }
 
@@ -131,6 +163,7 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser
       final IAuditEntity entity,
       final boolean skipIfNoPrivileges,
       final Map<IDeviceID, Boolean> deviceIsAlignedMap,
+      final Map<IDeviceID, List<String>> deviceMeasurementsMapOverride,
       final boolean isWithMod)
       throws IOException, IllegalPathException {
     super(
@@ -165,7 +198,25 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser
       tsFileSequenceReader = new TsFileSequenceReader(tsFile.getPath(), true, 
true);
       tsFileReader = new TsFileReader(tsFileSequenceReader);
 
-      if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) {
+      if (Objects.nonNull(deviceMeasurementsMapOverride)) {
+        this.deviceIsAlignedMap =
+            Objects.nonNull(deviceIsAlignedMap)
+                ? new LinkedHashMap<>(deviceIsAlignedMap)
+                : readDeviceIsAlignedMap();
+        memoryRequiredInBytes +=
+            Objects.nonNull(deviceIsAlignedMap)
+                ? 0
+                : 
PipeMemoryWeightUtil.memoryOfIDeviceId2Bool(this.deviceIsAlignedMap);
+
+        measurementDataTypeMap =
+            
readFilteredFullPathDataTypeMap(deviceMeasurementsMapOverride.keySet());
+        memoryRequiredInBytes +=
+            
PipeMemoryWeightUtil.memoryOfStr2TSDataType(measurementDataTypeMap);
+
+        deviceMeasurementsMap = new 
LinkedHashMap<>(deviceMeasurementsMapOverride);
+        memoryRequiredInBytes +=
+            
PipeMemoryWeightUtil.memoryOfIDeviceID2StrList(deviceMeasurementsMap);
+      } else if (tsFileResourceManager.cacheObjectsIfAbsent(tsFile)) {
         // These read-only objects can be found in cache.
         this.deviceIsAlignedMap =
             Objects.nonNull(deviceIsAlignedMap)
@@ -266,10 +317,34 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser
     }
   }
 
+  public TsFileInsertionEventQueryParser(
+      final File tsFile,
+      final TreePattern pattern,
+      final long startTime,
+      final long endTime,
+      final Map<IDeviceID, List<String>> deviceMeasurementsMapOverride,
+      final boolean isWithMod)
+      throws IOException, IllegalPathException {
+    this(
+        null,
+        0,
+        tsFile,
+        pattern,
+        startTime,
+        endTime,
+        null,
+        null,
+        null,
+        false,
+        null,
+        deviceMeasurementsMapOverride,
+        isWithMod);
+  }
+
   private Map<IDeviceID, List<String>> filterDeviceMeasurementsMapByPattern(
       final Map<IDeviceID, List<String>> originalDeviceMeasurementsMap)
       throws IllegalPathException {
-    final Map<IDeviceID, List<String>> filteredDeviceMeasurementsMap = new 
HashMap<>();
+    final Map<IDeviceID, List<String>> filteredDeviceMeasurementsMap = new 
LinkedHashMap<>();
     for (Map.Entry<IDeviceID, List<String>> entry : 
originalDeviceMeasurementsMap.entrySet()) {
       final IDeviceID deviceId = entry.getKey();
 
@@ -318,7 +393,7 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser
   }
 
   private Map<IDeviceID, Boolean> readDeviceIsAlignedMap() throws IOException {
-    final Map<IDeviceID, Boolean> deviceIsAlignedResultMap = new HashMap<>();
+    final Map<IDeviceID, Boolean> deviceIsAlignedResultMap = new 
LinkedHashMap<>();
     final TsFileDeviceIterator deviceIsAlignedIterator =
         tsFileSequenceReader.getAllDevicesIteratorWithIsAligned();
     while (deviceIsAlignedIterator.hasNext()) {
@@ -348,7 +423,7 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser
    */
   private Map<String, TSDataType> readFilteredFullPathDataTypeMap(final 
Set<IDeviceID> devices)
       throws IOException {
-    final Map<String, TSDataType> result = new HashMap<>();
+    final Map<String, TSDataType> result = new LinkedHashMap<>();
 
     for (final IDeviceID device : devices) {
       tsFileSequenceReader
@@ -370,7 +445,7 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser
    */
   private Map<IDeviceID, List<String>> readFilteredDeviceMeasurementsMap(
       final Set<IDeviceID> devices) throws IOException {
-    final Map<IDeviceID, List<String>> result = new HashMap<>();
+    final Map<IDeviceID, List<String>> result = new LinkedHashMap<>();
 
     for (final IDeviceID device : devices) {
       tsFileSequenceReader
@@ -397,6 +472,7 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser
 
                 @Override
                 public boolean hasNext() {
+                  throwIfDeferredException();
                   boolean hasNext = false;
                   while (tabletIterator == null || !tabletIterator.hasNext()) {
                     if (!deviceMeasurementsMapIterator.hasNext()) {
@@ -451,9 +527,16 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser
                   final boolean isAligned =
                       deviceIsAlignedMap.getOrDefault(
                           
IDeviceID.Factory.DEFAULT_FACTORY.create(tablet.getDeviceId()), false);
+                  boolean isLast;
+                  try {
+                    isLast = !hasNext();
+                  } catch (final RuntimeException e) {
+                    deferredException = e;
+                    isLast = false;
+                  }
 
                   final TabletInsertionEvent next;
-                  if (!hasNext()) {
+                  if (isLast) {
                     next =
                         sourceEvent == null
                             ? new PipeRawTabletInsertionEvent(
@@ -517,8 +600,19 @@ public class TsFileInsertionEventQueryParser extends TsFileInsertionEventParser
     return tabletInsertionIterable;
   }
 
+  private void throwIfDeferredException() {
+    if (Objects.isNull(deferredException)) {
+      return;
+    }
+
+    final RuntimeException exception = deferredException;
+    deferredException = null;
+    throw exception;
+  }
+
   @Override
   public void close() {
+    deferredException = null;
     try {
       if (tsFileReader != null) {
         tsFileReader.close();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
index d843ab38ab4..4c5cd75d4c1 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/parser/scan/TsFileInsertionEventScanParser.java
@@ -98,6 +98,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
   private boolean currentIsAligned;
   private final List<IMeasurementSchema> currentMeasurements = new 
ArrayList<>();
   private final TabletStringInternPool tabletStringInternPool = new 
TabletStringInternPool();
+  private Exception deferredException;
   private final List<ModsOperationUtil.ModsInfo> modsInfos = new ArrayList<>();
   // Cached time chunk
   private final List<Chunk> timeChunkList = new ArrayList<>();
@@ -204,6 +205,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
 
                 @Override
                 public boolean hasNext() {
+                  throwIfDeferredException();
                   final boolean hasNext = Objects.nonNull(chunkReader);
                   if (hasNext && !parseStartTimeRecorded) {
                     // Record start time on first hasNext() that returns true
@@ -232,7 +234,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
                   final Tablet tablet = getNextTablet();
                   // Record tablet metrics
                   recordTabletMetrics(tablet);
-                  final boolean hasNext = hasNext();
+                  final boolean isLast = 
isLastTabletWithoutDeferredException();
                   try {
                     return sourceEvent == null
                         ? new PipeRawTabletInsertionEvent(
@@ -246,7 +248,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
                             0,
                             pipeTaskMeta,
                             sourceEvent,
-                            !hasNext)
+                            isLast)
                         : new PipeRawTabletInsertionEvent(
                             sourceEvent.getRawIsTableModelEvent(),
                             sourceEvent.getSourceDatabaseNameFromDataRegion(),
@@ -258,9 +260,10 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
                             sourceEvent.getCreationTime(),
                             pipeTaskMeta,
                             sourceEvent,
-                            !hasNext);
+                            isLast);
                   } finally {
-                    if (!hasNext) {
+                    if (isLast) {
+                      recordParseEndTimeIfNecessary();
                       close();
                     }
                   }
@@ -275,6 +278,7 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
         new Iterator<Pair<Tablet, Boolean>>() {
           @Override
           public boolean hasNext() {
+            throwIfDeferredException();
             return Objects.nonNull(chunkReader);
           }
 
@@ -291,11 +295,10 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
             // information.
             final boolean isAligned = currentIsAligned;
             final Tablet tablet = getNextTablet();
-            final boolean hasNext = hasNext();
             try {
               return new Pair<>(tablet, isAligned);
             } finally {
-              if (!hasNext) {
+              if (isLastTabletWithoutDeferredException()) {
                 close();
               }
             }
@@ -303,6 +306,22 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
         };
   }
 
+  public IDeviceID getCurrentDevice() {
+    return currentDevice;
+  }
+
+  public boolean isCurrentAligned() {
+    return currentIsAligned;
+  }
+
+  public List<String> getCurrentMeasurements() {
+    final List<String> measurementIds = new 
ArrayList<>(currentMeasurements.size());
+    for (final IMeasurementSchema schema : currentMeasurements) {
+      measurementIds.add(schema.getMeasurementName());
+    }
+    return measurementIds;
+  }
+
   private Tablet getNextTablet() {
     try {
       Tablet tablet = null;
@@ -354,7 +373,11 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
 
       // Switch chunk reader iff current chunk is all consumed
       if (!data.hasCurrent()) {
-        prepareData();
+        try {
+          prepareData();
+        } catch (final Exception e) {
+          deferredException = e;
+        }
       }
       PipeTabletUtils.compactBitMaps(tablet);
       return tablet;
@@ -364,6 +387,26 @@ public class TsFileInsertionEventScanParser extends TsFileInsertionEventParser {
     }
   }
 
+  private void throwIfDeferredException() {
+    if (Objects.isNull(deferredException)) {
+      return;
+    }
+
+    final Exception exception = deferredException;
+    deferredException = null;
+    throw new PipeException("Failed to prepare next tablet insertion event.", 
exception);
+  }
+
+  private boolean isLastTabletWithoutDeferredException() {
+    return Objects.isNull(deferredException) && Objects.isNull(chunkReader);
+  }
+
+  private void recordParseEndTimeIfNecessary() {
+    if (parseStartTimeRecorded && !parseEndTimeRecorded) {
+      recordParseEndTime();
+    }
+  }
+
   private void prepareData() throws IOException, IllegalPathException {
     do {
       do {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
index e7afde176b6..9945367a3dc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitor.java
@@ -20,10 +20,8 @@
 package org.apache.iotdb.db.storageengine.load.converter;
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
-import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.i18n.StorageEngineMessages;
-import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
 import 
org.apache.iotdb.db.pipe.sink.payload.evolvable.request.PipeTransferTabletRawReq;
 import org.apache.iotdb.db.queryengine.plan.statement.Statement;
 import org.apache.iotdb.db.queryengine.plan.statement.StatementNode;
@@ -90,16 +88,9 @@ public class LoadTreeStatementDataTypeConvertExecutionVisitor
 
     try {
       for (final File file : loadTsFileStatement.getTsFiles()) {
-        try (final TsFileInsertionEventScanParser parser =
-            new TsFileInsertionEventScanParser(
-                file,
-                new IoTDBTreePattern(null),
-                Long.MIN_VALUE,
-                Long.MAX_VALUE,
-                null,
-                null,
-                true)) {
-          for (final Pair<Tablet, Boolean> tabletWithIsAligned : 
parser.toTabletWithIsAligneds()) {
+        try (final LoadTreeTsFileTabletIterator tabletIterator =
+            new LoadTreeTsFileTabletIterator(file, true)) {
+          for (final Pair<Tablet, Boolean> tabletWithIsAligned : 
tabletIterator) {
             final PipeTransferTabletRawReq tabletRawReq =
                 PipeTransferTabletRawReq.toTPipeTransferRawReq(
                     tabletWithIsAligned.getLeft(), 
tabletWithIsAligned.getRight());
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeTsFileTabletIterator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeTsFileTabletIterator.java
new file mode 100644
index 00000000000..20e4769b413
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeTsFileTabletIterator.java
@@ -0,0 +1,557 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import 
org.apache.iotdb.commons.exception.pipe.PipeRuntimeOutOfMemoryCriticalException;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
+import 
org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.query.TsFileInsertionEventQueryParser;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
+import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
+
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.TimeseriesMetadata;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.utils.Pair;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Load uses scan parsing first for throughput. If scan parsing hits 
corruption, fall back to query
+ * parsing for the remaining measurements and devices so later data can still 
be loaded.
+ */
+class LoadTreeTsFileTabletIterator
+    implements Iterable<Pair<Tablet, Boolean>>, Iterator<Pair<Tablet, 
Boolean>>, AutoCloseable {
+
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(LoadTreeTsFileTabletIterator.class);
+
+  private static final TreePattern LOAD_TREE_PATTERN = new 
IoTDBTreePattern(null);
+
+  private final File file;
+  private final boolean isWithMod;
+  private final ArrayDeque<QueryTask> pendingQueryTasks = new ArrayDeque<>();
+
+  private TsFileInsertionEventScanParser scanParser;
+  private QueryTask activeQueryTask;
+  private TsFileInsertionEventQueryParser activeQueryParser;
+  private Iterator<Pair<Tablet, Boolean>> activeIterator;
+  private boolean scanInitialized;
+  private boolean fallbackTriggered;
+
+  private IDeviceID lastEmittedDevice;
+  private List<String> lastEmittedMeasurements = Collections.emptyList();
+  private long lastEmittedTimestamp = Long.MIN_VALUE;
+
+  private IDeviceID lastScanTabletDevice;
+  private List<String> lastScanTabletMeasurements = Collections.emptyList();
+  private final Map<IDeviceID, Set<String>> fullyEmittedMeasurementsByDevice =
+      new LinkedHashMap<>();
+
+  LoadTreeTsFileTabletIterator(final File file, final boolean isWithMod) {
+    this.file = file;
+    this.isWithMod = isWithMod;
+  }
+
+  @Override
+  public Iterator<Pair<Tablet, Boolean>> iterator() {
+    return this;
+  }
+
+  @Override
+  public boolean hasNext() {
+    while (true) {
+      try {
+        ensureActiveIterator();
+        if (Objects.isNull(activeIterator)) {
+          close();
+          return false;
+        }
+
+        if (activeIterator.hasNext()) {
+          return true;
+        }
+
+        if (!switchToNextIterator()) {
+          close();
+          return false;
+        }
+      } catch (final Exception e) {
+        if (recoverFromIteratorFailure(e)) {
+          continue;
+        }
+        close();
+        throw toRuntimeException(e);
+      }
+    }
+  }
+
+  @Override
+  public Pair<Tablet, Boolean> next() {
+    while (true) {
+      if (!hasNext()) {
+        close();
+        throw new NoSuchElementException();
+      }
+
+      try {
+        final Pair<Tablet, Boolean> next = activeIterator.next();
+        recordProgress(next);
+        return next;
+      } catch (final Exception e) {
+        if (recoverFromIteratorFailure(e)) {
+          continue;
+        }
+        close();
+        throw toRuntimeException(e);
+      }
+    }
+  }
+
+  private void ensureActiveIterator() throws Exception {
+    if (Objects.nonNull(activeIterator)) {
+      return;
+    }
+
+    if (!scanInitialized && !fallbackTriggered) {
+      scanInitialized = true;
+      try {
+        scanParser =
+            new TsFileInsertionEventScanParser(
+                file, LOAD_TREE_PATTERN, Long.MIN_VALUE, Long.MAX_VALUE, null, 
null, isWithMod);
+        activeIterator = scanParser.toTabletWithIsAligneds().iterator();
+        return;
+      } catch (final Exception e) {
+        if (!switchFromScanToQuery(e)) {
+          throw toRuntimeException(e);
+        }
+      }
+    }
+
+    activateNextQueryParser();
+  }
+
+  private boolean switchToNextIterator() {
+    if (Objects.nonNull(activeQueryParser)) {
+      closeActiveQueryParser();
+      return activateNextQueryParser();
+    }
+
+    closeScanParser();
+    return activateNextQueryParser();
+  }
+
+  private boolean recoverFromIteratorFailure(final Exception e) {
+    if (shouldRethrow(e)) {
+      return false;
+    }
+
+    if (Objects.nonNull(activeQueryTask)) {
+      LOGGER.warn(
+          "Load: Query fallback failed for device {} measurements {} in TsFile 
{}. "
+              + "Split or skip this query task and continue.",
+          activeQueryTask.device,
+          activeQueryTask.measurements,
+          file.getAbsolutePath(),
+          e);
+      splitOrSkipActiveQueryTask();
+      return true;
+    }
+
+    return switchFromScanToQuery(e);
+  }
+
+  private boolean switchFromScanToQuery(final Exception e) {
+    if (fallbackTriggered) {
+      return false;
+    }
+
+    fallbackTriggered = true;
+    final IDeviceID currentDevice =
+        Objects.nonNull(scanParser) ? scanParser.getCurrentDevice() : null;
+    final List<String> currentMeasurements =
+        Objects.nonNull(scanParser) ? scanParser.getCurrentMeasurements() : 
Collections.emptyList();
+
+    markLastScanMeasurementsAsCompletedIfNeeded(currentDevice, 
currentMeasurements);
+
+    closeScanParser();
+
+    try {
+      pendingQueryTasks.addAll(buildQueryTasks(currentDevice, 
currentMeasurements));
+    } catch (final Exception queryInitException) {
+      LOGGER.warn(
+          "Load: Failed to initialize query fallback for TsFile {} after scan 
parser failure.",
+          file.getAbsolutePath(),
+          queryInitException);
+      return false;
+    }
+
+    LOGGER.warn(
+        "Load: Scan parser detected a corrupted section in TsFile {} at device 
{}. "
+            + "Switch to query parsing for remaining devices.",
+        file.getAbsolutePath(),
+        currentDevice,
+        e);
+    return true;
+  }
+
+  private ArrayDeque<QueryTask> buildQueryTasks(
+      final IDeviceID currentDevice, final List<String> currentMeasurements) 
throws IOException {
+    final LinkedHashMap<IDeviceID, List<String>> deviceMeasurementsMap =
+        readDeviceMeasurementsInOrder();
+    if (deviceMeasurementsMap.isEmpty()) {
+      return new ArrayDeque<>();
+    }
+
+    final ArrayDeque<QueryTask> tasks = new ArrayDeque<>();
+    boolean includeCurrentAndFollowingDevices =
+        Objects.isNull(currentDevice) || 
!deviceMeasurementsMap.containsKey(currentDevice);
+
+    for (final Map.Entry<IDeviceID, List<String>> entry : 
deviceMeasurementsMap.entrySet()) {
+      final IDeviceID device = entry.getKey();
+      if (!includeCurrentAndFollowingDevices && device.equals(currentDevice)) {
+        includeCurrentAndFollowingDevices = true;
+      }
+      if (!includeCurrentAndFollowingDevices) {
+        continue;
+      }
+
+      if (device.equals(currentDevice)) {
+        addCurrentDeviceQueryTasks(tasks, device, entry.getValue(), 
currentMeasurements);
+      } else {
+        addQueryTaskIfNecessary(tasks, device, entry.getValue(), 
Long.MIN_VALUE, Long.MAX_VALUE);
+      }
+    }
+
+    return tasks;
+  }
+
+  private LinkedHashMap<IDeviceID, List<String>> 
readDeviceMeasurementsInOrder()
+      throws IOException {
+    final LinkedHashMap<IDeviceID, List<String>> deviceMeasurementsMap = new 
LinkedHashMap<>();
+    try (final TsFileSequenceReader reader = new 
TsFileSequenceReader(file.getAbsolutePath())) {
+      final Iterator<Pair<IDeviceID, List<TimeseriesMetadata>>> 
metadataIterator =
+          reader.iterAllTimeseriesMetadata(false, false);
+      while (metadataIterator.hasNext()) {
+        final Pair<IDeviceID, List<TimeseriesMetadata>> deviceMetadata = 
metadataIterator.next();
+        deviceMeasurementsMap.put(
+            deviceMetadata.getLeft(),
+            deviceMetadata.getRight().stream()
+                .map(TimeseriesMetadata::getMeasurementId)
+                .collect(Collectors.toList()));
+      }
+    }
+    return deviceMeasurementsMap;
+  }
+
+  private void addCurrentDeviceQueryTasks(
+      final ArrayDeque<QueryTask> tasks,
+      final IDeviceID device,
+      final List<String> allMeasurements,
+      final List<String> currentMeasurements) {
+    final Set<String> completedMeasurements =
+        fullyEmittedMeasurementsByDevice.getOrDefault(device, 
Collections.emptySet());
+    final Set<String> currentMeasurementSet = new 
LinkedHashSet<>(currentMeasurements);
+
+    final List<String> currentMeasurementsToResume = new ArrayList<>();
+    final List<String> remainingMeasurements = new ArrayList<>();
+    for (final String measurement : allMeasurements) {
+      if (completedMeasurements.contains(measurement)) {
+        continue;
+      }
+      if (currentMeasurementSet.contains(measurement)) {
+        currentMeasurementsToResume.add(measurement);
+      } else {
+        remainingMeasurements.add(measurement);
+      }
+    }
+
+    addQueryTaskIfNecessary(
+        tasks,
+        device,
+        currentMeasurementsToResume,
+        determineTaskResumeStartTime(device, currentMeasurementsToResume, 
Long.MIN_VALUE),
+        Long.MAX_VALUE);
+    addQueryTaskIfNecessary(tasks, device, remainingMeasurements, 
Long.MIN_VALUE, Long.MAX_VALUE);
+  }
+
+  private boolean activateNextQueryParser() {
+    closeActiveQueryParser();
+
+    while (!pendingQueryTasks.isEmpty()) {
+      activeQueryTask = pendingQueryTasks.removeFirst();
+      try {
+        activeQueryParser =
+            new TsFileInsertionEventQueryParser(
+                file,
+                LOAD_TREE_PATTERN,
+                activeQueryTask.startTime,
+                activeQueryTask.endTime,
+                activeQueryTask.toDeviceMeasurementsMap(),
+                isWithMod);
+        final Iterator<TabletInsertionEvent> tabletIterator =
+            activeQueryParser.toTabletInsertionEvents().iterator();
+        activeIterator =
+            new Iterator<Pair<Tablet, Boolean>>() {
+              @Override
+              public boolean hasNext() {
+                return tabletIterator.hasNext();
+              }
+
+              @Override
+              public Pair<Tablet, Boolean> next() {
+                final TabletInsertionEvent event = tabletIterator.next();
+                if (!(event instanceof PipeRawTabletInsertionEvent)) {
+                  throw new IllegalStateException(
+                      "Expected PipeRawTabletInsertionEvent but got " + 
event.getClass().getName());
+                }
+
+                final PipeRawTabletInsertionEvent rawTabletInsertionEvent =
+                    (PipeRawTabletInsertionEvent) event;
+                return new Pair<>(
+                    rawTabletInsertionEvent.convertToTablet(), 
rawTabletInsertionEvent.isAligned());
+              }
+            };
+        return true;
+      } catch (final Exception e) {
+        LOGGER.warn(
+            "Load: Failed to initialize query fallback for device {} 
measurements {} in TsFile {}. "
+                + "Split or skip this query task and continue.",
+            activeQueryTask.device,
+            activeQueryTask.measurements,
+            file.getAbsolutePath(),
+            e);
+        splitOrSkipActiveQueryTask();
+      }
+    }
+
+    activeIterator = null;
+    return false;
+  }
+
+  /**
+   * Tracks the most recently emitted tablet. Tablets that are null or have no rows are ignored.
+   * While no query parser is active, the tablet is also reported to recordScanProgress.
+   *
+   * @param tabletWithIsAligned the emitted tablet paired with its aligned flag
+   */
+  private void recordProgress(final Pair<Tablet, Boolean> tabletWithIsAligned) {
+    final Tablet emittedTablet = tabletWithIsAligned.getLeft();
+    final int emittedRowCount = Objects.isNull(emittedTablet) ? 0 : emittedTablet.getRowSize();
+    if (emittedRowCount == 0) {
+      return;
+    }
+
+    final IDeviceID emittedDevice =
+        IDeviceID.Factory.DEFAULT_FACTORY.create(emittedTablet.getDeviceId());
+    final List<String> emittedMeasurements = extractMeasurementNames(emittedTablet);
+
+    final boolean noActiveQueryParser = Objects.isNull(activeQueryParser);
+    if (noActiveQueryParser) {
+      recordScanProgress(emittedDevice, emittedMeasurements);
+    }
+
+    lastEmittedDevice = emittedDevice;
+    lastEmittedMeasurements = emittedMeasurements;
+    // The timestamp of the final row is what a later resume computation starts from.
+    lastEmittedTimestamp = emittedTablet.getTimestamp(emittedRowCount - 1);
+  }
+
+  private boolean shouldRethrow(final Exception e) {
+    Throwable current = e;
+    while (Objects.nonNull(current)) {
+      if (current instanceof InterruptedException
+          || current instanceof PipeRuntimeOutOfMemoryCriticalException) {
+        return true;
+      }
+      current = current.getCause();
+    }
+    return false;
+  }
+
+  /**
+   * Returns {@code e} unchanged when it already is a RuntimeException; otherwise wraps it in an
+   * IllegalStateException that keeps {@code e} as its cause.
+   */
+  private RuntimeException toRuntimeException(final Exception e) {
+    if (e instanceof RuntimeException) {
+      return (RuntimeException) e;
+    }
+    return new IllegalStateException("Failed to iterate tablets while loading TsFile.", e);
+  }
+
+  /** Drops the active iterator and, if a scan parser exists, closes and forgets it. */
+  private void closeScanParser() {
+    activeIterator = null;
+    if (Objects.isNull(scanParser)) {
+      return;
+    }
+
+    scanParser.close();
+    scanParser = null;
+  }
+
+  /**
+   * Clears the active iterator and the active query task, then closes and forgets the active
+   * query parser if there is one.
+   */
+  private void closeActiveQueryParser() {
+    activeIterator = null;
+    activeQueryTask = null;
+    if (Objects.nonNull(activeQueryParser)) {
+      activeQueryParser.close();
+      activeQueryParser = null;
+    }
+  }
+
+  /**
+   * Releases everything this iterator holds: the scan parser, the active query parser and the
+   * queue of pending query tasks.
+   *
+   * <p>The steps are chained with try/finally so that a failure while closing one parser cannot
+   * leave the other parser open or the pending tasks in place. The first failure still
+   * propagates to the caller.
+   */
+  @Override
+  public void close() {
+    try {
+      // Also resets activeIterator, so no separate reset is needed here.
+      closeScanParser();
+    } finally {
+      try {
+        closeActiveQueryParser();
+      } finally {
+        pendingQueryTasks.clear();
+      }
+    }
+  }
+
+  private void recordScanProgress(final IDeviceID device, final List<String> 
measurements) {
+    if (Objects.nonNull(lastScanTabletDevice)
+        && (!lastScanTabletDevice.equals(device)
+            || !measurementsEqual(lastScanTabletMeasurements, measurements))) {
+      markMeasurementsFullyEmitted(lastScanTabletDevice, 
lastScanTabletMeasurements);
+    }
+
+    lastScanTabletDevice = device;
+    lastScanTabletMeasurements = measurements;
+  }
+
+  /**
+   * Marks the last scanned measurements as fully emitted when the given position has moved past
+   * them: either the device differs, or {@code currentMeasurements} is non-empty and differs
+   * from the last scanned set. Does nothing when no scan tablet was recorded or its measurement
+   * list is empty.
+   */
+  private void markLastScanMeasurementsAsCompletedIfNeeded(
+      final IDeviceID currentDevice, final List<String> currentMeasurements) {
+    if (Objects.isNull(lastScanTabletDevice) || lastScanTabletMeasurements.isEmpty()) {
+      return;
+    }
+
+    boolean movedPastLastScan = !lastScanTabletDevice.equals(currentDevice);
+    if (!movedPastLastScan && !currentMeasurements.isEmpty()) {
+      movedPastLastScan = !measurementsEqual(lastScanTabletMeasurements, currentMeasurements);
+    }
+
+    if (movedPastLastScan) {
+      markMeasurementsFullyEmitted(lastScanTabletDevice, lastScanTabletMeasurements);
+    }
+  }
+
+  /**
+   * Adds {@code measurements} to the fully-emitted set kept for {@code device}, creating that
+   * set on first use. A null device or an empty measurement list is ignored.
+   */
+  private void markMeasurementsFullyEmitted(
+      final IDeviceID device, final List<String> measurements) {
+    if (Objects.nonNull(device) && !measurements.isEmpty()) {
+      fullyEmittedMeasurementsByDevice
+          .computeIfAbsent(device, ignored -> new LinkedHashSet<>())
+          .addAll(measurements);
+    }
+  }
+
+  private long determineTaskResumeStartTime(
+      final IDeviceID device, final List<String> measurements, final long 
defaultStartTime) {
+    if (measurements.isEmpty()
+        || !device.equals(lastEmittedDevice)
+        || lastEmittedTimestamp == Long.MIN_VALUE
+        || !measurementsEqual(measurements, lastEmittedMeasurements)) {
+      return defaultStartTime;
+    }
+
+    return lastEmittedTimestamp == Long.MAX_VALUE ? Long.MAX_VALUE : 
lastEmittedTimestamp + 1;
+  }
+
+  private void addQueryTaskIfNecessary(
+      final ArrayDeque<QueryTask> tasks,
+      final IDeviceID device,
+      final List<String> measurements,
+      final long startTime,
+      final long endTime) {
+    if (measurements.isEmpty() || startTime == Long.MAX_VALUE) {
+      return;
+    }
+
+    tasks.addLast(new QueryTask(device, measurements, startTime, endTime));
+  }
+
+  /**
+   * Abandons the active query task after a failure. A task with at most one measurement is
+   * dropped. A wider task is split in two and both parts are pushed to the front of the pending
+   * queue in their original order, starting from the time returned by
+   * determineTaskResumeStartTime.
+   *
+   * <p>Nothing is re-queued when that resume time is {@code Long.MAX_VALUE}. This is the same
+   * "nothing left to query" marker that addQueryTaskIfNecessary filters out; re-queueing would
+   * read the already emitted last row a second time.
+   */
+  private void splitOrSkipActiveQueryTask() {
+    // Capture the task first: closeActiveQueryParser() resets activeQueryTask to null.
+    final QueryTask failedTask = activeQueryTask;
+    closeActiveQueryParser();
+    if (Objects.isNull(failedTask) || failedTask.measurements.size() <= 1) {
+      return;
+    }
+
+    final long resumeStartTime =
+        determineTaskResumeStartTime(
+            failedTask.device, failedTask.measurements, failedTask.startTime);
+    if (resumeStartTime == Long.MAX_VALUE) {
+      return;
+    }
+
+    final List<QueryTask> splitTasks = failedTask.split(resumeStartTime);
+    // Insert back to front so the first part ends up at the head of the deque.
+    for (int i = splitTasks.size() - 1; i >= 0; --i) {
+      pendingQueryTasks.addFirst(splitTasks.get(i));
+    }
+  }
+
+  /** Returns the measurement names of the tablet's schemas, as produced by IMeasurementSchema. */
+  private List<String> extractMeasurementNames(final Tablet tablet) {
+    return IMeasurementSchema.getMeasurementNameList(
+        tablet.getSchemas());
+  }
+
+  private boolean measurementsEqual(
+      final List<String> leftMeasurements, final List<String> 
rightMeasurements) {
+    return leftMeasurements.size() == rightMeasurements.size()
+        && new LinkedHashSet<>(leftMeasurements).equals(new 
LinkedHashSet<>(rightMeasurements));
+  }
+
+  /**
+   * One pending query: a device, the measurements to read for it and the time bounds passed in
+   * at construction. The measurement list is copied on construction.
+   */
+  private static class QueryTask {
+    private final IDeviceID device;
+    private final List<String> measurements;
+    private final long startTime;
+    private final long endTime;
+
+    private QueryTask(
+        final IDeviceID device,
+        final List<String> measurements,
+        final long startTime,
+        final long endTime) {
+      this.device = device;
+      // Copy so later changes to the caller's list (or sub-list view) do not affect this task.
+      this.measurements = new ArrayList<>(measurements);
+      this.startTime = startTime;
+      this.endTime = endTime;
+    }
+
+    /** Returns a new single-entry map from this task's device to its measurement list. */
+    private LinkedHashMap<IDeviceID, List<String>> toDeviceMeasurementsMap() {
+      final LinkedHashMap<IDeviceID, List<String>> singleDeviceMap = new LinkedHashMap<>();
+      singleDeviceMap.put(device, measurements);
+      return singleDeviceMap;
+    }
+
+    /**
+     * Splits the measurements into a lower and an upper part at the midpoint. Both parts keep
+     * this task's device and end time and start at {@code resumeStartTime}.
+     *
+     * @return the two parts in measurement order, or an empty list when fewer than two
+     *     measurements exist
+     */
+    private List<QueryTask> split(final long resumeStartTime) {
+      final int measurementCount = measurements.size();
+      final int middle = measurementCount / 2;
+      if (middle <= 0) {
+        return Collections.emptyList();
+      }
+
+      final List<String> lowerPart = measurements.subList(0, middle);
+      final List<String> upperPart = measurements.subList(middle, measurementCount);
+
+      final List<QueryTask> parts = new ArrayList<>(2);
+      parts.add(new QueryTask(device, lowerPart, resumeStartTime, endTime));
+      parts.add(new QueryTask(device, upperPart, resumeStartTime, endTime));
+      return parts;
+    }
+  }
+}
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
new file mode 100644
index 00000000000..32b15a1859d
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/storageengine/load/converter/LoadTreeStatementDataTypeConvertExecutionVisitorTest.java
@@ -0,0 +1,380 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.storageengine.load.converter;
+
+import org.apache.iotdb.common.rpc.thrift.TSStatus;
+import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.path.PartialPath;
+import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
+import 
org.apache.iotdb.db.pipe.event.common.tsfile.parser.scan.TsFileInsertionEventScanParser;
+import org.apache.iotdb.db.queryengine.plan.statement.Statement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertMultiTabletsStatement;
+import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
+import org.apache.iotdb.db.queryengine.plan.statement.crud.LoadTsFileStatement;
+import org.apache.iotdb.rpc.TSStatusCode;
+
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.file.metadata.AbstractAlignedChunkMetadata;
+import org.apache.tsfile.file.metadata.IChunkMetadata;
+import org.apache.tsfile.file.metadata.IDeviceID;
+import org.apache.tsfile.file.metadata.enums.TSEncoding;
+import org.apache.tsfile.read.TsFileSequenceReader;
+import org.apache.tsfile.read.common.Path;
+import org.apache.tsfile.write.TsFileWriter;
+import org.apache.tsfile.write.record.Tablet;
+import org.apache.tsfile.write.schema.IMeasurementSchema;
+import org.apache.tsfile.write.schema.MeasurementSchema;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.File;
+import java.io.RandomAccessFile;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+
+public class LoadTreeStatementDataTypeConvertExecutionVisitorTest {
+
+  private static final String DEVICE_0 = "root.sg.d0";
+  private static final String DEVICE_1 = "root.sg.d1";
+  private static final String DEVICE_2 = "root.sg.d2";
+  private static final String ALIGNED_DEVICE = "root.sg.ad0";
+  private static final int ROW_COUNT_PER_DEVICE = 2048;
+  private File tsFile;
+  private boolean isPipeMemoryManagementEnabled;
+  private long pipeMaxReaderChunkSize;
+
+  /**
+   * Saves the current pipe memory-management flag and max reader chunk size so tearDown can
+   * restore them, then turns pipe memory management off for the test.
+   */
+  @Before
+  public void setUp() {
+    isPipeMemoryManagementEnabled = PipeConfigAccessor.getPipeMemoryManagementEnabled();
+    pipeMaxReaderChunkSize =
+        CommonDescriptor.getInstance().getConfig().getPipeMaxReaderChunkSize();
+
+    PipeConfigAccessor.setPipeMemoryManagementEnabled(false);
+  }
+
+  /** Restores the configuration saved in setUp and deletes the TsFile written by the test. */
+  @After
+  public void tearDown() {
+    PipeConfigAccessor.setPipeMemoryManagementEnabled(isPipeMemoryManagementEnabled);
+    CommonDescriptor.getInstance()
+        .getConfig()
+        .setPipeMaxReaderChunkSize(pipeMaxReaderChunkSize);
+
+    final boolean fileLeftBehind = Objects.nonNull(tsFile) && tsFile.exists();
+    if (fileLeftBehind) {
+      Assert.assertTrue(tsFile.delete());
+    }
+  }
+
+  /**
+   * Corrupts a chunk of DEVICE_1.s0 and checks that loading still reports success, that
+   * DEVICE_0 produced points, and that DEVICE_2 produced exactly as many points as DEVICE_0.
+   */
+  @Test
+  public void testFallbackToQueryForRemainingDevicesWhenScanParserHitsCorruption()
+      throws Exception {
+    tsFile = new File("load-tree-query-fallback-corrupted.tsfile");
+    writeTsFile(tsFile);
+    corruptMeasurementChunk(tsFile, DEVICE_1, "s0");
+
+    final boolean scanFailed = scanParserFails(tsFile);
+    Assert.assertTrue("Expected scan parser to fail after corruption.", scanFailed);
+
+    final Map<String, Integer> loadedPointsPerDevice = new HashMap<>();
+    final LoadTreeStatementDataTypeConvertExecutionVisitor visitor =
+        new LoadTreeStatementDataTypeConvertExecutionVisitor(
+            statement -> {
+              collectLoadedPoints(statement, loadedPointsPerDevice);
+              return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+            });
+
+    final LoadTsFileStatement loadStatement =
+        LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath());
+    final Optional<TSStatus> loadStatus = visitor.visitLoadFile(loadStatement, null);
+
+    Assert.assertTrue(loadStatus.isPresent());
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(), loadStatus.get().getCode());
+
+    final int pointsOfDevice0 = loadedPointsPerDevice.getOrDefault(DEVICE_0, 0);
+    final int pointsOfDevice2 = loadedPointsPerDevice.getOrDefault(DEVICE_2, 0);
+    Assert.assertTrue(pointsOfDevice0 > 0);
+    Assert.assertEquals(pointsOfDevice0, pointsOfDevice2);
+  }
+
+  /**
+   * Corrupts a chunk of DEVICE_0.s0 and checks that loading still reports success, that
+   * DEVICE_0.s0 is only partially loaded, and that every other timeseries is loaded completely.
+   */
+  @Test
+  public void testFallbackToQueryWhenFirstNonAlignedDeviceIsCorrupted() throws Exception {
+    tsFile = new File("load-tree-query-fallback-corrupted-first-non-aligned-device.tsfile");
+    writeTsFile(tsFile);
+    corruptMeasurementChunk(tsFile, DEVICE_0, "s0");
+
+    final boolean scanFailed = scanParserFails(tsFile);
+    Assert.assertTrue("Expected scan parser to fail after corruption.", scanFailed);
+
+    final Map<String, Integer> loadedPoints = new HashMap<>();
+    final LoadTreeStatementDataTypeConvertExecutionVisitor visitor =
+        new LoadTreeStatementDataTypeConvertExecutionVisitor(
+            statement -> {
+              collectLoadedPointsByTimeseries(statement, loadedPoints);
+              return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+            });
+
+    final LoadTsFileStatement loadStatement =
+        LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath());
+    final Optional<TSStatus> loadStatus = visitor.visitLoadFile(loadStatement, null);
+
+    Assert.assertTrue(loadStatus.isPresent());
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(), loadStatus.get().getCode());
+
+    // The corrupted series must come up short; everything else must be complete.
+    final int corruptedSeriesPoints = loadedPoints.getOrDefault(DEVICE_0 + ".s0", 0);
+    Assert.assertTrue(corruptedSeriesPoints < ROW_COUNT_PER_DEVICE);
+    assertMeasurementLoadedCompletely(loadedPoints, DEVICE_0, 1);
+    for (final String intactDevice : Arrays.asList(DEVICE_1, DEVICE_2)) {
+      for (int measurementIndex = 0; measurementIndex < 2; ++measurementIndex) {
+        assertMeasurementLoadedCompletely(loadedPoints, intactDevice, measurementIndex);
+      }
+    }
+  }
+
+  /**
+   * Corrupts a chunk of DEVICE_1.s1 and checks that loading still reports success, that
+   * DEVICE_1.s0 is loaded exactly once in full, that DEVICE_1.s1 is only partially loaded, and
+   * that both series of DEVICE_2 are loaded completely.
+   */
+  @Test
+  public void testFallbackDoesNotReloadCompletedMeasurementsOfCurrentNonAlignedDevice()
+      throws Exception {
+    tsFile = new File("load-tree-query-fallback-corrupted-current-non-aligned-device.tsfile");
+    writeTsFile(tsFile);
+    corruptMeasurementChunk(tsFile, DEVICE_1, "s1");
+
+    final boolean scanFailed = scanParserFails(tsFile);
+    Assert.assertTrue("Expected scan parser to fail after corruption.", scanFailed);
+
+    final Map<String, Integer> loadedPoints = new HashMap<>();
+    final LoadTreeStatementDataTypeConvertExecutionVisitor visitor =
+        new LoadTreeStatementDataTypeConvertExecutionVisitor(
+            statement -> {
+              collectLoadedPointsByTimeseries(statement, loadedPoints);
+              return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+            });
+
+    final LoadTsFileStatement loadStatement =
+        LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath());
+    final Optional<TSStatus> loadStatus = visitor.visitLoadFile(loadStatement, null);
+
+    Assert.assertTrue(loadStatus.isPresent());
+    Assert.assertEquals(
+        TSStatusCode.SUCCESS_STATUS.getStatusCode(), loadStatus.get().getCode());
+
+    assertMeasurementLoadedCompletely(loadedPoints, DEVICE_1, 0);
+    final int corruptedSeriesPoints = loadedPoints.getOrDefault(DEVICE_1 + ".s1", 0);
+    Assert.assertTrue(corruptedSeriesPoints < ROW_COUNT_PER_DEVICE);
+    for (int measurementIndex = 0; measurementIndex < 2; ++measurementIndex) {
+      assertMeasurementLoadedCompletely(loadedPoints, DEVICE_2, measurementIndex);
+    }
+  }
+
+  @Test
+  public void 
testFallbackToQueryForRemainingMeasurementsOfCurrentAlignedDevice() throws 
Exception {
+    CommonDescriptor.getInstance().getConfig().setPipeMaxReaderChunkSize(0);
+
+    tsFile = new 
File("load-tree-query-fallback-corrupted-aligned-current-device.tsfile");
+    writeWideAlignedTsFile(tsFile, ALIGNED_DEVICE, 16);
+    corruptMeasurementChunk(tsFile, ALIGNED_DEVICE, "s8");
+    corruptMeasurementChunk(tsFile, ALIGNED_DEVICE, "s12");
+
+    Assert.assertTrue("Expected scan parser to fail after corruption.", 
scanParserFails(tsFile));
+
+    final Map<String, Integer> pointCountByTimeseries = new HashMap<>();
+    final LoadTreeStatementDataTypeConvertExecutionVisitor visitor =
+        new LoadTreeStatementDataTypeConvertExecutionVisitor(
+            statement -> {
+              collectLoadedPointsByTimeseries(statement, 
pointCountByTimeseries);
+              return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+            });
+
+    final Optional<TSStatus> status =
+        
visitor.visitLoadFile(LoadTsFileStatement.createUnchecked(tsFile.getAbsolutePath()),
 null);
+
+    Assert.assertTrue(status.isPresent());
+    Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), 
status.get().getCode());
+
+    for (int measurementIndex = 0; measurementIndex < 8; ++measurementIndex) {
+      assertMeasurementLoadedCompletely(pointCountByTimeseries, 
ALIGNED_DEVICE, measurementIndex);
+    }
+    for (int measurementIndex : Arrays.asList(9, 10, 11, 13, 14, 15)) {
+      assertMeasurementLoadedCompletely(pointCountByTimeseries, 
ALIGNED_DEVICE, measurementIndex);
+    }
+
+    Assert.assertTrue(
+        pointCountByTimeseries.getOrDefault(ALIGNED_DEVICE + ".s8", 0) < 
ROW_COUNT_PER_DEVICE);
+    Assert.assertTrue(
+        pointCountByTimeseries.getOrDefault(ALIGNED_DEVICE + ".s12", 0) < 
ROW_COUNT_PER_DEVICE);
+  }
+
+  /**
+   * Writes a fresh TsFile holding DEVICE_0, DEVICE_1 and DEVICE_2, each with INT64 measurements
+   * s0 and s1. Value bases are 0, 10_000 and 20_000 respectively. An existing file is deleted
+   * first.
+   */
+  private void writeTsFile(final File file) throws Exception {
+    if (file.exists()) {
+      Assert.assertTrue(file.delete());
+    }
+
+    final IMeasurementSchema s0 = new MeasurementSchema("s0", TSDataType.INT64, TSEncoding.PLAIN);
+    final IMeasurementSchema s1 = new MeasurementSchema("s1", TSDataType.INT64, TSEncoding.PLAIN);
+    final List<IMeasurementSchema> schemaList = Arrays.asList(s0, s1);
+
+    try (final TsFileWriter writer = new TsFileWriter(file)) {
+      long valueBase = 0;
+      for (final String device : Arrays.asList(DEVICE_0, DEVICE_1, DEVICE_2)) {
+        writeDevice(writer, schemaList, device, valueBase);
+        valueBase += 10_000;
+      }
+    }
+  }
+
+  /**
+   * Writes a fresh TsFile with a single aligned device that has {@code measurementCount} INT64
+   * measurements named s0, s1, ... and ROW_COUNT_PER_DEVICE rows. The value of measurement i at
+   * a given row is {@code i * 10_000 + row}. An existing file is deleted first.
+   */
+  private void writeWideAlignedTsFile(
+      final File file, final String device, final int measurementCount) throws Exception {
+    if (file.exists()) {
+      Assert.assertTrue(file.delete());
+    }
+
+    final List<IMeasurementSchema> schemaList = new java.util.ArrayList<>();
+    int schemaIndex = 0;
+    while (schemaIndex < measurementCount) {
+      final String measurementName = "s" + schemaIndex;
+      schemaList.add(new MeasurementSchema(measurementName, TSDataType.INT64, TSEncoding.PLAIN));
+      ++schemaIndex;
+    }
+
+    try (final TsFileWriter writer = new TsFileWriter(file)) {
+      writer.registerAlignedTimeseries(new PartialPath(device), schemaList);
+
+      final Tablet tablet = new Tablet(device, schemaList, ROW_COUNT_PER_DEVICE);
+      for (int row = 0; row < ROW_COUNT_PER_DEVICE; ++row) {
+        tablet.addTimestamp(row, row);
+        for (int column = 0; column < measurementCount; ++column) {
+          final long columnValueBase = (long) column * 10_000;
+          tablet.addValue("s" + column, row, columnValueBase + row);
+        }
+      }
+      writer.writeAligned(tablet);
+    }
+  }
+
+  /**
+   * Registers {@code device} with {@code schemaList} and writes ROW_COUNT_PER_DEVICE rows for
+   * it. Row r has timestamp r, s0 = {@code valueBase + r} and s1 = {@code valueBase +
+   * ROW_COUNT_PER_DEVICE + r}.
+   */
+  private void writeDevice(
+      final TsFileWriter writer,
+      final List<IMeasurementSchema> schemaList,
+      final String device,
+      final long valueBase)
+      throws Exception {
+    writer.registerTimeseries(new Path(device), schemaList);
+
+    final long s1ValueBase = valueBase + ROW_COUNT_PER_DEVICE;
+    final Tablet tablet = new Tablet(device, schemaList, ROW_COUNT_PER_DEVICE);
+    int row = 0;
+    while (row < ROW_COUNT_PER_DEVICE) {
+      tablet.addTimestamp(row, row);
+      tablet.addValue("s0", row, valueBase + row);
+      tablet.addValue("s1", row, s1ValueBase + row);
+      ++row;
+    }
+    writer.writeTree(tablet);
+  }
+
+  /**
+   * Damages the first chunk of the given timeseries by overwriting 8 bytes, starting 64 bytes
+   * after the offset of its chunk header. Fails the test if the timeseries has no chunk
+   * metadata.
+   */
+  private void corruptMeasurementChunk(
+      final File file, final String device, final String measurement) throws Exception {
+    final byte[] garbage = {1, 2, 3, 4, 5, 6, 7, 8};
+
+    try (final TsFileSequenceReader reader = new TsFileSequenceReader(file.getAbsolutePath())) {
+      final IDeviceID deviceId = IDeviceID.Factory.DEFAULT_FACTORY.create(device);
+      final List<IChunkMetadata> chunkMetadataList =
+          reader.getIChunkMetadataList(deviceId, measurement);
+      Assert.assertFalse(chunkMetadataList.isEmpty());
+
+      final IChunkMetadata firstChunk =
+          getTargetChunkMetadata(chunkMetadataList.get(0), measurement);
+      final long corruptionOffset = firstChunk.getOffsetOfChunkHeader() + 64;
+
+      try (final RandomAccessFile randomAccessFile = new RandomAccessFile(file, "rw")) {
+        randomAccessFile.seek(corruptionOffset);
+        randomAccessFile.write(garbage);
+      }
+    }
+  }
+
+  /**
+   * Resolves the chunk metadata that actually describes {@code measurement}. Non-aligned
+   * metadata is returned as is. For aligned metadata, the first non-null value chunk whose
+   * measurement id equals {@code measurement} is returned, and the test fails if none matches.
+   */
+  private IChunkMetadata getTargetChunkMetadata(
+      final IChunkMetadata chunkMetadata, final String measurement) {
+    if (!(chunkMetadata instanceof AbstractAlignedChunkMetadata)) {
+      return chunkMetadata;
+    }
+
+    final AbstractAlignedChunkMetadata alignedChunkMetadata =
+        (AbstractAlignedChunkMetadata) chunkMetadata;
+    IChunkMetadata matchingValueChunk = null;
+    for (final IChunkMetadata candidate : alignedChunkMetadata.getValueChunkMetadataList()) {
+      if (Objects.nonNull(candidate) && measurement.equals(candidate.getMeasurementUid())) {
+        matchingValueChunk = candidate;
+        break;
+      }
+    }
+
+    Assert.assertNotNull(matchingValueChunk);
+    return matchingValueChunk;
+  }
+
+  private void assertMeasurementLoadedCompletely(
+      final Map<String, Integer> pointCountByTimeseries,
+      final String device,
+      final int measurementIndex) {
+    Assert.assertEquals(
+        ROW_COUNT_PER_DEVICE,
+        pointCountByTimeseries.getOrDefault(device + ".s" + measurementIndex, 
0).intValue());
+  }
+
+  /**
+   * Runs a scan parser over the whole file and drains everything it produces.
+   *
+   * @return true if creating, iterating or closing the parser threw an exception, false if the
+   *     scan completed
+   */
+  private boolean scanParserFails(final File file) throws Exception {
+    boolean failed = false;
+    try (final TsFileInsertionEventScanParser parser =
+        new TsFileInsertionEventScanParser(
+            file, new IoTDBTreePattern(null), Long.MIN_VALUE, Long.MAX_VALUE, null, null, true)) {
+      for (final Object ignored : parser.toTabletWithIsAligneds()) {
+        // Only draining the parser matters; the tablets themselves are not inspected.
+      }
+    } catch (final Exception e) {
+      failed = true;
+    }
+    return failed;
+  }
+
+  /**
+   * Adds the number of non-null points of every tablet in {@code statement} to
+   * {@code pointCountByTimeseries}, keyed by full device path + "." + measurement name. Columns
+   * whose measurement name is null are skipped. The statement must be an
+   * InsertMultiTabletsStatement.
+   */
+  private void collectLoadedPointsByTimeseries(
+      final Statement statement, final Map<String, Integer> pointCountByTimeseries) {
+    Assert.assertTrue(statement instanceof InsertMultiTabletsStatement);
+    final InsertMultiTabletsStatement multiTabletsStatement =
+        (InsertMultiTabletsStatement) statement;
+
+    for (final InsertTabletStatement tabletStatement :
+        multiTabletsStatement.getInsertTabletStatementList()) {
+      final int rowCount = tabletStatement.getRowCount();
+      if (rowCount <= 0) {
+        continue;
+      }
+
+      for (int column = 0; column < tabletStatement.getMeasurements().length; ++column) {
+        final String measurement = tabletStatement.getMeasurements()[column];
+        if (measurement == null) {
+          continue;
+        }
+
+        int nonNullPoints = 0;
+        for (int row = 0; row < rowCount; ++row) {
+          if (!tabletStatement.isNull(row, column)) {
+            ++nonNullPoints;
+          }
+        }
+
+        // Leave the map untouched for all-null columns so no zero entries appear.
+        if (nonNullPoints > 0) {
+          pointCountByTimeseries.merge(
+              tabletStatement.getDevicePath().getFullPath() + "." + measurement,
+              nonNullPoints,
+              Integer::sum);
+        }
+      }
+    }
+  }
+
+  private void collectLoadedPoints(
+      final Statement statement, final Map<String, Integer> 
pointCountByDevice) {
+    Assert.assertTrue(statement instanceof InsertMultiTabletsStatement);
+    for (final InsertTabletStatement insertTabletStatement :
+        ((InsertMultiTabletsStatement) 
statement).getInsertTabletStatementList()) {
+      pointCountByDevice.merge(
+          insertTabletStatement.getDevicePath().getFullPath(),
+          countNonNullPoints(insertTabletStatement),
+          Integer::sum);
+    }
+  }
+
+  /**
+   * Counts the cells of the tablet statement that hold a value: the column's measurement name
+   * is non-null and the cell itself is not null.
+   */
+  private int countNonNullPoints(final InsertTabletStatement insertTabletStatement) {
+    int nonNullPoints = 0;
+    for (int row = 0; row < insertTabletStatement.getRowCount(); ++row) {
+      for (int column = 0; column < insertTabletStatement.getMeasurements().length; ++column) {
+        if (insertTabletStatement.getMeasurements()[column] == null) {
+          continue;
+        }
+        if (insertTabletStatement.isNull(row, column)) {
+          continue;
+        }
+        ++nonNullPoints;
+      }
+    }
+    return nonNullPoints;
+  }
+
+  /** Static shortcuts for reading and writing the pipe memory-management flag of the config. */
+  private static class PipeConfigAccessor {
+    /** Returns the pipe memory-management flag currently held by the common config. */
+    private static boolean getPipeMemoryManagementEnabled() {
+      final boolean currentValue =
+          CommonDescriptor.getInstance().getConfig().getPipeMemoryManagementEnabled();
+      return currentValue;
+    }
+
+    /** Writes {@code enabled} into the pipe memory-management flag of the common config. */
+    private static void setPipeMemoryManagementEnabled(final boolean enabled) {
+      CommonDescriptor.getInstance()
+          .getConfig()
+          .setPipeMemoryManagementEnabled(enabled);
+    }
+  }
+}

Reply via email to