This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch rc/2.0.5
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit 94bb9bda755172aa3789490bbd6863492c134374 Author: Caideyipi <[email protected]> AuthorDate: Wed Jul 23 15:13:11 2025 +0800 Pipe: Removed the old useless loopback detection logic (#16007) * remove-useless * fix (cherry picked from commit 10d288d9d8e4c3828bf0ff3c987a6cd8ec499c07) --- .../manual/basic/IoTDBPipeExtractorIT.java | 2 +- .../airgap/IoTDBDataNodeAirGapConnector.java | 44 ---------------------- .../protocol/legacy/IoTDBLegacyPipeConnector.java | 21 ----------- .../thrift/sync/IoTDBDataNodeSyncConnector.java | 38 ------------------- 4 files changed, 1 insertion(+), 104 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeExtractorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeExtractorIT.java index d45e924620f..8a1b1de07b6 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeExtractorIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeExtractorIT.java @@ -461,7 +461,7 @@ public class IoTDBPipeExtractorIT extends AbstractPipeTableModelDualManualIT { TestUtils.executeNonQueryWithRetry(receiverEnv, "flush"); }; - boolean insertResult = true; + boolean insertResult; try (final SyncConfigNodeIServiceClient client = (SyncConfigNodeIServiceClient) senderEnv.getLeaderConfigNodeConnection()) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java index 009d9666812..13842b9b136 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java @@ 
-19,66 +19,22 @@ package org.apache.iotdb.db.pipe.connector.protocol.airgap; -import org.apache.iotdb.common.rpc.thrift.TEndPoint; import org.apache.iotdb.commons.conf.CommonDescriptor; -import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant; import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBAirGapConnector; -import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req; import org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.UnknownHostException; import java.util.HashMap; -import java.util.Set; -import java.util.stream.Collectors; @TreeModel @TableModel public abstract class IoTDBDataNodeAirGapConnector extends IoTDBAirGapConnector { - private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataNodeAirGapConnector.class); - - @Override - public void validate(final PipeParameterValidator validator) throws Exception { - super.validate(validator); - - final PipeConfig pipeConfig = PipeConfig.getInstance(); - final Set<TEndPoint> givenNodeUrls = parseNodeUrls(validator.getParameters()); - - validator.validate( - empty -> { - try { - // Ensure the sink doesn't point to the air gap receiver on DataNode itself - return !(pipeConfig.getPipeAirGapReceiverEnabled() - && NodeUrlUtils.containsLocalAddress( - givenNodeUrls.stream() - .filter( - tEndPoint -> - tEndPoint.getPort() == pipeConfig.getPipeAirGapReceiverPort()) - .map(TEndPoint::getIp) - .collect(Collectors.toList()))); - 
} catch (final UnknownHostException e) { - LOGGER.warn("Unknown host when checking pipe sink IP.", e); - return false; - } - }, - String.format( - "One of the endpoints %s of the receivers is pointing back to the air gap receiver %s on sender itself, or unknown host when checking pipe sink IP.", - givenNodeUrls, - new TEndPoint( - IoTDBDescriptor.getInstance().getConfig().getRpcAddress(), - pipeConfig.getPipeAirGapReceiverPort()))); - } - @Override protected boolean mayNeedHandshakeWhenFail() { return false; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java index 0327b665107..35cbd39149b 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java @@ -28,7 +28,6 @@ import org.apache.iotdb.commons.consensus.DataRegionId; import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException; import org.apache.iotdb.commons.pipe.config.PipeConfig; import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient; -import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.db.conf.IoTDBConfig; import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.connector.payload.legacy.TsFilePipeData; @@ -64,14 +63,12 @@ import org.slf4j.LoggerFactory; import java.io.File; import java.io.IOException; import java.io.RandomAccessFile; -import java.net.UnknownHostException; import java.nio.ByteBuffer; import java.util.Arrays; import java.util.HashSet; import java.util.List; import java.util.Objects; import java.util.Set; -import java.util.stream.Collectors; import static 
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY; import static org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE; @@ -139,24 +136,6 @@ public class IoTDBLegacyPipeConnector implements PipeConnector { parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY), parameters.hasAttribute(SINK_IOTDB_IP_KEY), parameters.hasAttribute(SINK_IOTDB_PORT_KEY)) - .validate( - empty -> { - try { - // Ensure the sink doesn't point to the legacy receiver on DataNode itself - return !NodeUrlUtils.containsLocalAddress( - givenNodeUrls.stream() - .filter(tEndPoint -> tEndPoint.getPort() == ioTDBConfig.getRpcPort()) - .map(TEndPoint::getIp) - .collect(Collectors.toList())); - } catch (final UnknownHostException e) { - LOGGER.warn("Unknown host when checking pipe sink IP.", e); - return false; - } - }, - String.format( - "One of the endpoints %s of the receivers is pointing back to the legacy receiver %s on sender itself, or unknown host when checking pipe sink IP.", - givenNodeUrls, - new TEndPoint(ioTDBConfig.getRpcAddress(), ioTDBConfig.getRpcPort()))) .validate( args -> !((boolean) args[0]) || ((boolean) args[1] && (boolean) args[2]), String.format( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java index dbd96fd7d1b..020c2b127b5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java @@ -22,58 +22,20 @@ package org.apache.iotdb.db.pipe.connector.protocol.thrift.sync; import org.apache.iotdb.common.rpc.thrift.TEndPoint; import 
org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClientManager; import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBSslSyncConnector; -import org.apache.iotdb.commons.utils.NodeUrlUtils; import org.apache.iotdb.db.conf.IoTDBConfig; -import org.apache.iotdb.db.conf.IoTDBDescriptor; import org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeSyncClientManager; import org.apache.iotdb.pipe.api.annotation.TableModel; import org.apache.iotdb.pipe.api.annotation.TreeModel; -import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.net.UnknownHostException; import java.util.List; import java.util.Objects; -import java.util.Set; -import java.util.stream.Collectors; @TreeModel @TableModel public abstract class IoTDBDataNodeSyncConnector extends IoTDBSslSyncConnector { - private static final Logger LOGGER = LoggerFactory.getLogger(IoTDBDataNodeSyncConnector.class); - protected IoTDBDataNodeSyncClientManager clientManager; - @Override - public void validate(final PipeParameterValidator validator) throws Exception { - super.validate(validator); - - final IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig(); - final Set<TEndPoint> givenNodeUrls = parseNodeUrls(validator.getParameters()); - - validator.validate( - empty -> { - try { - // Ensure the sink doesn't point to the thrift receiver on DataNode itself - return !NodeUrlUtils.containsLocalAddress( - givenNodeUrls.stream() - .filter(tEndPoint -> tEndPoint.getPort() == iotdbConfig.getRpcPort()) - .map(TEndPoint::getIp) - .collect(Collectors.toList())); - } catch (final UnknownHostException e) { - LOGGER.warn("Unknown host when checking pipe sink IP.", e); - return false; - } - }, - String.format( - "One of the endpoints %s of the receivers is pointing back to the thrift receiver %s on sender itself, " - + "or unknown host when checking pipe sink IP.", - givenNodeUrls, new 
TEndPoint(iotdbConfig.getRpcAddress(), iotdbConfig.getRpcPort()))); - } - @Override protected IoTDBSyncClientManager constructClient( final List<TEndPoint> nodeUrls,
