This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch mergemaster0808
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 0aa75b6fd296158c0369d56c3c7723ddbe37a42a
Author: Steve Yurong Su <[email protected]>
AuthorDate: Thu Aug 1 15:13:54 2024 +0800

    Pipe/Subscription: Prevent NPE when some pipe SQL parameter values are null (#13069)
    
    (cherry picked from commit cceaebd591955af95fb16b0f373bd8dc9c917cad)
---
 .../api/customizer/parameter/PipeParameters.java   | 25 +++++++++++++++++++---
 .../subscription/topic/CreateTopicProcedure.java   | 14 +++++-------
 2 files changed, 27 insertions(+), 12 deletions(-)

diff --git a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
index 1190054c6e9..1646423b133 100644
--- a/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
+++ b/iotdb-api/pipe-api/src/main/java/org/apache/iotdb/pipe/api/customizer/parameter/PipeParameters.java
@@ -278,13 +278,32 @@ public class PipeParameters {
         .collect(
             Collectors.toMap(
                 Entry::getKey,
-                entry -> ValueHider.hide(entry.getKey(), entry.getValue()),
-                // The key won't be duplicated thus we just return the oldValue
-                (u, v) -> u,
+                entry -> {
+                  final String value = ValueHider.hide(entry.getKey(), entry.getValue());
+                  return value == null ? "null" : value;
+                },
+                (v1, v2) -> {
+                  final boolean v1IsNull = isNullValue(v1);
+                  final boolean v2IsNull = isNullValue(v2);
+                  if (v1IsNull && v2IsNull) {
+                    return "null";
+                  }
+                  if (v1IsNull) {
+                    return v2;
+                  }
+                  if (v2IsNull) {
+                    return v1;
+                  }
+                  return v1;
+                },
                 TreeMap::new))
         .toString();
   }
 
+  private static boolean isNullValue(final String value) {
+    return value == null || value.equals("null");
+  }
+
   /**
    * This method adds (non-existed) or replaces (existed) equivalent attributes in this
    * PipeParameters with those from another PipeParameters.
diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
index b712861cb54..65035f996bf 100644
--- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
+++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/impl/subscription/topic/CreateTopicProcedure.java
@@ -86,8 +86,7 @@ public class CreateTopicProcedure extends AbstractOperateSubscriptionProcedure {
   @Override
   protected void executeFromOperateOnConfigNodes(ConfigNodeProcedureEnv env)
       throws SubscriptionException {
-    LOGGER.info(
-        "CreateTopicProcedure: executeFromOperateOnConfigNodes({})", topicMeta.getTopicName());
+    LOGGER.info("CreateTopicProcedure: executeFromOperateOnConfigNodes({})", topicMeta);
 
     TSStatus response;
     try {
@@ -108,8 +107,7 @@ public class CreateTopicProcedure extends 
AbstractOperateSubscriptionProcedure {
   @Override
   protected void executeFromOperateOnDataNodes(ConfigNodeProcedureEnv env)
       throws SubscriptionException {
-    LOGGER.info(
-        "CreateTopicProcedure: executeFromOperateOnDataNodes({})", 
topicMeta.getTopicName());
+    LOGGER.info("CreateTopicProcedure: executeFromOperateOnDataNodes({})", 
topicMeta);
 
     try {
       final List<TSStatus> statuses = env.pushSingleTopicOnDataNode(topicMeta.serialize());
@@ -129,13 +127,12 @@ public class CreateTopicProcedure extends 
AbstractOperateSubscriptionProcedure {
 
   @Override
   protected void rollbackFromValidate(ConfigNodeProcedureEnv env) {
-    LOGGER.info("CreateTopicProcedure: rollbackFromValidate({})", 
topicMeta.getTopicName());
+    LOGGER.info("CreateTopicProcedure: rollbackFromValidate({})", topicMeta);
   }
 
   @Override
   protected void rollbackFromOperateOnConfigNodes(ConfigNodeProcedureEnv env) {
-    LOGGER.info(
-        "CreateTopicProcedure: rollbackFromCreateOnConfigNodes({})", 
topicMeta.getTopicName());
+    LOGGER.info("CreateTopicProcedure: rollbackFromCreateOnConfigNodes({})", 
topicMeta);
 
     TSStatus response;
     try {
@@ -158,8 +155,7 @@ public class CreateTopicProcedure extends 
AbstractOperateSubscriptionProcedure {
 
   @Override
   protected void rollbackFromOperateOnDataNodes(ConfigNodeProcedureEnv env) {
-    LOGGER.info(
-        "CreateTopicProcedure: rollbackFromCreateOnDataNodes({})", 
topicMeta.getTopicName());
+    LOGGER.info("CreateTopicProcedure: rollbackFromCreateOnDataNodes({})", 
topicMeta);
 
     final List<TSStatus> statuses = env.dropSingleTopicOnDataNode(topicMeta.getTopicName());
     if (RpcUtils.squashResponseStatusList(statuses).getCode()

Reply via email to