This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 132715785 Reuse zkclient in BestPossibleExternalViewVerifier and fix
resource leak (#2180)
132715785 is described below
commit 132715785e90803ad8991da491f4621db1668fb8
Author: Qi (Quincy) Qu <[email protected]>
AuthorDate: Mon Aug 1 10:49:12 2022 -0700
Reuse zkclient in BestPossibleExternalViewVerifier and fix resource leak
(#2180)
Reuse zkclient in BestPossibleExternalViewVerifier and fix resource leak
Reuse the ZkClient in the verifier and improve the resource-closing logic to
avoid resource leaks.
---
.../rebalancer/waged/ReadOnlyWagedRebalancer.java | 5 ---
.../helix/manager/zk/ZkBucketDataAccessor.java | 39 ++++++++++---------
.../BestPossibleExternalViewVerifier.java | 44 ++++++++++------------
.../ClusterVerifiers/ZkHelixClusterVerifier.java | 7 +++-
.../apache/helix/rest/server/ServerContext.java | 2 +-
5 files changed, 46 insertions(+), 51 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
index d1075d47b..e94148e99 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/rebalancer/waged/ReadOnlyWagedRebalancer.java
@@ -88,10 +88,5 @@ public class ReadOnlyWagedRebalancer extends WagedRebalancer
{
_bestPossibleAssignment = bestPossibleAssignment;
return true;
}
-
- @Override
- // BucketDataAccessor will be reused, won't be closed here.
- public void close() {
- }
}
}
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index 521e3d720..c20b3f902 100644
---
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -41,12 +41,13 @@ import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.serializer.ByteArraySerializer;
+import
org.apache.helix.zookeeper.datamodel.serializer.ZNRecordJacksonSerializer;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.util.GZipCompressionUtil;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
-import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.slf4j.Logger;
@@ -75,10 +76,11 @@ public class ZkBucketDataAccessor implements
BucketDataAccessor, AutoCloseable {
private final int _bucketSize;
private final long _versionTTLms;
- private ZkSerializer _zkSerializer;
- private RealmAwareZkClient _zkClient;
- private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
- private Map<String, ScheduledFuture> _gcTaskFutureMap = new HashMap<>();
+ private final ZkSerializer _zkSerializer;
+ private final RealmAwareZkClient _zkClient;
+ private final ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
+ private final Map<String, ScheduledFuture> _gcTaskFutureMap = new
HashMap<>();
+ private boolean _usesExternalZkClient = false;
/**
* Constructor that allows a custom bucket size.
@@ -87,25 +89,21 @@ public class ZkBucketDataAccessor implements
BucketDataAccessor, AutoCloseable {
* @param versionTTLms in ms
*/
public ZkBucketDataAccessor(String zkAddr, int bucketSize, long
versionTTLms) {
- _zkClient = createRealmAwareZkClient(zkAddr);
- _zkClient.setZkSerializer(new ZkSerializer() {
- @Override
- public byte[] serialize(Object data) throws ZkMarshallingError {
- if (data instanceof byte[]) {
- return (byte[]) data;
- }
- throw new HelixException("ZkBucketDataAccesor only supports a byte
array as an argument!");
- }
+ this(createRealmAwareZkClient(zkAddr), bucketSize, versionTTLms, false);
+ }
- @Override
- public Object deserialize(byte[] data) throws ZkMarshallingError {
- return data;
- }
- });
+ public ZkBucketDataAccessor(RealmAwareZkClient zkClient) {
+ this(zkClient, DEFAULT_BUCKET_SIZE, DEFAULT_VERSION_TTL, true);
+ }
+
+ private ZkBucketDataAccessor(RealmAwareZkClient zkClient, int bucketSize,
long versionTTLms,
+ boolean usesExternalZkClient) {
+ _zkClient = zkClient;
_zkBaseDataAccessor = new ZkBaseDataAccessor<>(_zkClient);
_zkSerializer = new ZNRecordJacksonSerializer();
_bucketSize = bucketSize;
_versionTTLms = versionTTLms;
+ _usesExternalZkClient = usesExternalZkClient;
}
/**
@@ -135,6 +133,7 @@ public class ZkBucketDataAccessor implements
BucketDataAccessor, AutoCloseable {
zkClient = DedicatedZkClientFactory.getInstance()
.buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
}
+ zkClient.setZkSerializer(new ByteArraySerializer());
return zkClient;
}
@@ -258,7 +257,7 @@ public class ZkBucketDataAccessor implements
BucketDataAccessor, AutoCloseable {
@Override
public void disconnect() {
- if (!_zkClient.isClosed()) {
+ if (!_usesExternalZkClient && _zkClient != null && !_zkClient.isClosed()) {
_zkClient.close();
}
}
diff --git
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
index 3b133a138..1997bea06 100644
---
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
+++
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java
@@ -34,6 +34,7 @@ import org.apache.helix.HelixRebalanceException;
import org.apache.helix.PropertyKey;
import org.apache.helix.controller.common.PartitionStateMap;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
+import org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer;
import org.apache.helix.controller.rebalancer.waged.RebalanceAlgorithm;
import org.apache.helix.controller.stages.AttributeName;
import org.apache.helix.controller.stages.BestPossibleStateCalcStage;
@@ -91,6 +92,7 @@ public class BestPossibleExternalViewVerifier extends
ZkHelixClusterVerifier {
_resources = resources;
_expectLiveInstances = expectLiveInstances;
_dataProvider = new ResourceControllerDataProvider();
+ // _zkClient should be closed with BestPossibleExternalViewVerifier
}
/**
@@ -105,7 +107,7 @@ public class BestPossibleExternalViewVerifier extends
ZkHelixClusterVerifier {
public BestPossibleExternalViewVerifier(RealmAwareZkClient zkClient, String
clusterName,
Set<String> resources, Map<String, Map<String, String>> errStates,
Set<String> expectLiveInstances) {
- this(zkClient, clusterName, resources, errStates, expectLiveInstances, 0);
+ this(zkClient, clusterName, errStates, resources, expectLiveInstances, 0,
true);
}
@Deprecated
@@ -114,11 +116,7 @@ public class BestPossibleExternalViewVerifier extends
ZkHelixClusterVerifier {
Set<String> expectLiveInstances, int waitTillVerify) {
// usesExternalZkClient = true because ZkClient is given by the caller
// at close(), we will not close this ZkClient because it might be being
used elsewhere
- super(zkClient, clusterName, true, waitTillVerify);
- _errStates = errStates;
- _resources = resources;
- _expectLiveInstances = expectLiveInstances;
- _dataProvider = new ResourceControllerDataProvider();
+ this(zkClient, clusterName, errStates, resources, expectLiveInstances,
waitTillVerify, true);
}
private BestPossibleExternalViewVerifier(RealmAwareZkClient zkClient, String
clusterName,
@@ -144,7 +142,6 @@ public class BestPossibleExternalViewVerifier extends
ZkHelixClusterVerifier {
private Set<String> _resources;
private Set<String> _expectLiveInstances;
private RealmAwareZkClient _zkClient;
- private boolean _usesExternalZkClient = false; // false by default
public Builder(String clusterName) {
_clusterName = clusterName;
@@ -155,11 +152,12 @@ public class BestPossibleExternalViewVerifier extends
ZkHelixClusterVerifier {
throw new IllegalArgumentException("Cluster name is missing!");
}
+ // _usesExternalZkClient == true
if (_zkClient != null) {
- return new BestPossibleExternalViewVerifier(_zkClient, _clusterName,
_resources, _errStates,
- _expectLiveInstances, _waitPeriodTillVerify);
+ return new BestPossibleExternalViewVerifier(_zkClient, _clusterName,
_errStates, _resources,
+ _expectLiveInstances, _waitPeriodTillVerify, true);
}
-
+ // _usesExternalZkClient == false
if (_realmAwareZkConnectionConfig == null || _realmAwareZkClientConfig
== null) {
// For backward-compatibility
return new BestPossibleExternalViewVerifier(_zkAddress, _clusterName,
_resources,
@@ -170,7 +168,7 @@ public class BestPossibleExternalViewVerifier extends
ZkHelixClusterVerifier {
return new BestPossibleExternalViewVerifier(
createZkClient(RealmAwareZkClient.RealmMode.SINGLE_REALM,
_realmAwareZkConnectionConfig,
_realmAwareZkClientConfig, _zkAddress), _clusterName,
_errStates, _resources,
- _expectLiveInstances, _waitPeriodTillVerify, _usesExternalZkClient);
+ _expectLiveInstances, _waitPeriodTillVerify, false);
}
public String getClusterName() {
@@ -210,7 +208,6 @@ public class BestPossibleExternalViewVerifier extends
ZkHelixClusterVerifier {
public Builder setZkClient(RealmAwareZkClient zkClient) {
_zkClient = zkClient;
- _usesExternalZkClient = true; // Set the flag since external ZkClient is
used
return this;
}
}
@@ -435,18 +432,15 @@ public class BestPossibleExternalViewVerifier extends
ZkHelixClusterVerifier {
RebalanceUtil.runStage(event, new CurrentStateComputationStage());
// Note the readOnlyWagedRebalancer is just for one time usage
- DryrunWagedRebalancer dryrunWagedRebalancer =
- new DryrunWagedRebalancer(_zkClient.getServers(),
cache.getClusterName(),
- cache.getClusterConfig().getGlobalRebalancePreference());
- event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(),
dryrunWagedRebalancer);
- try {
+
+ try (ZkBucketDataAccessor zkBucketDataAccessor = new
ZkBucketDataAccessor(_zkClient);
+ DryrunWagedRebalancer dryrunWagedRebalancer = new
DryrunWagedRebalancer(zkBucketDataAccessor,
+ cache.getClusterName(),
cache.getClusterConfig().getGlobalRebalancePreference())) {
+ event.addAttribute(AttributeName.STATEFUL_REBALANCER.name(),
dryrunWagedRebalancer);
RebalanceUtil.runStage(event, new BestPossibleStateCalcStage());
- } finally {
- dryrunWagedRebalancer.close();
}
- BestPossibleStateOutput output =
event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
- return output;
+ return event.getAttribute(AttributeName.BEST_POSSIBLE_STATE.name());
}
@Override
@@ -456,15 +450,17 @@ public class BestPossibleExternalViewVerifier extends
ZkHelixClusterVerifier {
+ (_resources != null ? Arrays.toString(_resources.toArray()) : "") +
"])";
}
+ // TODO: to clean up, finalize is deprecated in Java 9
@Override
public void finalize() {
close();
+ super.finalize();
}
- private class DryrunWagedRebalancer extends
org.apache.helix.controller.rebalancer.waged.ReadOnlyWagedRebalancer {
- public DryrunWagedRebalancer(String metadataStoreAddress, String
clusterName,
+ private static class DryrunWagedRebalancer extends ReadOnlyWagedRebalancer
implements AutoCloseable {
+ public DryrunWagedRebalancer(ZkBucketDataAccessor zkBucketDataAccessor,
String clusterName,
Map<ClusterConfig.GlobalRebalancePreferenceKey, Integer> preferences) {
- super(new ZkBucketDataAccessor(metadataStoreAddress), clusterName,
preferences);
+ super(zkBucketDataAccessor, clusterName, preferences);
}
@Override
diff --git
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
index 623a91ecd..11071d3ee 100644
---
a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
+++
b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/ZkHelixClusterVerifier.java
@@ -44,7 +44,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class ZkHelixClusterVerifier
- implements IZkChildListener, IZkDataListener, HelixClusterVerifier {
+ implements IZkChildListener, IZkDataListener, HelixClusterVerifier,
AutoCloseable {
private static Logger LOG =
LoggerFactory.getLogger(ZkHelixClusterVerifier.class);
protected static int DEFAULT_TIMEOUT = 300 * 1000;
protected static int DEFAULT_PERIOD = 500;
@@ -229,6 +229,11 @@ public abstract class ZkHelixClusterVerifier
return verifyByPolling(DEFAULT_TIMEOUT, DEFAULT_PERIOD);
}
+ /**
+ * Implement close() for {@link AutoCloseable} and {@link
HelixClusterVerifier}.
+ * Non-external resources should be closed in this method to prevent
resource leak.
+ */
+ @Override
public void close() {
if (_zkClient != null && !_usesExternalZkClient) {
_zkClient.close();
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index a1cfb6695..27921f6e2 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -285,7 +285,7 @@ public class ServerContext implements IZkDataListener,
IZkChildListener, IZkStat
if (_zkBucketDataAccessor == null) {
synchronized (this) {
if (_zkBucketDataAccessor == null) {
- _zkBucketDataAccessor = new ZkBucketDataAccessor(_zkAddr);
+ _zkBucketDataAccessor = new
ZkBucketDataAccessor(getByteArrayRealmAwareZkClient());
}
}
}