This is an automated email from the ASF dual-hosted git repository.
jt2594838 pushed a commit to branch dev/1.3
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/dev/1.3 by this push:
new 2717029cbb7 Pipe: isolate async sink client resources by endpoint
(#18015) (#18028)
2717029cbb7 is described below
commit 2717029cbb7b0b97a967ff35fc8292379771f1f0
Author: Caideyipi <[email protected]>
AuthorDate: Thu Jun 25 14:49:48 2026 +0800
Pipe: isolate async sink client resources by endpoint (#18015) (#18028)
(cherry picked from commit 797dfa5d5cfd6dce83b65856d9570dd88e51726e)
---
.../client/IoTDBDataNodeAsyncClientManager.java | 52 +++++++----
.../sink/IoTDBDataNodeAsyncClientManagerTest.java | 102 +++++++++++++++++++++
2 files changed, 135 insertions(+), 19 deletions(-)
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
index e2f6dbec569..20d921e4672 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/client/IoTDBDataNodeAsyncClientManager.java
@@ -56,6 +56,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LOAD_BALANCE_PRIORITY_STRATEGY;
import static
org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant.CONNECTOR_LOAD_BALANCE_RANDOM_STRATEGY;
@@ -69,11 +70,11 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
private final Set<TEndPoint> endPointSet;
- private static final Map<String, Integer> RECEIVER_ATTRIBUTES_REF_COUNT =
- new ConcurrentHashMap<>();
+ private static final Map<String, Integer> CLIENT_RESOURCE_REF_COUNT = new
ConcurrentHashMap<>();
private final String receiverAttributes;
+ private final String clientResourceKey;
- // receiverAttributes -> IClientManager<TEndPoint,
AsyncPipeDataTransferServiceClient>
+ // clientResourceKey -> IClientManager<TEndPoint,
AsyncPipeDataTransferServiceClient>
private static final Map<String, IClientManager<TEndPoint,
AsyncPipeDataTransferServiceClient>>
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER = new
ConcurrentHashMap<>();
private static final Map<String, ExecutorService>
TS_FILE_ASYNC_EXECUTOR_HOLDER =
@@ -123,10 +124,11 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
validateTsFile,
shouldMarkAsPipeRequest,
isTSFileUsed);
+ clientResourceKey = generateClientResourceKey(receiverAttributes,
endPoints);
synchronized (IoTDBDataNodeAsyncClientManager.class) {
- if
(!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(receiverAttributes))
{
+ if
(!ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.containsKey(clientResourceKey))
{
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.putIfAbsent(
- receiverAttributes,
+ clientResourceKey,
new IClientManager.Factory<TEndPoint,
AsyncPipeDataTransferServiceClient>()
.createClientManager(
isTSFileUsed
@@ -134,21 +136,21 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
.AsyncPipeTsFileDataTransferServiceClientPoolFactory()
: new
ClientPoolFactory.AsyncPipeDataTransferServiceClientPoolFactory()));
}
- endPoint2Client =
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(receiverAttributes);
+ endPoint2Client =
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.get(clientResourceKey);
if (isTSFileUsed) {
- if (!TS_FILE_ASYNC_EXECUTOR_HOLDER.containsKey(receiverAttributes)) {
+ if (!TS_FILE_ASYNC_EXECUTOR_HOLDER.containsKey(clientResourceKey)) {
TS_FILE_ASYNC_EXECUTOR_HOLDER.putIfAbsent(
- receiverAttributes,
+ clientResourceKey,
IoTDBThreadPoolFactory.newFixedThreadPool(
PipeConfig.getInstance().getPipeRealTimeQueueMaxWaitingTsFileSize(),
ThreadName.PIPE_TSFILE_ASYNC_SEND_POOL.getName() + "-" +
id.getAndIncrement()));
}
- executor = TS_FILE_ASYNC_EXECUTOR_HOLDER.get(receiverAttributes);
+ executor = TS_FILE_ASYNC_EXECUTOR_HOLDER.get(clientResourceKey);
}
- RECEIVER_ATTRIBUTES_REF_COUNT.compute(
- receiverAttributes, (attributes, refCount) -> refCount == null ? 1 :
refCount + 1);
+ CLIENT_RESOURCE_REF_COUNT.compute(
+ clientResourceKey, (attributes, refCount) -> refCount == null ? 1 :
refCount + 1);
}
switch (loadBalanceStrategy) {
@@ -394,28 +396,28 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
public void close() {
isClosed = true;
synchronized (IoTDBDataNodeAsyncClientManager.class) {
- RECEIVER_ATTRIBUTES_REF_COUNT.computeIfPresent(
- receiverAttributes,
+ CLIENT_RESOURCE_REF_COUNT.computeIfPresent(
+ clientResourceKey,
(attributes, refCount) -> {
if (refCount <= 1) {
final IClientManager<TEndPoint,
AsyncPipeDataTransferServiceClient> clientManager =
-
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.remove(receiverAttributes);
+
ASYNC_PIPE_DATA_TRANSFER_CLIENT_MANAGER_HOLDER.remove(clientResourceKey);
if (clientManager != null) {
try {
clientManager.close();
LOGGER.info(
- "Closed AsyncPipeDataTransferServiceClientManager for
receiver attributes: {}",
- receiverAttributes);
+ "Closed AsyncPipeDataTransferServiceClientManager for
client resource key: {}",
+ clientResourceKey);
} catch (final Exception e) {
LOGGER.warn(
- "Failed to close
AsyncPipeDataTransferServiceClientManager for receiver attributes: {}",
- receiverAttributes,
+ "Failed to close
AsyncPipeDataTransferServiceClientManager for client resource key: {}",
+ clientResourceKey,
e);
}
}
final ExecutorService executor =
- TS_FILE_ASYNC_EXECUTOR_HOLDER.remove(receiverAttributes);
+ TS_FILE_ASYNC_EXECUTOR_HOLDER.remove(clientResourceKey);
if (executor != null) {
try {
executor.shutdown();
@@ -523,4 +525,16 @@ public class IoTDBDataNodeAsyncClientManager extends
IoTDBClientManager
private void markHealthy(TEndPoint endPoint) {
unhealthyEndPointMap.remove(endPoint);
}
+
+ private static String generateClientResourceKey(
+ final String receiverAttributes, final List<TEndPoint> endPoints) {
+ return String.format(
+ "%s-%s",
+ receiverAttributes,
+ endPoints.stream()
+ .map(endPoint -> String.format("%s:%s", endPoint.getIp(),
endPoint.getPort()))
+ .distinct()
+ .sorted()
+ .collect(Collectors.joining(",", "[", "]")));
+ }
}
diff --git
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
new file mode 100644
index 00000000000..423361002cd
--- /dev/null
+++
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/sink/IoTDBDataNodeAsyncClientManagerTest.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.sink;
+
+import org.apache.iotdb.common.rpc.thrift.TEndPoint;
+import org.apache.iotdb.db.pipe.sink.client.IoTDBDataNodeAsyncClientManager;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.lang.reflect.Field;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+
+public class IoTDBDataNodeAsyncClientManagerTest {
+
+ @Test
+ public void testClientResourcesShouldDifferentiateEndPoints() throws
Exception {
+ final IoTDBDataNodeAsyncClientManager firstManager =
+ new IoTDBDataNodeAsyncClientManager(
+ Collections.singletonList(new TEndPoint("127.0.0.1", 6667)),
+ false,
+ "round-robin",
+ "user",
+ "password",
+ true,
+ "sync",
+ true,
+ true,
+ true);
+ final IoTDBDataNodeAsyncClientManager secondManager =
+ new IoTDBDataNodeAsyncClientManager(
+ Collections.singletonList(new TEndPoint("127.0.0.2", 6667)),
+ false,
+ "round-robin",
+ "user",
+ "password",
+ true,
+ "sync",
+ true,
+ true,
+ true);
+
+ try {
+ Assert.assertEquals(
+ getReceiverAttributes(firstManager),
getReceiverAttributes(secondManager));
+ Assert.assertNotEquals(
+ getClientResourceKey(firstManager),
getClientResourceKey(secondManager));
+ Assert.assertNotSame(getEndPoint2Client(firstManager),
getEndPoint2Client(secondManager));
+ Assert.assertNotSame(getExecutor(firstManager),
getExecutor(secondManager));
+ } finally {
+ firstManager.close();
+ secondManager.close();
+ }
+ }
+
+ private static String getReceiverAttributes(final
IoTDBDataNodeAsyncClientManager manager)
+ throws Exception {
+ final Field field =
+
IoTDBDataNodeAsyncClientManager.class.getDeclaredField("receiverAttributes");
+ field.setAccessible(true);
+ return (String) field.get(manager);
+ }
+
+ private static String getClientResourceKey(final
IoTDBDataNodeAsyncClientManager manager)
+ throws Exception {
+ final Field field =
IoTDBDataNodeAsyncClientManager.class.getDeclaredField("clientResourceKey");
+ field.setAccessible(true);
+ return (String) field.get(manager);
+ }
+
+ private static Object getEndPoint2Client(final
IoTDBDataNodeAsyncClientManager manager)
+ throws Exception {
+ final Field field =
IoTDBDataNodeAsyncClientManager.class.getDeclaredField("endPoint2Client");
+ field.setAccessible(true);
+ return field.get(manager);
+ }
+
+ private static ExecutorService getExecutor(final
IoTDBDataNodeAsyncClientManager manager)
+ throws Exception {
+ final Field field =
IoTDBDataNodeAsyncClientManager.class.getDeclaredField("executor");
+ field.setAccessible(true);
+ return (ExecutorService) field.get(manager);
+ }
+}