This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/1.3.5 in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit f017e3f94f1dfb83d9c06bd6317ab3c35a81d6f1 Author: Caideyipi <[email protected]> AuthorDate: Thu Jun 19 12:00:17 2025 +0800 Pipe: Fixed the NPE caused by the compression timer (#15782) (#15783) --- .../manager/pipe/agent/task/PipeConfigNodeSubtask.java | 9 ++++++--- .../protocol/airgap/IoTDBDataRegionAirGapConnector.java | 2 +- .../protocol/thrift/async/IoTDBDataRegionAsyncConnector.java | 2 +- .../protocol/thrift/sync/IoTDBDataRegionSyncConnector.java | 2 +- 4 files changed, 9 insertions(+), 6 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java index 381769d1df3..263dab213e6 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java @@ -26,8 +26,9 @@ import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager; import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractConnectorSubtask; import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant; import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration; +import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment; import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment; -import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment; +import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment; import org.apache.iotdb.commons.pipe.event.EnrichedEvent; import org.apache.iotdb.commons.pipe.event.ProgressReportEvent; import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent; @@ -117,7 +118,8 @@ public class 
PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask { final PipeTaskRuntimeConfiguration runtimeConfiguration = new PipeTaskRuntimeConfiguration( - new PipeTaskRuntimeEnvironment(pipeName, creationTime, CONFIG_REGION_ID.getId())); + new PipeTaskProcessorRuntimeEnvironment( + pipeName, creationTime, CONFIG_REGION_ID.getId(), null)); processor = PipeConfigNodeAgent.plugin() @@ -142,7 +144,8 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask { // 3. Customize connector final PipeTaskRuntimeConfiguration runtimeConfiguration = new PipeTaskRuntimeConfiguration( - new PipeTaskRuntimeEnvironment(pipeName, creationTime, CONFIG_REGION_ID.getId())); + new PipeTaskConnectorRuntimeEnvironment( + pipeName, creationTime, CONFIG_REGION_ID.getId())); outputPipeConnector.customize(connectorParameters, runtimeConfiguration); // 4. Handshake diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java index cc3e330656e..ee35ab46306 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java @@ -308,7 +308,7 @@ public class IoTDBDataRegionAirGapConnector extends IoTDBDataNodeAirGapConnector @Override protected byte[] compressIfNeeded(final byte[] reqInBytes) throws IOException { - if (Objects.isNull(compressionTimer)) { + if (Objects.isNull(compressionTimer) && Objects.nonNull(attributeSortedString)) { compressionTimer = PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java index 46f3cfb71d1..fd80f47c45f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java @@ -464,7 +464,7 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector { @Override public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOException { - if (Objects.isNull(compressionTimer)) { + if (Objects.isNull(compressionTimer) && Objects.nonNull(attributeSortedString)) { compressionTimer = PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java index b8e5795484e..784ee14a55a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java @@ -497,7 +497,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector { @Override public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOException { - if (Objects.isNull(compressionTimer)) { + if (Objects.isNull(compressionTimer) && Objects.nonNull(attributeSortedString)) { compressionTimer = PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString); }
