This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/1.3.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit f017e3f94f1dfb83d9c06bd6317ab3c35a81d6f1
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 19 12:00:17 2025 +0800

    Pipe: Fixed the NPE caused by the compression timer (#15782) (#15783)
---
 .../manager/pipe/agent/task/PipeConfigNodeSubtask.java           | 9 ++++++---
 .../protocol/airgap/IoTDBDataRegionAirGapConnector.java          | 2 +-
 .../protocol/thrift/async/IoTDBDataRegionAsyncConnector.java     | 2 +-
 .../protocol/thrift/sync/IoTDBDataRegionSyncConnector.java       | 2 +-
 4 files changed, 9 insertions(+), 6 deletions(-)

diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
index 381769d1df3..263dab213e6 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/pipe/agent/task/PipeConfigNodeSubtask.java
@@ -26,8 +26,9 @@ import org.apache.iotdb.commons.pipe.agent.task.progress.PipeEventCommitManager;
 import org.apache.iotdb.commons.pipe.agent.task.subtask.PipeAbstractConnectorSubtask;
 import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant;
 import org.apache.iotdb.commons.pipe.config.plugin.configuraion.PipeTaskRuntimeConfiguration;
+import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskConnectorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskExtractorRuntimeEnvironment;
-import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskRuntimeEnvironment;
+import org.apache.iotdb.commons.pipe.config.plugin.env.PipeTaskProcessorRuntimeEnvironment;
 import org.apache.iotdb.commons.pipe.event.EnrichedEvent;
 import org.apache.iotdb.commons.pipe.event.ProgressReportEvent;
 import org.apache.iotdb.confignode.manager.pipe.agent.PipeConfigNodeAgent;
@@ -117,7 +118,8 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask {
 
     final PipeTaskRuntimeConfiguration runtimeConfiguration =
         new PipeTaskRuntimeConfiguration(
-            new PipeTaskRuntimeEnvironment(pipeName, creationTime, CONFIG_REGION_ID.getId()));
+            new PipeTaskProcessorRuntimeEnvironment(
+                pipeName, creationTime, CONFIG_REGION_ID.getId(), null));
 
     processor =
         PipeConfigNodeAgent.plugin()
@@ -142,7 +144,8 @@ public class PipeConfigNodeSubtask extends PipeAbstractConnectorSubtask {
       // 3. Customize connector
       final PipeTaskRuntimeConfiguration runtimeConfiguration =
           new PipeTaskRuntimeConfiguration(
-              new PipeTaskRuntimeEnvironment(pipeName, creationTime, CONFIG_REGION_ID.getId()));
+              new PipeTaskConnectorRuntimeEnvironment(
+                  pipeName, creationTime, CONFIG_REGION_ID.getId()));
       outputPipeConnector.customize(connectorParameters, runtimeConfiguration);
 
       // 4. Handshake
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
index cc3e330656e..ee35ab46306 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataRegionAirGapConnector.java
@@ -308,7 +308,7 @@ public class IoTDBDataRegionAirGapConnector extends IoTDBDataNodeAirGapConnector
 
   @Override
   protected byte[] compressIfNeeded(final byte[] reqInBytes) throws IOException {
-    if (Objects.isNull(compressionTimer)) {
+    if (Objects.isNull(compressionTimer) && Objects.nonNull(attributeSortedString)) {
       compressionTimer =
           PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
index 46f3cfb71d1..fd80f47c45f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/async/IoTDBDataRegionAsyncConnector.java
@@ -464,7 +464,7 @@ public class IoTDBDataRegionAsyncConnector extends IoTDBConnector {
 
   @Override
   public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOException {
-    if (Objects.isNull(compressionTimer)) {
+    if (Objects.isNull(compressionTimer) && Objects.nonNull(attributeSortedString)) {
       compressionTimer =
           PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString);
     }
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
index b8e5795484e..784ee14a55a 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataRegionSyncConnector.java
@@ -497,7 +497,7 @@ public class IoTDBDataRegionSyncConnector extends IoTDBDataNodeSyncConnector {
 
   @Override
   public TPipeTransferReq compressIfNeeded(final TPipeTransferReq req) throws IOException {
-    if (Objects.isNull(compressionTimer)) {
+    if (Objects.isNull(compressionTimer) && Objects.nonNull(attributeSortedString)) {
       compressionTimer =
           PipeDataRegionConnectorMetrics.getInstance().getCompressionTimer(attributeSortedString);
     }

Reply via email to