This is an automated email from the ASF dual-hosted git repository.
justinchen pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 5f04d59fb75 Pipe: Removed the old useless loopback detection logic
(#16007) (#16010)
5f04d59fb75 is described below
commit 5f04d59fb75df28533430e4cac8f273c78657649
Author: Caideyipi <[email protected]>
AuthorDate: Wed Jul 23 15:24:00 2025 +0800
Pipe: Removed the old useless loopback detection logic (#16007) (#16010)
* remove-useless
* fix
---
.../pipe/it/autocreate/IoTDBPipeExtractorIT.java | 7 ++++
.../airgap/IoTDBDataNodeAirGapConnector.java | 44 ----------------------
.../protocol/legacy/IoTDBLegacyPipeConnector.java | 21 -----------
.../thrift/sync/IoTDBDataNodeSyncConnector.java | 37 ------------------
4 files changed, 7 insertions(+), 102 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
index 4b33de0a31c..bb5d4bb1459 100644
---
a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeExtractorIT.java
@@ -762,6 +762,13 @@ public class IoTDBPipeExtractorIT extends
AbstractPipeDualAutoIT {
final String receiverIp = receiverDataNode.getIp();
final int receiverPort = receiverDataNode.getPort();
+ final Consumer<String> handleFailure =
+ o -> {
+ TestUtils.executeNonQueryWithRetry(senderEnv, "flush");
+ TestUtils.executeNonQueryWithRetry(receiverEnv, "flush");
+ };
+
+ boolean insertResult;
try (final SyncConfigNodeIServiceClient client =
(SyncConfigNodeIServiceClient)
senderEnv.getLeaderConfigNodeConnection()) {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
index 23183e9e14f..5dcbb87d7a0 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/airgap/IoTDBDataNodeAirGapConnector.java
@@ -19,67 +19,23 @@
package org.apache.iotdb.db.pipe.connector.protocol.airgap;
-import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.conf.CommonDescriptor;
-import org.apache.iotdb.commons.pipe.config.PipeConfig;
import
org.apache.iotdb.commons.pipe.connector.payload.thrift.common.PipeTransferHandshakeConstant;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBAirGapConnector;
-import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV1Req;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferDataNodeHandshakeV2Req;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferPlanNodeReq;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import java.io.IOException;
-import java.net.UnknownHostException;
import java.util.HashMap;
-import java.util.Set;
-import java.util.stream.Collectors;
public abstract class IoTDBDataNodeAirGapConnector extends
IoTDBAirGapConnector {
- private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBDataNodeAirGapConnector.class);
-
- @Override
- public void validate(final PipeParameterValidator validator) throws
Exception {
- super.validate(validator);
-
- final PipeConfig pipeConfig = PipeConfig.getInstance();
- final Set<TEndPoint> givenNodeUrls =
parseNodeUrls(validator.getParameters());
-
- validator.validate(
- empty -> {
- try {
- // Ensure the sink doesn't point to the air gap receiver on
DataNode itself
- return !(pipeConfig.getPipeAirGapReceiverEnabled()
- && NodeUrlUtils.containsLocalAddress(
- givenNodeUrls.stream()
- .filter(
- tEndPoint ->
- tEndPoint.getPort() ==
pipeConfig.getPipeAirGapReceiverPort())
- .map(TEndPoint::getIp)
- .collect(Collectors.toList())));
- } catch (final UnknownHostException e) {
- LOGGER.warn("Unknown host when checking pipe sink IP.", e);
- return false;
- }
- },
- String.format(
- "One of the endpoints %s of the receivers is pointing back to the
air gap receiver %s on sender itself, or unknown host when checking pipe sink
IP.",
- givenNodeUrls,
- new TEndPoint(
- IoTDBDescriptor.getInstance().getConfig().getRpcAddress(),
- pipeConfig.getPipeAirGapReceiverPort())));
- }
-
@Override
protected boolean mayNeedHandshakeWhenFail() {
return false;
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
index 82a5bb7dfbc..af1500c335c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/legacy/IoTDBLegacyPipeConnector.java
@@ -28,7 +28,6 @@ import org.apache.iotdb.commons.consensus.DataRegionId;
import org.apache.iotdb.commons.exception.pipe.PipeRuntimeCriticalException;
import org.apache.iotdb.commons.pipe.config.PipeConfig;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
-import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.conf.IoTDBDescriptor;
import org.apache.iotdb.db.pipe.connector.payload.legacy.TsFilePipeData;
@@ -63,14 +62,12 @@ import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
-import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;
-import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_IP_KEY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeConnectorConstant.CONNECTOR_IOTDB_PASSWORD_DEFAULT_VALUE;
@@ -136,24 +133,6 @@ public class IoTDBLegacyPipeConnector implements
PipeConnector {
parameters.hasAttribute(CONNECTOR_IOTDB_PORT_KEY),
parameters.hasAttribute(SINK_IOTDB_IP_KEY),
parameters.hasAttribute(SINK_IOTDB_PORT_KEY))
- .validate(
- empty -> {
- try {
- // Ensure the sink doesn't point to the legacy receiver on
DataNode itself
- return !NodeUrlUtils.containsLocalAddress(
- givenNodeUrls.stream()
- .filter(tEndPoint -> tEndPoint.getPort() ==
ioTDBConfig.getRpcPort())
- .map(TEndPoint::getIp)
- .collect(Collectors.toList()));
- } catch (final UnknownHostException e) {
- LOGGER.warn("Unknown host when checking pipe sink IP.", e);
- return false;
- }
- },
- String.format(
- "One of the endpoints %s of the receivers is pointing back to
the legacy receiver %s on sender itself, or unknown host when checking pipe
sink IP.",
- givenNodeUrls,
- new TEndPoint(ioTDBConfig.getRpcAddress(),
ioTDBConfig.getRpcPort())))
.validate(
args -> !((boolean) args[0]) || ((boolean) args[1] && (boolean)
args[2]),
String.format(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
index 89e63e44a79..a9518d1cc71 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/connector/protocol/thrift/sync/IoTDBDataNodeSyncConnector.java
@@ -24,13 +24,10 @@ import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClient;
import org.apache.iotdb.commons.pipe.connector.client.IoTDBSyncClientManager;
import org.apache.iotdb.commons.pipe.connector.protocol.IoTDBSslSyncConnector;
-import org.apache.iotdb.commons.utils.NodeUrlUtils;
import org.apache.iotdb.db.conf.IoTDBConfig;
-import org.apache.iotdb.db.conf.IoTDBDescriptor;
import
org.apache.iotdb.db.pipe.connector.client.IoTDBDataNodeSyncClientManager;
import
org.apache.iotdb.db.pipe.connector.payload.evolvable.request.PipeTransferPlanNodeReq;
import
org.apache.iotdb.db.pipe.event.common.schema.PipeSchemaRegionWritePlanEvent;
-import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameterValidator;
import org.apache.iotdb.pipe.api.exception.PipeConnectionException;
import org.apache.iotdb.pipe.api.exception.PipeException;
import org.apache.iotdb.rpc.TSStatusCode;
@@ -38,48 +35,14 @@ import org.apache.iotdb.service.rpc.thrift.TPipeTransferReq;
import org.apache.iotdb.service.rpc.thrift.TPipeTransferResp;
import org.apache.tsfile.utils.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.net.UnknownHostException;
import java.util.List;
import java.util.Objects;
-import java.util.Set;
-import java.util.stream.Collectors;
public abstract class IoTDBDataNodeSyncConnector extends IoTDBSslSyncConnector
{
- private static final Logger LOGGER =
LoggerFactory.getLogger(IoTDBDataNodeSyncConnector.class);
-
protected IoTDBDataNodeSyncClientManager clientManager;
- @Override
- public void validate(final PipeParameterValidator validator) throws
Exception {
- super.validate(validator);
-
- final IoTDBConfig iotdbConfig = IoTDBDescriptor.getInstance().getConfig();
- final Set<TEndPoint> givenNodeUrls =
parseNodeUrls(validator.getParameters());
-
- validator.validate(
- empty -> {
- try {
- // Ensure the sink doesn't point to the thrift receiver on
DataNode itself
- return !NodeUrlUtils.containsLocalAddress(
- givenNodeUrls.stream()
- .filter(tEndPoint -> tEndPoint.getPort() ==
iotdbConfig.getRpcPort())
- .map(TEndPoint::getIp)
- .collect(Collectors.toList()));
- } catch (final UnknownHostException e) {
- LOGGER.warn("Unknown host when checking pipe sink IP.", e);
- return false;
- }
- },
- String.format(
- "One of the endpoints %s of the receivers is pointing back to the
thrift receiver %s on sender itself, "
- + "or unknown host when checking pipe sink IP.",
- givenNodeUrls, new TEndPoint(iotdbConfig.getRpcAddress(),
iotdbConfig.getRpcPort())));
- }
-
@Override
protected IoTDBSyncClientManager constructClient(
final List<TEndPoint> nodeUrls,