This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 918f60e Close zkClients created by TaskStateModelFactory (#1678)
918f60e is described below
commit 918f60ee49d937f2ee2208aeb9e65b6c61703a00
Author: Neal Sun <[email protected]>
AuthorDate: Tue Mar 30 11:41:42 2021 -0700
Close zkClients created by TaskStateModelFactory (#1678)
This PR closes the previously unclosed zkClients in TaskStateModelFactory
and also adds a retry timeout and logging to the logic.
---
.../apache/helix/task/TaskStateModelFactory.java | 62 +++++++++++++++-------
.../helix/task/TestTaskStateModelFactory.java | 48 ++++++++---------
2 files changed, 67 insertions(+), 43 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index aded724..7b9dd74 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -19,6 +19,7 @@ package org.apache.helix.task;
* under the License.
*/
+import java.time.Duration;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -51,6 +52,9 @@ import org.slf4j.LoggerFactory;
public class TaskStateModelFactory extends StateModelFactory<TaskStateModel> {
private static Logger LOG =
LoggerFactory.getLogger(TaskStateModelFactory.class);
+ // Unit in minutes. Need a retry timeout to prevent zkClient from hanging
infinitely.
+ private static final int ZKCLIENT_OPERATION_RETRY_TIMEOUT = 5;
+
private final HelixManager _manager;
private final Map<String, TaskFactory> _taskFactoryRegistry;
private final ScheduledExecutorService _taskExecutor;
@@ -58,16 +62,7 @@ public class TaskStateModelFactory extends
StateModelFactory<TaskStateModel> {
private ThreadPoolExecutorMonitor _monitor;
public TaskStateModelFactory(HelixManager manager, Map<String, TaskFactory>
taskFactoryRegistry) {
- this(manager, taskFactoryRegistry,
Executors.newScheduledThreadPool(TaskUtil
- .getTargetThreadPoolSize(createZkClient(manager),
manager.getClusterName(),
- manager.getInstanceName()), new ThreadFactory() {
- private AtomicInteger threadId = new AtomicInteger(0);
-
- @Override
- public Thread newThread(Runnable r) {
- return new Thread(r, "TaskStateModelFactory-task_thread-" +
threadId.getAndIncrement());
- }
- }));
+ this(manager, taskFactoryRegistry, createThreadPoolExecutor(manager));
}
// DO NOT USE! This size of provided thread pool will not be reflected to
controller
@@ -134,12 +129,6 @@ public class TaskStateModelFactory extends
StateModelFactory<TaskStateModel> {
* Create a RealmAwareZkClient to get thread pool sizes
*/
protected static RealmAwareZkClient createZkClient(HelixManager manager) {
- // TODO: revisit the logic here - we are creating a connection although we
already have a
- // manager. We cannot use the connection within manager because some users
connect the manager
- // after registering the state model factory (in which case we cannot use
manager's connection),
- // and some connect the manager before registering the state model factory
(in which case we
- // can use manager's connection). We need to think about the right order
and determine if we
- // want to enforce it, which may cause backward incompatibility.
if (!(manager instanceof ZKHelixManager)) {
// TODO: None-ZKHelixManager cannot initialize this class. After
interface rework of
// HelixManager, the initialization should be allowed.
@@ -148,6 +137,9 @@ public class TaskStateModelFactory extends
StateModelFactory<TaskStateModel> {
}
RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
new RealmAwareZkClient.RealmAwareZkClientConfig().setZkSerializer(new
ZNRecordSerializer());
+ // Set operation retry timeout to prevent hanging infinitely
+ clientConfig
+
.setOperationRetryTimeout(Duration.ofMinutes(ZKCLIENT_OPERATION_RETRY_TIMEOUT).toMillis());
String zkAddress = manager.getMetadataStoreConnectionString();
if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddress
== null) {
@@ -172,8 +164,40 @@ public class TaskStateModelFactory extends
StateModelFactory<TaskStateModel> {
}
}
- return SharedZkClientFactory.getInstance().buildZkClient(
- new HelixZkClient.ZkConnectionConfig(zkAddress),
- clientConfig.createHelixZkClientConfig().setZkSerializer(new
ZNRecordSerializer()));
+ // Note: operation retry timeout doesn't take effect due to
github.com/apache/helix/issues/1682
+ return SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress),
+ clientConfig.createHelixZkClientConfig());
+ }
+
+ private static ScheduledExecutorService
createThreadPoolExecutor(HelixManager manager) {
+ // TODO: revisit the logic here - we are creating a connection although we
already have a
+ // manager. We cannot use the connection within manager because some users
connect the manager
+ // after registering the state model factory (in which case we cannot use
manager's connection),
+ // and some connect the manager before registering the state model factory
(in which case we
+ // can use manager's connection). We need to think about the right order
and determine if we
+ // want to enforce it, which may cause backward incompatibility.
+ RealmAwareZkClient zkClient = createZkClient(manager);
+ int targetThreadPoolSize;
+
+ // Ensure the zkClient is closed after reading the pool size;
+ try {
+ targetThreadPoolSize = TaskUtil
+ .getTargetThreadPoolSize(zkClient, manager.getClusterName(),
manager.getInstanceName());
+ } finally {
+ zkClient.close();
+ }
+
+ LOG.info(
+ "Obtained target thread pool size: {} from cluster {} for instance {}.
Creating thread pool.",
+ targetThreadPoolSize, manager.getClusterName(),
manager.getInstanceName());
+ return Executors.newScheduledThreadPool(targetThreadPoolSize, new
ThreadFactory() {
+ private AtomicInteger threadId = new AtomicInteger(0);
+
+ @Override
+ public Thread newThread(Runnable r) {
+ return new Thread(r, "TaskStateModelFactory-task_thread-" +
threadId.getAndIncrement());
+ }
+ });
}
}
diff --git
a/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
b/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
index bb64989..38286b4 100644
---
a/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
+++
b/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
@@ -24,6 +24,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import org.apache.helix.HelixManager;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.TaskTestBase;
@@ -35,6 +36,7 @@ import
org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.constant.RoutingDataReaderType;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
+import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
import org.apache.helix.zookeeper.routing.RoutingDataManager;
import org.mockito.Mockito;
import org.testng.Assert;
@@ -90,29 +92,20 @@ public class TestTaskStateModelFactory extends TaskTestBase
{
testMSDSServerEndpointKey);
RoutingDataManager.getInstance().reset();
- RealmAwareZkClient zkClient =
TaskStateModelFactory.createZkClient(anyParticipantManager);
- Assert.assertEquals(TaskUtil
- .getTargetThreadPoolSize(zkClient,
anyParticipantManager.getClusterName(),
- anyParticipantManager.getInstanceName()),
TEST_TARGET_TASK_THREAD_POOL_SIZE);
- Assert.assertTrue(zkClient instanceof FederatedZkClient);
+ verifyThreadPoolSizeAndZkClientClass(anyParticipantManager,
TEST_TARGET_TASK_THREAD_POOL_SIZE,
+ FederatedZkClient.class);
// Turn off multiZk mode in System config, and remove zkAddress
System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "false");
ZKHelixManager participantManager = Mockito.spy(anyParticipantManager);
when(participantManager.getMetadataStoreConnectionString()).thenReturn(null);
- zkClient = TaskStateModelFactory.createZkClient(participantManager);
- Assert.assertEquals(TaskUtil
- .getTargetThreadPoolSize(zkClient,
anyParticipantManager.getClusterName(),
- anyParticipantManager.getInstanceName()),
TEST_TARGET_TASK_THREAD_POOL_SIZE);
- Assert.assertTrue(zkClient instanceof FederatedZkClient);
+ verifyThreadPoolSizeAndZkClientClass(participantManager,
TEST_TARGET_TASK_THREAD_POOL_SIZE,
+ FederatedZkClient.class);
// Test no connection config case
when(participantManager.getRealmAwareZkConnectionConfig()).thenReturn(null);
- zkClient = TaskStateModelFactory.createZkClient(participantManager);
- Assert.assertEquals(TaskUtil
- .getTargetThreadPoolSize(zkClient,
anyParticipantManager.getClusterName(),
- anyParticipantManager.getInstanceName()),
TEST_TARGET_TASK_THREAD_POOL_SIZE);
- Assert.assertTrue(zkClient instanceof FederatedZkClient);
+ verifyThreadPoolSizeAndZkClientClass(participantManager,
TEST_TARGET_TASK_THREAD_POOL_SIZE,
+ FederatedZkClient.class);
// Remove server endpoint key and use connection config to specify endpoint
System.clearProperty(SystemPropertyKeys.MSDS_SERVER_ENDPOINT_KEY);
@@ -122,11 +115,8 @@ public class TestTaskStateModelFactory extends
TaskTestBase {
.setRoutingDataSourceEndpoint(testMSDSServerEndpointKey)
.setRoutingDataSourceType(RoutingDataReaderType.HTTP.name()).build();
when(participantManager.getRealmAwareZkConnectionConfig()).thenReturn(connectionConfig);
- zkClient = TaskStateModelFactory.createZkClient(participantManager);
- Assert.assertEquals(TaskUtil
- .getTargetThreadPoolSize(zkClient,
anyParticipantManager.getClusterName(),
- anyParticipantManager.getInstanceName()),
TEST_TARGET_TASK_THREAD_POOL_SIZE);
- Assert.assertTrue(zkClient instanceof FederatedZkClient);
+ verifyThreadPoolSizeAndZkClientClass(participantManager,
TEST_TARGET_TASK_THREAD_POOL_SIZE,
+ FederatedZkClient.class);
// Restore system properties
if (prevMultiZkEnabled == null) {
@@ -151,10 +141,8 @@ public class TestTaskStateModelFactory extends
TaskTestBase {
// Turn off multiZk mode in System config
System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "false");
- RealmAwareZkClient zkClient =
TaskStateModelFactory.createZkClient(anyParticipantManager);
- Assert.assertEquals(TaskUtil
- .getTargetThreadPoolSize(zkClient,
anyParticipantManager.getClusterName(),
- anyParticipantManager.getInstanceName()),
TEST_TARGET_TASK_THREAD_POOL_SIZE);
+ verifyThreadPoolSizeAndZkClientClass(anyParticipantManager,
TEST_TARGET_TASK_THREAD_POOL_SIZE,
+ SharedZkClientFactory.InnerSharedZkClient.class);
// Restore system properties
if (prevMultiZkEnabled == null) {
@@ -169,4 +157,16 @@ public class TestTaskStateModelFactory extends
TaskTestBase {
public void testZkClientCreationNonZKManager() {
TaskStateModelFactory.createZkClient(new MockManager());
}
+
+ private void verifyThreadPoolSizeAndZkClientClass(HelixManager helixManager,
int threadPoolSize,
+ Class<?> zkClientClass) {
+ RealmAwareZkClient zkClient =
TaskStateModelFactory.createZkClient(helixManager);
+ try {
+ Assert.assertEquals(TaskUtil.getTargetThreadPoolSize(zkClient,
helixManager.getClusterName(),
+ helixManager.getInstanceName()), threadPoolSize);
+ Assert.assertEquals(zkClient.getClass(), zkClientClass);
+ } finally {
+ zkClient.close();
+ }
+ }
}