This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new cfce7d9089c fix: prevent NPE when isFinished() is called before
DataDriver init (#17440)
cfce7d9089c is described below
commit cfce7d9089cc7bd0468d3cb0662136cea42adc5c
Author: shuwenwei <[email protected]>
AuthorDate: Wed Apr 8 14:10:29 2026 +0800
fix: prevent NPE when isFinished() is called before DataDriver init (#17440)
---
.../queryengine/execution/driver/DataDriver.java | 5 +
.../db/queryengine/execution/driver/Driver.java | 4 +-
.../queryengine/execution/driver/SchemaDriver.java | 5 +
.../db/queryengine/execution/DataDriverTest.java | 118 +++++++++++++++++++++
4 files changed, 131 insertions(+), 1 deletion(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
index 743a05d3777..c9c41f5b993 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
@@ -74,6 +74,11 @@ public class DataDriver extends Driver {
return true;
}
+ @Override // implements the hook that Driver.isFinished() checks before it touches root
+ public boolean isInit() { // NOTE(review): public here, but package-private in Driver and SchemaDriver
+ return init; // exposes this driver's init flag
+ }
+
/**
* Init seq file list and unseq file list in {@link
* org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} and
set it into each
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
index 690830a9b38..ad5eba15229 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
@@ -214,7 +214,7 @@ public abstract class Driver implements IDriver {
finished =
state.get() != State.ALIVE
|| driverContext.isDone()
- || root.isFinished()
+ || (isInit() && root.isFinished())
|| sink.isClosed();
} catch (Exception e) {
throw new RuntimeException(e);
@@ -225,6 +225,8 @@ public abstract class Driver implements IDriver {
return finished;
}
+ abstract boolean isInit(); // isFinished() only evaluates root.isFinished() when this returns true
+
@SuppressWarnings({"squid:S1181", "squid:S112"})
private ListenableFuture<?> processInternal() {
long startTimeNanos = System.nanoTime();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java
index b507aa1c748..e3230b7ff7f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java
@@ -42,6 +42,11 @@ public class SchemaDriver extends Driver {
return true;
}
+ @Override
+ boolean isInit() {
+ return true; // always true, so isFinished() always evaluates root.isFinished(); presumably no deferred init here - confirm
+ }
+
@Override
protected void releaseResource() {
driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
index 0af5098d4ba..c24fd3dac57 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
@@ -35,7 +35,9 @@ import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState;
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
import
org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.LeftOuterTimeJoinOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
import
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
@@ -254,4 +256,120 @@ public class DataDriverTest {
instanceNotificationExecutor.shutdown();
}
}
+
+ @Test
+ public void testCallIsFinishedBeforeDataSourcePrepared() { // regression check: isFinished() on a freshly built DataDriver must return false
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ try {
+ IFullPath measurementPath1 =
+ new NonAlignedFullPath(
+ IDeviceID.Factory.DEFAULT_FACTORY.create(DATA_DRIVER_TEST_SG +
".device0"),
+ new MeasurementSchema("sensor0", TSDataType.INT32));
+ Set<String> allSensors = new HashSet<>();
+ allSensors.add("sensor0");
+ allSensors.add("sensor1");
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ DataRegion dataRegion = Mockito.mock(DataRegion.class);
+ Mockito.when(dataRegion.tryReadLock(Mockito.anyLong())).thenReturn(true); // read-lock attempts always succeed on the mock
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ fragmentInstanceContext.setDataRegion(dataRegion);
+ DataDriverContext driverContext = new
DataDriverContext(fragmentInstanceContext, 0);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ driverContext.addOperatorContext(1, planNodeId1,
SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ driverContext.addOperatorContext(2, planNodeId2,
SeriesScanOperator.class.getSimpleName());
+ driverContext.addOperatorContext(
+ 3, new PlanNodeId("3"),
FullOuterTimeJoinOperator.class.getSimpleName()); // NOTE(review): named FullOuter, but presumably this is the context get(2) hands to the LeftOuterTimeJoinOperator below - confirm
+ driverContext.addOperatorContext(4, new PlanNodeId("4"),
LimitOperator.class.getSimpleName()); // NOTE(review): named Limit, but presumably this is the context get(3) hands to the SingleDeviceViewOperator below - confirm
+
+ SeriesScanOptions.Builder scanOptionsBuilder = new
SeriesScanOptions.Builder();
+ scanOptionsBuilder.withAllSensors(allSensors);
+ SeriesScanOperator seriesScanOperator1 =
+ new SeriesScanOperator(
+ driverContext.getOperatorContexts().get(0),
+ planNodeId1,
+ measurementPath1,
+ Ordering.ASC,
+ scanOptionsBuilder.build());
+ driverContext.addSourceOperator(seriesScanOperator1);
+ driverContext.addPath(measurementPath1);
+ seriesScanOperator1
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+ IFullPath measurementPath2 =
+ new NonAlignedFullPath(
+ IDeviceID.Factory.DEFAULT_FACTORY.create(DATA_DRIVER_TEST_SG +
".device0"),
+ new MeasurementSchema("sensor1", TSDataType.INT32));
+ SeriesScanOperator seriesScanOperator2 =
+ new SeriesScanOperator(
+ driverContext.getOperatorContexts().get(1),
+ planNodeId2,
+ measurementPath2,
+ Ordering.ASC,
+ scanOptionsBuilder.build());
+ driverContext.addSourceOperator(seriesScanOperator2);
+ driverContext.addPath(measurementPath2);
+
+ seriesScanOperator2
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+ LeftOuterTimeJoinOperator timeJoinOperator =
+ new LeftOuterTimeJoinOperator(
+ driverContext.getOperatorContexts().get(2),
+ seriesScanOperator1,
+ 1,
+ seriesScanOperator2,
+ Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+ new AscTimeComparator());
+ SingleDeviceViewOperator fakeOperator =
+ new SingleDeviceViewOperator(
+ driverContext.getOperatorContexts().get(3),
+ "d1",
+ timeJoinOperator,
+ Arrays.asList(0),
+ Arrays.asList(TSDataType.INT32, TSDataType.INT32));
+ fakeOperator.getOperatorContext().setMaxRunTime(new Duration(500,
TimeUnit.MILLISECONDS));
+
+ fragmentInstanceContext.setSourcePaths(driverContext.getPaths());
+ String deviceId = DATA_DRIVER_TEST_SG + ".device0";
+ Mockito.when(
+ dataRegion.query(
+ eq(driverContext.getPaths()),
+ eq(IDeviceID.Factory.DEFAULT_FACTORY.create(deviceId)),
+ eq(fragmentInstanceContext),
+ Mockito.isNull(),
+ Mockito.isNull(),
+ Mockito.anyLong()))
+ .thenReturn(null); // the stubbed query hands back null instead of a data source
+ fragmentInstanceContext.initQueryDataSource(driverContext.getPaths());
+ fragmentInstanceContext.initializeNumOfDrivers(1);
+
+ StubSink stubSink = new StubSink(fragmentInstanceContext);
+ driverContext.setSink(stubSink);
+ IDriver dataDriver = null;
+ try {
+ dataDriver = new DataDriver(fakeOperator, driverContext, 0);
+ assertEquals(
+ fragmentInstanceContext.getId(),
dataDriver.getDriverTaskId().getFragmentInstanceId());
+ assertFalse(dataDriver.isFinished()); // invoked right after construction, with no processing call in between
+ } finally {
+ if (dataDriver != null) {
+ dataDriver.close();
+ }
+ }
+ } catch (QueryProcessException e) {
+ e.printStackTrace(); // NOTE(review): trace goes to stderr and fail() carries no message; consider letting the exception propagate
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
}