This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 10d288d9d8e Pipe: Removed the old useless loopback detection logic (#16007)
10d288d9d8e is described below
commit 10d288d9d8e4c3828bf0ff3c987a6cd8ec499c07
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jul 23 15:13:11 2025 +0800
Pipe: Removed the old useless loopback detection logic (#16007)
* remove-useless
* fix
---
.../manual/basic/IoTDBPipeExtractorIT.java | 2 +-
.../airgap/IoTDBDataNodeAirGapConnector.java | 44 ----------------------
.../protocol/legacy/IoTDBLegacyPipeConnector.java | 21 -----------
.../thrift/sync/IoTDBDataNodeSyncConnector.java | 38 -------------------
4 files changed, 1 insertion(+), 104 deletions(-)
diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeExtractorIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeExtractorIT.java
index d45e924620f..8a1b1de07b6 100644
--- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeExtractorIT.java
+++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/basic/IoTDBPipeExtractorIT.java
@@ -461,7 +461,7 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeTableModelDualManualIT {
TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
};
- boolean insertResult = true;
+ boolean insertResult;
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index 009d9666812..13842b9b136 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -19,66 +19,22 @@
package org.apache.iotdb.db.pipe.connector.protocol.airgap;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBAirGapConnector;
-import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
import org.apache.iotdb.pipe.api.annotation.TableModel;
import org.apache.iotdb.pipe.api.annotation.TreeModel;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.net.UnknownHostException;
import java.util.HashMap;
-import java.util.Set;
-import java.util.stream.Collectors;
@TreeModel
@TableModel
public abstract class IoTDBDataNodeAirGapConnector extends
IoTDBAirGapConnector {
- private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBDataNodeAirGapConnector.class);
-
- @Override
- public void validate(final PipeParameterValidator validator) throws
Exception {
- super.validate(validator);
-
- final PipeConfig pipeConfig = PipeConfig.getInstance();
- final Set<TEndPoint> givenNodeUrls =
parseNodeUrls(validator.getParameters());
-
- validator.validate(
- empty -> {
- try {
- // Ensure the sink doesn't point to the air gap receiver on
DataNode itself
- return !(pipeConfig.getPipeAirGapReceiverEnabled()
- && NodeUrlUtils.containsLocalAddress(
- givenNodeUrls.stream()
- .filter(
- tEndPoint ->
- tEndPoint.getPort() ==
pipeConfig.getPipeAirGapReceiverPort())
- .map(TEndPoint::getIp)
- .collect(Collectors.toList())));
- } catch (final UnknownHostException e) {
- LOGGER.warn("Unknown host when checking pipe sink IP.", e);
- return false;
- }
- },
- String.format(
- "One of the endpoints %s of the receivers is pointing back to the
air gap receiver %s on sender itself, or unknown host when checking pipe sink
IP.",
- givenNodeUrls,
- new TEndPoint(
- IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
- pipeConfig.getPipeAirGapReceiverPort())));
- }
-
@Override
protected boolean mayNeedHandshakeWhenFail() {
return false;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index 0327b665107..35cbd39149b 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
-import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.legacy.TsFilePipeData;
@@ -64,14 +63,12 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
@@ -139,24 +136,6 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
parameters.hasAttribute(SINK_IOTDB_IP_KEY),
parameters.hasAttribute(SINK_IOTDB_PORT_KEY))
- .validate(
- empty -> {
- try {
- // Ensure the sink doesn't point to the legacy receiver on
DataNode itself
- return !NodeUrlUtils.containsLocalAddress(
- givenNodeUrls.stream()
- .filter(tEndPoint -> tEndPoint.getPort() ==
ioTDBConfig.getRpcPort())
- .map(TEndPoint::getIp)
- .collect(Collectors.toList()));
- } catch (final UnknownHostException e) {
- LOGGER.warn("Unknown host when checking pipe sink IP.", e);
- return false;
- }
- },
- String.format(
- "One of the endpoints %s of the receivers is pointing back to
the legacy receiver %s on sender itself, or unknown host when checking pipe
sink IP.",
- givenNodeUrls,
- new TEndPoint(ioTDBConfig.getRpcAddress(),
ioTDBConfig.getRpcPort())))
.validate(
args -> !((boolean) args[0]) || ((boolean) args[1] && (boolean)
args[2]),
String.format(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index dbd96fd7d1b..020c2b127b5 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -22,58 +22,20 @@ package
org.apache.iotdb.db.pipe.connector.protocol.thrift.sync;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClientManager;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBSslSyncConnector;
-import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeSyncClientManager;
import org.apache.iotdb.pipe.api.annotation.TableModel;
import org.apache.iotdb.pipe.api.annotation.TreeModel;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.net.UnknownHostException;
import java.util.List;
import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
@TreeModel
@TableModel
public abstract class IoTDBDataNodeSyncConnector extends IoTDBSslSyncConnector
{
- private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBDataNodeSyncConnector.class);
-
protected IoTDBDataNodeSyncClientManager clientManager;
- @Override
- public void validate(final PipeParameterValidator validator) throws
Exception {
- super.validate(validator);
-
- final IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
- final Set<TEndPoint> givenNodeUrls =
parseNodeUrls(validator.getParameters());
-
- validator.validate(
- empty -> {
- try {
- // Ensure the sink doesn't point to the thrift receiver on
DataNode itself
- return !NodeUrlUtils.containsLocalAddress(
- givenNodeUrls.stream()
- .filter(tEndPoint -> tEndPoint.getPort() ==
iotdbConfig.getRpcPort())
- .map(TEndPoint::getIp)
- .collect(Collectors.toList()));
- } catch (final UnknownHostException e) {
- LOGGER.warn("Unknown host when checking pipe sink IP.", e);
- return false;
- }
- },
- String.format(
- "One of the endpoints %s of the receivers is pointing back to the
thrift receiver %s on sender itself, "
- + "or unknown host when checking pipe sink IP.",
- givenNodeUrls, new TEndPoint(iotdbConfig.getRpcAddress(),
iotdbConfig.getRpcPort())));
- }
-
@Override
protected IoTDBSyncClientManager constructClient(
final List<TEndPoint> nodeUrls,