This is an automated email from the ASF dual-hosted git repository.

justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/dev/1.3 by this push:
     new 5f04d59fb75 Pipe: Removed the old useless loopback detection logic 
(#16007) (#16010)
5f04d59fb75 is described below

commit 5f04d59fb75df28533430e4cac8f273c78657649
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jul 23 15:24:00 2025 +0800

    Pipe: Removed the old useless loopback detection logic (#16007) (#16010)
    
    * remove-useless
    
    * fix
---
 .../pipe/it/autocreate/IoTDBPipeExtractorIT.java   |  7 ++++
 .../airgap/IoTDBDataNodeAirGapConnector.java       | 44 ----------------------
 .../protocol/legacy/IoTDBLegacyPipeConnector.java  | 21 -----------
 .../thrift/sync/IoTDBDataNodeSyncConnector.java    | 37 ------------------
 4 files changed, 7 insertions(+), 102 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
index 4b33de0a31c..bb5d4bb1459 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
@@ -762,6 +762,13 @@ public class IoTDBPipeExtractorIT extends 
AbstractPipeDualAutoIT {
 
     final String receiverIp = receiverDataNode.getIp();
     final int receiverPort = receiverDataNode.getPort();
+    final Consumer<String> handleFailure =
+        o -> {
+          TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+          TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+        };
+
+    boolean insertResult;
 
     try (final SyncConfigNodeIServiceClient client =
         (SyncConfigNodeIServiceClient) 
senderEnv.getLeaderConfigNodeConnection()) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index 23183e9e14f..5dcbb87d7a0 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -19,67 +19,23 @@
 
 package org.apache.iotdb.db.pipe.connector.protocol.airgap;
 
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import 
org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
 import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBAirGapConnector;
-import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferPlanNodeReq;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.IOException;
-import java.net.UnknownHostException;
 import java.util.HashMap;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 public abstract class IoTDBDataNodeAirGapConnector extends 
IoTDBAirGapConnector {
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBDataNodeAirGapConnector.class);
-
-  @Override
-  public void validate(final PipeParameterValidator validator) throws 
Exception {
-    super.validate(validator);
-
-    final PipeConfig pipeConfig = PipeConfig.getInstance();
-    final Set<TEndPoint> givenNodeUrls = 
parseNodeUrls(validator.getParameters());
-
-    validator.validate(
-        empty -> {
-          try {
-            // Ensure the sink doesn't point to the air gap receiver on 
DataNode itself
-            return !(pipeConfig.getPipeAirGapReceiverEnabled()
-                && NodeUrlUtils.containsLocalAddress(
-                    givenNodeUrls.stream()
-                        .filter(
-                            tEndPoint ->
-                                tEndPoint.getPort() == 
pipeConfig.getPipeAirGapReceiverPort())
-                        .map(TEndPoint::getIp)
-                        .collect(Collectors.toList())));
-          } catch (final UnknownHostException e) {
-            LOGGER.warn("Unknown host when checking pipe sink IP.", e);
-            return false;
-          }
-        },
-        String.format(
-            "One of the endpoints %s of the receivers is pointing back to the 
air gap receiver %s on sender itself, or unknown host when checking pipe sink 
IP.",
-            givenNodeUrls,
-            new TEndPoint(
-                IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
-                pipeConfig.getPipeAirGapReceiverPort())));
-  }
-
   @Override
   protected boolean mayNeedHandshakeWhenFail() {
     return false;
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index 82a5bb7dfbc..af1500c335c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
 import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
 import org.apache.iotdb.commons.pipe.config.PipeConfig;
 import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
-import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
 import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import org.apache.iotdb.db.pipe.connector.payload.legacy.TsFilePipeData;
@@ -63,14 +62,12 @@ import org.slf4j.LoggerFactory;
 import java.io.File;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.net.UnknownHostException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
 import java.util.Set;
-import java.util.stream.Collectors;
 
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
 import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
@@ -136,24 +133,6 @@ public class IoTDBLegacyPipeConnector implements 
PipeConnector {
             parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
             parameters.hasAttribute(SINK_IOTDB_IP_KEY),
             parameters.hasAttribute(SINK_IOTDB_PORT_KEY))
-        .validate(
-            empty -> {
-              try {
-                // Ensure the sink doesn't point to the legacy receiver on 
DataNode itself
-                return !NodeUrlUtils.containsLocalAddress(
-                    givenNodeUrls.stream()
-                        .filter(tEndPoint -> tEndPoint.getPort() == 
ioTDBConfig.getRpcPort())
-                        .map(TEndPoint::getIp)
-                        .collect(Collectors.toList()));
-              } catch (final UnknownHostException e) {
-                LOGGER.warn("Unknown host when checking pipe sink IP.", e);
-                return false;
-              }
-            },
-            String.format(
-                "One of the endpoints %s of the receivers is pointing back to 
the legacy receiver %s on sender itself, or unknown host when checking pipe 
sink IP.",
-                givenNodeUrls,
-                new TEndPoint(ioTDBConfig.getRpcAddress(), 
ioTDBConfig.getRpcPort())))
         .validate(
             args -> !((boolean) args[0]) || ((boolean) args[1] && (boolean) 
args[2]),
             String.format(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index 89e63e44a79..a9518d1cc71 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -24,13 +24,10 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
 import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClientManager;
 import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBSslSyncConnector;
-import org.apache.iotdb.commons.utils.NodeUrlUtils;
 import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
 import 
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeSyncClientManager;
 import 
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferPlanNodeReq;
 import 
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
 import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
 import org.apache.iotdb.pipe.api.exception.PipeException;
 import org.apache.iotdb.rpc.TSStatusCode;
@@ -38,48 +35,14 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
 import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
 
 import org.apache.tsfile.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
 
 public abstract class IoTDBDataNodeSyncConnector extends IoTDBSslSyncConnector 
{
 
-  private static final Logger LOGGER = 
LoggerFactory.getLogger(IoTDBDataNodeSyncConnector.class);
-
   protected IoTDBDataNodeSyncClientManager clientManager;
 
-  @Override
-  public void validate(final PipeParameterValidator validator) throws 
Exception {
-    super.validate(validator);
-
-    final IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
-    final Set<TEndPoint> givenNodeUrls = 
parseNodeUrls(validator.getParameters());
-
-    validator.validate(
-        empty -> {
-          try {
-            // Ensure the sink doesn't point to the thrift receiver on 
DataNode itself
-            return !NodeUrlUtils.containsLocalAddress(
-                givenNodeUrls.stream()
-                    .filter(tEndPoint -> tEndPoint.getPort() == 
iotdbConfig.getRpcPort())
-                    .map(TEndPoint::getIp)
-                    .collect(Collectors.toList()));
-          } catch (final UnknownHostException e) {
-            LOGGER.warn("Unknown host when checking pipe sink IP.", e);
-            return false;
-          }
-        },
-        String.format(
-            "One of the endpoints %s of the receivers is pointing back to the 
thrift receiver %s on sender itself, "
-                + "or unknown host when checking pipe sink IP.",
-            givenNodeUrls, new TEndPoint(iotdbConfig.getRpcAddress(), 
iotdbConfig.getRpcPort())));
-  }
-
   @Override
   protected IoTDBSyncClientManager constructClient(
       final List<TEndPoint> nodeUrls,

Reply via email to