This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch legacy-13
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/legacy-13 by this push:
     new 5498095a4ac Fixed the NPE when validating legacy sink (#17153)
5498095a4ac is described below

commit 5498095a4acffef64681deeee3724ad785f61b33
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 4 14:14:58 2026 +0800

    Fixed the NPE when validating legacy sink (#17153)
    
    * npe-fix
    
    * unb
---
 .../pipe/it/autocreate/IoTDBPipeDataSinkIT.java    | 33 ++++++++--------------
 .../sink/protocol/legacy/IoTDBLegacyPipeSink.java  | 11 +++++---
 2 files changed, 18 insertions(+), 26 deletions(-)

diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
index 09fd6ba6d4e..d59201f5605 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
 import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
 import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.EnvFactory;
 import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
 import org.apache.iotdb.it.framework.IoTDBTestRunner;
 import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
@@ -33,6 +34,8 @@ import org.junit.Test;
 import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 
+import java.sql.Connection;
+import java.sql.Statement;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
@@ -200,30 +203,16 @@ public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT {
     final String receiverIp = receiverDataNode.getIp();
     final int receiverPort = receiverDataNode.getPort();
 
+    try (final Connection connection = EnvFactory.getEnv().getConnection();
+        final Statement statement = connection.createStatement()) {
+      statement.execute(
+          String.format(
+              "create pipe testPipe ('sink'='iotdb-legacy-pipe-sink', 'ip'='%s', 'port'='%s', 'version'='1.3')",
+              receiverIp, receiverPort));
+    }
+
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) {
-      final Map<String, String> extractorAttributes = new HashMap<>();
-      final Map<String, String> processorAttributes = new HashMap<>();
-      final Map<String, String> connectorAttributes = new HashMap<>();
-
-      extractorAttributes.put("source.realtime.mode", "log");
-
-      connectorAttributes.put("sink", "iotdb-legacy-pipe-sink");
-      connectorAttributes.put("sink.batch.enable", "false");
-      connectorAttributes.put("sink.ip", receiverIp);
-      connectorAttributes.put("sink.port", Integer.toString(receiverPort));
-
-      // This version does not matter since it's no longer checked by the legacy receiver
-      connectorAttributes.put("sink.version", "1.3");
-
-      final TSStatus status =
-          client.createPipe(
-              new TCreatePipeReq("testPipe", connectorAttributes)
-                  .setExtractorAttributes(extractorAttributes)
-                  .setProcessorAttributes(processorAttributes));
-
-      Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(), status.getCode());
-
       Assert.assertEquals(
           TSStatusCode.SUCCESS_STATUS.getStatusCode(), client.startPipe("testPipe").getCode());
 
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
index a80293c9467..406c5cb4fdc 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
@@ -37,6 +37,7 @@ import org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
 import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
 import org.apache.iotdb.db.pipe.sink.payload.legacy.TsFilePipeData;
 import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
 import org.apache.iotdb.pipe.api.PipeConnector;
 import org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
 import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -107,7 +108,7 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
   private String syncConnectorVersion;
 
   private String pipeName;
-  private String databaseName;
+  private String databaseName = "";
   private IoTDBSyncClient client;
 
   private SessionPool sessionPool;
@@ -199,10 +200,12 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
     trustStore = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY);
     trustStorePwd = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY);
 
-    databaseName =
+    final DataRegion dataRegion =
         StorageEngine.getInstance()
-            .getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId()))
-            .getDatabaseName();
+            .getDataRegion(new DataRegionId(configuration.getRuntimeEnvironment().getRegionId()));
+    if (Objects.nonNull(dataRegion)) {
+      databaseName = dataRegion.getDatabaseName();
+    }
   }
 
   @Override

Reply via email to