This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new cfce7d9089c fix: prevent NPE when isFinished() is called before DataDriver init (#17440)
cfce7d9089c is described below

commit cfce7d9089cc7bd0468d3cb0662136cea42adc5c
Author: shuwenwei <[email protected]>
AuthorDate: Wed Apr 8 14:10:29 2026 +0800

    fix: prevent NPE when isFinished() is called before DataDriver init (#17440)
---
 .../queryengine/execution/driver/DataDriver.java   |   5 +
 .../db/queryengine/execution/driver/Driver.java    |   4 +-
 .../queryengine/execution/driver/SchemaDriver.java |   5 +
 .../db/queryengine/execution/DataDriverTest.java   | 118 +++++++++++++++++++++
 4 files changed, 131 insertions(+), 1 deletion(-)

diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
index 743a05d3777..c9c41f5b993 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
@@ -74,6 +74,11 @@ public class DataDriver extends Driver {
     return true;
   }
 
+  @Override
+  public boolean isInit() {
+    return init;
+  }
+
   /**
    * Init seq file list and unseq file list in {@link
    * org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} and set it into each
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
index 690830a9b38..ad5eba15229 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
@@ -214,7 +214,7 @@ public abstract class Driver implements IDriver {
       finished =
           state.get() != State.ALIVE
               || driverContext.isDone()
-              || root.isFinished()
+              || (isInit() && root.isFinished())
               || sink.isClosed();
     } catch (Exception e) {
       throw new RuntimeException(e);
@@ -225,6 +225,8 @@ public abstract class Driver implements IDriver {
     return finished;
   }
 
+  abstract boolean isInit();
+
   @SuppressWarnings({"squid:S1181", "squid:S112"})
   private ListenableFuture<?> processInternal() {
     long startTimeNanos = System.nanoTime();
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java
index b507aa1c748..e3230b7ff7f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java
@@ -42,6 +42,11 @@ public class SchemaDriver extends Driver {
     return true;
   }
 
+  @Override
+  boolean isInit() {
+    return true;
+  }
+
   @Override
   protected void releaseResource() {
     driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver();
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
index 0af5098d4ba..c24fd3dac57 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
@@ -35,7 +35,9 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState;
 import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.LeftOuterTimeJoinOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
@@ -254,4 +256,120 @@ public class DataDriverTest {
       instanceNotificationExecutor.shutdown();
     }
   }
+
+  @Test
+  public void testCallIsFinishedBeforeDataSourcePrepared() {
+    ExecutorService instanceNotificationExecutor =
+        IoTDBThreadPoolFactory.newFixedThreadPool(1, 
"test-instance-notification");
+    try {
+      IFullPath measurementPath1 =
+          new NonAlignedFullPath(
+              IDeviceID.Factory.DEFAULT_FACTORY.create(DATA_DRIVER_TEST_SG + 
".device0"),
+              new MeasurementSchema("sensor0", TSDataType.INT32));
+      Set<String> allSensors = new HashSet<>();
+      allSensors.add("sensor0");
+      allSensors.add("sensor1");
+      QueryId queryId = new QueryId("stub_query");
+      FragmentInstanceId instanceId =
+          new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+      FragmentInstanceStateMachine stateMachine =
+          new FragmentInstanceStateMachine(instanceId, 
instanceNotificationExecutor);
+      DataRegion dataRegion = Mockito.mock(DataRegion.class);
+      Mockito.when(dataRegion.tryReadLock(Mockito.anyLong())).thenReturn(true);
+      FragmentInstanceContext fragmentInstanceContext =
+          createFragmentInstanceContext(instanceId, stateMachine);
+      fragmentInstanceContext.setDataRegion(dataRegion);
+      DataDriverContext driverContext = new 
DataDriverContext(fragmentInstanceContext, 0);
+      PlanNodeId planNodeId1 = new PlanNodeId("1");
+      driverContext.addOperatorContext(1, planNodeId1, 
SeriesScanOperator.class.getSimpleName());
+      PlanNodeId planNodeId2 = new PlanNodeId("2");
+      driverContext.addOperatorContext(2, planNodeId2, 
SeriesScanOperator.class.getSimpleName());
+      driverContext.addOperatorContext(
+          3, new PlanNodeId("3"), 
FullOuterTimeJoinOperator.class.getSimpleName());
+      driverContext.addOperatorContext(4, new PlanNodeId("4"), 
LimitOperator.class.getSimpleName());
+
+      SeriesScanOptions.Builder scanOptionsBuilder = new 
SeriesScanOptions.Builder();
+      scanOptionsBuilder.withAllSensors(allSensors);
+      SeriesScanOperator seriesScanOperator1 =
+          new SeriesScanOperator(
+              driverContext.getOperatorContexts().get(0),
+              planNodeId1,
+              measurementPath1,
+              Ordering.ASC,
+              scanOptionsBuilder.build());
+      driverContext.addSourceOperator(seriesScanOperator1);
+      driverContext.addPath(measurementPath1);
+      seriesScanOperator1
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      IFullPath measurementPath2 =
+          new NonAlignedFullPath(
+              IDeviceID.Factory.DEFAULT_FACTORY.create(DATA_DRIVER_TEST_SG + 
".device0"),
+              new MeasurementSchema("sensor1", TSDataType.INT32));
+      SeriesScanOperator seriesScanOperator2 =
+          new SeriesScanOperator(
+              driverContext.getOperatorContexts().get(1),
+              planNodeId2,
+              measurementPath2,
+              Ordering.ASC,
+              scanOptionsBuilder.build());
+      driverContext.addSourceOperator(seriesScanOperator2);
+      driverContext.addPath(measurementPath2);
+
+      seriesScanOperator2
+          .getOperatorContext()
+          .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+      LeftOuterTimeJoinOperator timeJoinOperator =
+          new LeftOuterTimeJoinOperator(
+              driverContext.getOperatorContexts().get(2),
+              seriesScanOperator1,
+              1,
+              seriesScanOperator2,
+              Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+              new AscTimeComparator());
+      SingleDeviceViewOperator fakeOperator =
+          new SingleDeviceViewOperator(
+              driverContext.getOperatorContexts().get(3),
+              "d1",
+              timeJoinOperator,
+              Arrays.asList(0),
+              Arrays.asList(TSDataType.INT32, TSDataType.INT32));
+      fakeOperator.getOperatorContext().setMaxRunTime(new Duration(500, 
TimeUnit.MILLISECONDS));
+
+      fragmentInstanceContext.setSourcePaths(driverContext.getPaths());
+      String deviceId = DATA_DRIVER_TEST_SG + ".device0";
+      Mockito.when(
+              dataRegion.query(
+                  eq(driverContext.getPaths()),
+                  eq(IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId)),
+                  eq(fragmentInstanceContext),
+                  Mockito.isNull(),
+                  Mockito.isNull(),
+                  Mockito.anyLong()))
+          .thenReturn(null);
+      fragmentInstanceContext.initQueryDataSource(driverContext.getPaths());
+      fragmentInstanceContext.initializeNumOfDrivers(1);
+
+      StubSink stubSink = new StubSink(fragmentInstanceContext);
+      driverContext.setSink(stubSink);
+      IDriver dataDriver = null;
+      try {
+        dataDriver = new DataDriver(fakeOperator, driverContext, 0);
+        assertEquals(
+            fragmentInstanceContext.getId(), 
dataDriver.getDriverTaskId().getFragmentInstanceId());
+        assertFalse(dataDriver.isFinished());
+      } finally {
+        if (dataDriver != null) {
+          dataDriver.close();
+        }
+      }
+    } catch (QueryProcessException e) {
+      e.printStackTrace();
+      fail();
+    } finally {
+      instanceNotificationExecutor.shutdown();
+    }
+  }
 }

Reply via email to