This is an automated email from the ASF dual-hosted git repository.
jiangtian pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 0ecd0ec9eb8 Fixed the NPE when validating legacy sink (#17153) (#17161)
0ecd0ec9eb8 is described below
commit 0ecd0ec9eb8e132ccc377a983fa3bae457eec2fb
Author: Caideyipi <[email protected]>
AuthorDate: Wed Feb 4 16:20:55 2026 +0800
Fixed the NPE when validating legacy sink (#17153) (#17161)
* npe-fix
* unb
---
.../pipe/it/autocreate/IoTDBPipeDataSinkIT.java | 33 ++++++++--------------
.../sink/protocol/legacy/IoTDBLegacyPipeSink.java | 11 +++++---
2 files changed, 18 insertions(+), 26 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
index 09fd6ba6d4e..d59201f5605 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.client.sync.SyncConfigNodeIServiceClient;
import org.apache.iotdb.confignode.rpc.thrift.TCreatePipeReq;
import org.apache.iotdb.db.it.utils.TestUtils;
+import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
@@ -33,6 +34,8 @@ import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
+import java.sql.Connection;
+import java.sql.Statement;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -200,30 +203,16 @@ public class IoTDBPipeDataSinkIT extends
AbstractPipeDualAutoIT {
final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();
+ try (final Connection connection = EnvFactory.getEnv().getConnection();
+ final Statement statement = connection.createStatement()) {
+ statement.execute(
+ String.format(
+ "create pipe testPipe ('sink'='iotdb-legacy-pipe-sink',
'ip'='%s', 'port'='%s', 'version'='1.3')",
+ receiverIp, receiverPort));
+ }
+
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
- final Map<String, String> extractorAttributes = new HashMap<>();
- final Map<String, String> processorAttributes = new HashMap<>();
- final Map<String, String> connectorAttributes = new HashMap<>();
-
- extractorAttributes.put("source.realtime.mode", "log");
-
- connectorAttributes.put("sink", "iotdb-legacy-pipe-sink");
- connectorAttributes.put("sink.batch.enable", "false");
- connectorAttributes.put("sink.ip", receiverIp);
- connectorAttributes.put("sink.port", Integer.toString(receiverPort));
-
- // This version does not matter since it's no longer checked by the
legacy receiver
- connectorAttributes.put("sink.version", "1.3");
-
- final TSStatus status =
- client.createPipe(
- new TCreatePipeReq("testPipe", connectorAttributes)
- .setExtractorAttributes(extractorAttributes)
- .setProcessorAttributes(processorAttributes));
-
- Assert.assertEquals(TSStatusCode.SUCCESS_STATUS.getStatusCode(),
status.getCode());
-
Assert.assertEquals(
TSStatusCode.SUCCESS_STATUS.getStatusCode(),
client.startPipe("testPipe").getCode());
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
index a80293c9467..406c5cb4fdc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java
@@ -37,6 +37,7 @@ import
org.apache.iotdb.db.pipe.event.common.terminate.PipeTerminateEvent;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.pipe.sink.payload.legacy.TsFilePipeData;
import org.apache.iotdb.db.storageengine.StorageEngine;
+import org.apache.iotdb.db.storageengine.dataregion.DataRegion;
import org.apache.iotdb.pipe.api.PipeConnector;
import
org.apache.iotdb.pipe.api.customizer.configuration.PipeConnectorRuntimeConfiguration;
import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
@@ -107,7 +108,7 @@ public class IoTDBLegacyPipeSink implements PipeConnector {
private String syncConnectorVersion;
private String pipeName;
- private String databaseName;
+ private String databaseName = "";
private IoTDBSyncClient client;
private SessionPool sessionPool;
@@ -199,10 +200,12 @@ public class IoTDBLegacyPipeSink implements PipeConnector
{
trustStore = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PATH_KEY);
trustStorePwd = parameters.getString(SINK_IOTDB_SSL_TRUST_STORE_PWD_KEY);
- databaseName =
+ final DataRegion dataRegion =
StorageEngine.getInstance()
- .getDataRegion(new
DataRegionId(configuration.getRuntimeEnvironment().getRegionId()))
- .getDatabaseName();
+ .getDataRegion(new
DataRegionId(configuration.getRuntimeEnvironment().getRegionId()));
+ if (Objects.nonNull(dataRegion)) {
+ databaseName = dataRegion.getDatabaseName();
+ }
}
@Override