This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 797dfa5d5cf Pipe: isolate async sink client resources by endpoint (#18015)
797dfa5d5cf is described below
commit 797dfa5d5cfd6dce83b65856d9570dd88e51726e
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 25 10:44:56 2026 +0800
Pipe: isolate async sink client resources by endpoint (#18015)
---
.../client/IoTDBDataNodeAsyncClientManager.java | 48 +++++++++++-------
.../sink/IoTDBDataNodeAsyncClientManagerTest.java | 57 ++++++++++++++++++++++
2 files changed, 88 insertions(+), 17 deletions(-)
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index 5c38c3a8540..4456550bb52 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -58,6 +58,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY;
@@ -71,11 +72,11 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
private final Set<TEndPoint> endPointSet;
- private static final Map<String, Integer> RECEIVER_ATTRIBUTES_REF_COUNT =
- new ConcurrentHashMap<>();
+ private static final Map<String, Integer> CLIENT_RESOURCE_REF_COUNT = new
ConcurrentHashMap<>();
private final String receiverAttributes;
+ private final String clientResourceKey;
- // receiverAttributes -> IClientManager<TEndPoint,
AsyncPipeDataTransferServiceClient>
+ // clientResourceKey -> IClientManager<TEndPoint,
AsyncPipeDataTransferServiceClient>
private static final Map<String, IClientManager<TEndPoint,
AsyncPipeDataTransferServiceClient>>
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new
ConcurrentHashMap<>();
private static final Map<String, ExecutorService>
TS_FILE_ASYNC_EXECUTOR_HOLDER =
@@ -129,10 +130,11 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
shouldMarkAsPipeRequest,
isTSFileUsed,
skipIfNoPrivileges);
+ clientResourceKey = generateClientResourceKey(receiverAttributes,
endPoints);
synchronized (IoTDBDataNodeAsyncClientManager.class) {
- if
(!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes))
{
+ if
(!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(clientResourceKey))
{
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
- receiverAttributes,
+ clientResourceKey,
new IClientManager.Factory<TEndPoint,
AsyncPipeDataTransferServiceClient>()
.createClientManager(
isTSFileUsed
@@ -140,21 +142,21 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
.AsyncPipeTsFileDataTransferServiceClientPoolFactory()
: new
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
}
- endPoint2Client =
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes);
+ endPoint2Client =
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(clientResourceKey);
if (isTSFileUsed) {
- if (!TS_FILE_ASYNC_EXECUTOR_HOLDER.containsKey(receiverAttributes)) {
+ if (!TS_FILE_ASYNC_EXECUTOR_HOLDER.containsKey(clientResourceKey)) {
TS_FILE_ASYNC_EXECUTOR_HOLDER.putIfAbsent(
- receiverAttributes,
+ clientResourceKey,
IoTDBThreadPoolFactory.newFixedThreadPool(
PipeConfig.getInstance().getPipeRealTimeQueueMaxWaitingTsFileSize(),
ThreadName.PIPE_TSFILE_ASYNC_SEND_POOL.getName() + "-" +
id.getAndIncrement()));
}
- executor = TS_FILE_ASYNC_EXECUTOR_HOLDER.get(receiverAttributes);
+ executor = TS_FILE_ASYNC_EXECUTOR_HOLDER.get(clientResourceKey);
}
- RECEIVER_ATTRIBUTES_REF_COUNT.compute(
- receiverAttributes, (attributes, refCount) -> refCount == null ? 1 :
refCount + 1);
+ CLIENT_RESOURCE_REF_COUNT.compute(
+ clientResourceKey, (attributes, refCount) -> refCount == null ? 1 :
refCount + 1);
}
switch (loadBalanceStrategy) {
@@ -421,30 +423,30 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
public void close() {
isClosed = true;
synchronized (IoTDBDataNodeAsyncClientManager.class) {
- RECEIVER_ATTRIBUTES_REF_COUNT.computeIfPresent(
- receiverAttributes,
+ CLIENT_RESOURCE_REF_COUNT.computeIfPresent(
+ clientResourceKey,
(attributes, refCount) -> {
if (refCount <= 1) {
final IClientManager<TEndPoint,
AsyncPipeDataTransferServiceClient> clientManager =
-
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.remove(receiverAttributes);
+
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.remove(clientResourceKey);
if (clientManager != null) {
try {
clientManager.close();
LOGGER.info(
DataNodePipeMessages
.CLOSED_ASYNCPIPEDATATRANSFERSERVICECLIENTMANAGER_FOR_RECEIVER_ATTRIBUTES,
- receiverAttributes);
+ clientResourceKey);
} catch (final Exception e) {
LOGGER.warn(
DataNodePipeMessages
.FAILED_TO_CLOSE_ASYNCPIPEDATATRANSFERSERVICECLIENTMANAGER_FOR_RECEIVER_ATTRIBUTE,
- receiverAttributes,
+ clientResourceKey,
e);
}
}
final ExecutorService executor =
- TS_FILE_ASYNC_EXECUTOR_HOLDER.remove(receiverAttributes);
+ TS_FILE_ASYNC_EXECUTOR_HOLDER.remove(clientResourceKey);
if (executor != null) {
try {
executor.shutdown();
@@ -552,4 +554,16 @@ public class IoTDBDataNodeAsyncClientManager extends IoTDBClientManager
private void markHealthy(TEndPoint endPoint) {
unhealthyEndPointMap.remove(endPoint);
}
+
+ private static String generateClientResourceKey(
+ final String receiverAttributes, final List<TEndPoint> endPoints) {
+ return String.format(
+ "%s-%s",
+ receiverAttributes,
+ endPoints.stream()
+ .map(endPoint -> String.format("%s:%s", endPoint.getIp(),
endPoint.getPort()))
+ .distinct()
+ .sorted()
+ .collect(Collectors.joining(",", "[", "]")));
+ }
}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
index fb13e438dec..f564dbbd70d 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
@@ -28,6 +28,7 @@ import org.junit.Test;
import java.lang.reflect.Field;
import java.util.Collections;
+import java.util.concurrent.ExecutorService;
public class IoTDBDataNodeAsyncClientManagerTest {
@@ -71,6 +72,48 @@ public class IoTDBDataNodeAsyncClientManagerTest {
}
}
+ @Test
+ public void testClientResourcesShouldDifferentiateEndPoints() throws
Exception {
+ final IoTDBDataNodeAsyncClientManager firstManager =
+ new IoTDBDataNodeAsyncClientManager(
+ Collections.singletonList(new TEndPoint("127.0.0.1", 6667)),
+ false,
+ "round-robin",
+ new UserEntity(1L, "user", "cli-host"),
+ "password",
+ true,
+ "sync",
+ true,
+ true,
+ true,
+ true);
+ final IoTDBDataNodeAsyncClientManager secondManager =
+ new IoTDBDataNodeAsyncClientManager(
+ Collections.singletonList(new TEndPoint("127.0.0.2", 6667)),
+ false,
+ "round-robin",
+ new UserEntity(1L, "user", "cli-host"),
+ "password",
+ true,
+ "sync",
+ true,
+ true,
+ true,
+ true);
+
+ try {
+ Assert.assertEquals(
+ getReceiverAttributes(firstManager),
getReceiverAttributes(secondManager));
+ Assert.assertNotEquals(
+ getClientResourceKey(firstManager),
getClientResourceKey(secondManager));
+ Assert.assertNotSame(getEndPoint2Client(firstManager),
getEndPoint2Client(secondManager));
+ Assert.assertNotSame(getExecutor(firstManager),
getExecutor(secondManager));
+ } finally {
+ firstManager.close();
+ secondManager.close();
+ }
+ }
+
private static String getReceiverAttributes(final
IoTDBDataNodeAsyncClientManager manager)
throws Exception {
final Field field =
@@ -79,10 +122,24 @@ public class IoTDBDataNodeAsyncClientManagerTest {
return (String) field.get(manager);
}
+ private static String getClientResourceKey(final
IoTDBDataNodeAsyncClientManager manager)
+ throws Exception {
+ final Field field =
IoTDBDataNodeAsyncClientManager.class.getDeclaredField("clientResourceKey");
+ field.setAccessible(true);
+ return (String) field.get(manager);
+ }
+
private static Object getEndPoint2Client(final
IoTDBDataNodeAsyncClientManager manager)
throws Exception {
final Field field =
IoTDBDataNodeAsyncClientManager.class.getDeclaredField("endPoint2Client");
field.setAccessible(true);
return field.get(manager);
}
+
+ private static ExecutorService getExecutor(final
IoTDBDataNodeAsyncClientManager manager)
+ throws Exception {
+ final Field field =
IoTDBDataNodeAsyncClientManager.class.getDeclaredField("executor");
+ field.setAccessible(true);
+ return (ExecutorService) field.get(manager);
+ }
}