This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 230cd633172 [to dev/1.3] fix: prevent NPE when isFinished() is called
before DataDriver init (#17441)
230cd633172 is described below
commit 230cd6331723a259392dd1f143cf9084a0b9eed9
Author: shuwenwei <[email protected]>
AuthorDate: Wed Apr 8 14:11:59 2026 +0800
[to dev/1.3] fix: prevent NPE when isFinished() is called before DataDriver
init (#17441)
---
.../queryengine/execution/driver/DataDriver.java | 5 +
.../db/queryengine/execution/driver/Driver.java | 4 +-
.../queryengine/execution/driver/SchemaDriver.java | 5 +
.../db/queryengine/execution/DataDriverTest.java | 127 ++++++++++++++++++++-
4 files changed, 138 insertions(+), 3 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
index 743a05d3777..c9c41f5b993 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/DataDriver.java
@@ -74,6 +74,11 @@ public class DataDriver extends Driver {
return true;
}
+ @Override
+ public boolean isInit() {
+ return init;
+ }
+
/**
* Init seq file list and unseq file list in {@link
* org.apache.iotdb.db.storageengine.dataregion.read.QueryDataSource} and
set it into each
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
index ac353e32213..35485dbbd4e 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/Driver.java
@@ -213,7 +213,7 @@ public abstract class Driver implements IDriver {
finished =
state.get() != State.ALIVE
|| driverContext.isDone()
- || root.isFinished()
+ || (isInit() && root.isFinished())
|| sink.isClosed();
} catch (Exception e) {
throw new RuntimeException(e);
@@ -224,6 +224,8 @@ public abstract class Driver implements IDriver {
return finished;
}
+ abstract boolean isInit();
+
@SuppressWarnings({"squid:S1181", "squid:S112"})
private ListenableFuture<?> processInternal() {
long startTimeNanos = System.nanoTime();
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java
index b507aa1c748..e3230b7ff7f 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/driver/SchemaDriver.java
@@ -42,6 +42,11 @@ public class SchemaDriver extends Driver {
return true;
}
+ @Override
+ boolean isInit() {
+ return true;
+ }
+
@Override
protected void releaseResource() {
driverContext.getFragmentInstanceContext().decrementNumOfUnClosedDriver();
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
index 1f092e4c97c..5176923f8c2 100644
---
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/execution/DataDriverTest.java
@@ -22,7 +22,9 @@ import
org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.exception.IllegalPathException;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.MeasurementPath;
+import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
+import org.apache.iotdb.db.exception.WriteProcessException;
import org.apache.iotdb.db.exception.query.QueryProcessException;
import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
@@ -35,7 +37,9 @@ import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContex
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceState;
import
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
import
org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.SingleDeviceViewOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.FullOuterTimeJoinOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.join.LeftOuterTimeJoinOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.AscTimeComparator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.SingleColumnMerger;
import
org.apache.iotdb.db.queryengine.execution.operator.source.SeriesScanOperator;
@@ -51,7 +55,6 @@ import
org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import com.google.common.util.concurrent.ListenableFuture;
import io.airlift.units.Duration;
import org.apache.tsfile.enums.TSDataType;
-import org.apache.tsfile.exception.write.WriteProcessException;
import org.apache.tsfile.read.common.block.TsBlock;
import org.apache.tsfile.read.common.block.column.IntColumn;
import org.apache.tsfile.write.schema.MeasurementSchema;
@@ -91,7 +94,11 @@ public class DataDriverTest {
TimeUnit.MILLISECONDS);
@Before
- public void setUp() throws MetadataException, IOException,
WriteProcessException {
+ public void setUp()
+ throws MetadataException,
+ IOException,
+ WriteProcessException,
+ org.apache.tsfile.exception.write.WriteProcessException {
IoTDBDescriptor.getInstance().getConfig().setDriverTaskExecutionTimeSliceInMs(10000);
SeriesReaderTestUtil.setUp(
measurementSchemas, deviceIds, seqResources, unSeqResources,
DATA_DRIVER_TEST_SG);
@@ -248,4 +255,120 @@ public class DataDriverTest {
instanceNotificationExecutor.shutdown();
}
}
+
+ @Test
+ public void testCallIsFinishedBeforeDataSourcePrepared() throws
IllegalPathException {
+ ExecutorService instanceNotificationExecutor =
+ IoTDBThreadPoolFactory.newFixedThreadPool(1,
"test-instance-notification");
+ try {
+ MeasurementPath measurementPath1 =
+ new MeasurementPath(
+ new PartialPath(DATA_DRIVER_TEST_SG + ".device0"),
+ new MeasurementSchema("sensor0", TSDataType.INT32));
+ Set<String> allSensors = new HashSet<>();
+ allSensors.add("sensor0");
+ allSensors.add("sensor1");
+ QueryId queryId = new QueryId("stub_query");
+ FragmentInstanceId instanceId =
+ new FragmentInstanceId(new PlanFragmentId(queryId, 0),
"stub-instance");
+ FragmentInstanceStateMachine stateMachine =
+ new FragmentInstanceStateMachine(instanceId,
instanceNotificationExecutor);
+ DataRegion dataRegion = Mockito.mock(DataRegion.class);
+ Mockito.when(dataRegion.tryReadLock(Mockito.anyLong())).thenReturn(true);
+ FragmentInstanceContext fragmentInstanceContext =
+ createFragmentInstanceContext(instanceId, stateMachine);
+ fragmentInstanceContext.setDataRegion(dataRegion);
+ DataDriverContext driverContext = new
DataDriverContext(fragmentInstanceContext, 0);
+ PlanNodeId planNodeId1 = new PlanNodeId("1");
+ driverContext.addOperatorContext(1, planNodeId1,
SeriesScanOperator.class.getSimpleName());
+ PlanNodeId planNodeId2 = new PlanNodeId("2");
+ driverContext.addOperatorContext(2, planNodeId2,
SeriesScanOperator.class.getSimpleName());
+ driverContext.addOperatorContext(
+ 3, new PlanNodeId("3"),
FullOuterTimeJoinOperator.class.getSimpleName());
+ driverContext.addOperatorContext(4, new PlanNodeId("4"),
LimitOperator.class.getSimpleName());
+
+ SeriesScanOptions.Builder scanOptionsBuilder = new
SeriesScanOptions.Builder();
+ scanOptionsBuilder.withAllSensors(allSensors);
+ SeriesScanOperator seriesScanOperator1 =
+ new SeriesScanOperator(
+ driverContext.getOperatorContexts().get(0),
+ planNodeId1,
+ measurementPath1,
+ Ordering.ASC,
+ scanOptionsBuilder.build());
+ driverContext.addSourceOperator(seriesScanOperator1);
+ driverContext.addPath(measurementPath1);
+ seriesScanOperator1
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+ MeasurementPath measurementPath2 =
+ new MeasurementPath(
+ new PartialPath(DATA_DRIVER_TEST_SG + ".device0"),
+ new MeasurementSchema("sensor1", TSDataType.INT32));
+ SeriesScanOperator seriesScanOperator2 =
+ new SeriesScanOperator(
+ driverContext.getOperatorContexts().get(1),
+ planNodeId2,
+ measurementPath2,
+ Ordering.ASC,
+ scanOptionsBuilder.build());
+ driverContext.addSourceOperator(seriesScanOperator2);
+ driverContext.addPath(measurementPath2);
+
+ seriesScanOperator2
+ .getOperatorContext()
+ .setMaxRunTime(new Duration(500, TimeUnit.MILLISECONDS));
+
+ LeftOuterTimeJoinOperator timeJoinOperator =
+ new LeftOuterTimeJoinOperator(
+ driverContext.getOperatorContexts().get(2),
+ seriesScanOperator1,
+ 1,
+ seriesScanOperator2,
+ Arrays.asList(TSDataType.INT32, TSDataType.INT32),
+ new AscTimeComparator());
+ SingleDeviceViewOperator fakeOperator =
+ new SingleDeviceViewOperator(
+ driverContext.getOperatorContexts().get(3),
+ "d1",
+ timeJoinOperator,
+ Arrays.asList(0),
+ Arrays.asList(TSDataType.INT32, TSDataType.INT32));
+ fakeOperator.getOperatorContext().setMaxRunTime(new Duration(500,
TimeUnit.MILLISECONDS));
+
+ fragmentInstanceContext.setSourcePaths(driverContext.getPaths());
+ String deviceId = DATA_DRIVER_TEST_SG + ".device0";
+ Mockito.when(
+ dataRegion.query(
+ eq(driverContext.getPaths()),
+ eq(deviceId),
+ eq(fragmentInstanceContext),
+ Mockito.isNull(),
+ Mockito.isNull(),
+ Mockito.anyLong()))
+ .thenReturn(null);
+ fragmentInstanceContext.initQueryDataSource(driverContext.getPaths());
+ fragmentInstanceContext.initializeNumOfDrivers(1);
+
+ StubSink stubSink = new StubSink(fragmentInstanceContext);
+ driverContext.setSink(stubSink);
+ IDriver dataDriver = null;
+ try {
+ dataDriver = new DataDriver(fakeOperator, driverContext, 0);
+ assertEquals(
+ fragmentInstanceContext.getId(),
dataDriver.getDriverTaskId().getFragmentInstanceId());
+ assertFalse(dataDriver.isFinished());
+ } finally {
+ if (dataDriver != null) {
+ dataDriver.close();
+ }
+ }
+ } catch (QueryProcessException e) {
+ e.printStackTrace();
+ fail();
+ } finally {
+ instanceNotificationExecutor.shutdown();
+ }
+ }
}