This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 230cd633172 [to dev/1.3] fix: prevent NPE when isFinished() is called before DataDriver init (#17441)
230cd633172 is described below

commit 230cd6331723a259392dd1f143cf9084a0b9eed9
Author: shuwenwei <[email protected]>
AuthorDate: Wed Apr 8 14:11:59 2026 +0800

    [to dev/1.3] fix: prevent NPE when isFinished() is called before DataDriver init (#17441)
---
 .../queryengine/execution/driver/DataDriver.java   |   5 +
 .../db/queryengine/execution/driver/Driver.java    |   4 +-
 .../queryengine/execution/driver/SchemaDriver.java |   5 +
 .../db/queryengine/execution/DataDriverTest.java   | 127 ++++++++++++++++++++-
 4 files changed, 138 insertions(+), 3 deletions(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
index 743a05d3777..c9c41f5b993 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
@@ -74,6 +74,11 @@ public class DataDriver extends Driver {
     return true;
   }
 
+  @Override
+  public boolean isInit() {
+    return init;
+  }
+
   /**
    * Init seq file list and unseq file list in {@link
    * org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} and set it into each
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
index ac353e32213..35485dbbd4e 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
@@ -213,7 +213,7 @@ public abstract class Driver implements IDriver {
       finished =
           state.get() != State.ALIVE
               || driverContext.isDone()
-              || root.isFinished()
+              || (isInit() && root.isFinished())
               || sink.isClosed();
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -224,6 +224,8 @@ public abstract class Driver implements IDriver {
     return finished;
   }
 
+  abstract boolean isInit();
+
   @SuppressWarnings({"squid:S1181", "squid:S112"})
   private ListenableFuture<?> processInternal() {
     long startTimeNanos = System.nanoTime();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java
index b507aa1c748..e3230b7ff7f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java
@@ -42,6 +42,11 @@ public class SchemaDriver extends Driver {
     return true;
   }
 
+  @Override
+  boolean isInit() {
+    return true;
+  }
+
   @Override
   protected void releaseResource() {
     driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
index 1f092e4c97c..5176923f8c2 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
@@ -22,7 +22,9 @@ import 
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
 import org.apache.iotdb.commons.exception.IllegalPathException;
 import org.apache.iotdb.commons.exception.MetadataException;
 import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.WriteProcessException;
 import org.apache.iotdb.db.exception.query.QueryProcessException;
 import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
 import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
@@ -35,7 +37,9 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.LeftOuterTimeJoinOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
@@ -51,7 +55,6 @@ import 
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
 import com.google.common.util.concurrent.ListenableFuture;
 import io.airlift.units.Duration;
 import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.exception.write.WriteProcessException;
 import org.apache.tsfile.read.common.block.TsBlock;
 import org.apache.tsfile.read.common.block.column.IntColumn;
 import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -91,7 +94,11 @@ public class DataDriverTest {
           TimeUnit.MILLISECONDS);
 
   @Before
-  public void setUp() throws MetadataException, IOException, 
WriteProcessException {
+  public void setUp()
+      throws MetadataException,
+          IOException,
+          WriteProcessException,
+          org.apache.tsfile.exception.write.WriteProcessException {
     
IoTDBDescriptor.getInstance().getConfig().setDriverTaskExecutionTimeSliceInMs(10000);
     SeriesReaderTestUtil.setUp(
         measurementSchemas, deviceIds, seqResources, unSeqResources, 
DATA_DRIVER_TEST_SG);
@@ -248,4 +255,120 @@ public class DataDriverTest {
       instanceNotificationExecutor.shutdown();
     }
   }
+
+  @Test
+  public void testCallIsFinishedBeforeDataSourcePrepared() throws 
IllegalPathException {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    try {
+      MeasurementPath measurementPath1 =
+          new MeasurementPath(
+              new PartialPath(DATA_DRIVER_TEST_SG + ".device0"),
+              new MeasurementSchema("sensor0", TSDataType.INT32));
+      Set<String> allSensors = new HashSet<>();
+      allSensors.add("sensor0");
+      allSensors.add("sensor1");
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+      DataRegion dataRegion = Mockito.mock(DataRegion.class);
+      Mockito.when(dataRegion.tryReadLock(Mockito.anyLong())).thenReturn(true);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      fragmentInstanceContext.setDataRegion(dataRegion);
+      DataDriverContext driverContext = new 
DataDriverContext(fragmentInstanceContext, 0);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      driverContext.addOperatorContext(1, planNodeId1, 
SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      driverContext.addOperatorContext(2, planNodeId2, 
SeriesScanOperator.class.getSimpleName());
+      driverContext.addOperatorContext(
+          3, new PlanNodeId("3"), 
FullOuterTimeJoinOperator.class.getSimpleName());
+      driverContext.addOperatorContext(4, new PlanNodeId("4"), 
LimitOperator.class.getSimpleName());
+
+      SeriesScanOptions.Builder scanOptionsBuilder = new 
SeriesScanOptions.Builder();
+      scanOptionsBuilder.withAllSensors(allSensors);
+      SeriesScanOperator seriesScanOperator1 =
+          new SeriesScanOperator(
+              driverContext.getOperatorContexts().get(0),
+              planNodeId1,
+              measurementPath1,
+              Ordering.ASC,
+              scanOptionsBuilder.build());
+      driverContext.addSourceOperator(seriesScanOperator1);
+      driverContext.addPath(measurementPath1);
+      seriesScanOperator1
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      MeasurementPath measurementPath2 =
+          new MeasurementPath(
+              new PartialPath(DATA_DRIVER_TEST_SG + ".device0"),
+              new MeasurementSchema("sensor1", TSDataType.INT32));
+      SeriesScanOperator seriesScanOperator2 =
+          new SeriesScanOperator(
+              driverContext.getOperatorContexts().get(1),
+              planNodeId2,
+              measurementPath2,
+              Ordering.ASC,
+              scanOptionsBuilder.build());
+      driverContext.addSourceOperator(seriesScanOperator2);
+      driverContext.addPath(measurementPath2);
+
+      seriesScanOperator2
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      LeftOuterTimeJoinOperator timeJoinOperator =
+          new LeftOuterTimeJoinOperator(
+              driverContext.getOperatorContexts().get(2),
+              seriesScanOperator1,
+              1,
+              seriesScanOperator2,
+              Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+              new AscTimeComparator());
+      SingleDeviceViewOperator fakeOperator =
+          new SingleDeviceViewOperator(
+              driverContext.getOperatorContexts().get(3),
+              "d1",
+              timeJoinOperator,
+              Arrays.asList(0),
+              Arrays.asList(TSDataType.INT32, TSDataType.INT32));
+      fakeOperator.getOperatorContext().setMaxRunTime(new Duration(500, 
TimeUnit.MILLISECONDS));
+
+      fragmentInstanceContext.setSourcePaths(driverContext.getPaths());
+      String deviceId = DATA_DRIVER_TEST_SG + ".device0";
+      Mockito.when(
+              dataRegion.query(
+                  eq(driverContext.getPaths()),
+                  eq(deviceId),
+                  eq(fragmentInstanceContext),
+                  Mockito.isNull(),
+                  Mockito.isNull(),
+                  Mockito.anyLong()))
+          .thenReturn(null);
+      fragmentInstanceContext.initQueryDataSource(driverContext.getPaths());
+      fragmentInstanceContext.initializeNumOfDrivers(1);
+
+      StubSink stubSink = new StubSink(fragmentInstanceContext);
+      driverContext.setSink(stubSink);
+      IDriver dataDriver = null;
+      try {
+        dataDriver = new DataDriver(fakeOperator, driverContext, 0);
+        assertEquals(
+            fragmentInstanceContext.getId(), 
dataDriver.getDriverTaskId().getFragmentInstanceId());
+        assertFalse(dataDriver.isFinished());
+      } finally {
+        if (dataDriver != null) {
+          dataDriver.close();
+        }
+      }
+    } catch (QueryProcessException e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
 }

Reply via email to