This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch zooscalability
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/zooscalability by this push:
new 9d0e7d6 Make ZkBucketDataAccessor realm-aware (#894)
9d0e7d6 is described below
commit 9d0e7d66f1be01dbc35e60bb9849ad43e2339e7d
Author: Hunter Lee <[email protected]>
AuthorDate: Fri Mar 13 17:30:32 2020 -0700
Make ZkBucketDataAccessor realm-aware (#894)
Because the Helix Controller now uses the WAGED rebalancer as its default
rebalancer, it tries to create an instance of ZkBucketDataAccessor. This will
fail unless ZkBucketDataAccessor is also made realm-aware. We can simply use a
FederatedZkClient here since BucketDataAccessor does not support ephemeral
operations.
---
.../helix/manager/zk/ZkBucketDataAccessor.java | 24 +++++++++++++++++++---
1 file changed, 21 insertions(+), 3 deletions(-)
diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
index ffed9f3..1ebab28 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java
@@ -34,9 +34,13 @@ import org.apache.helix.AccessOption;
import org.apache.helix.BucketDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.HelixProperty;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.util.GZipCompressionUtil;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError;
@@ -63,7 +67,7 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
private final int _bucketSize;
private final long _versionTTL;
private ZkSerializer _zkSerializer;
- private HelixZkClient _zkClient;
+ private RealmAwareZkClient _zkClient;
private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor;
/**
@@ -73,8 +77,22 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable {
* @param versionTTL in ms
*/
public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTL) {
- _zkClient = DedicatedZkClientFactory.getInstance()
- .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) {
+ try {
+ // Create realm-aware ZkClient.
+ RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+ new
RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build();
+ RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
+ new RealmAwareZkClient.RealmAwareZkClientConfig();
+ _zkClient = new FederatedZkClient(connectionConfig, clientConfig);
+ } catch (IllegalArgumentException | IOException |
InvalidRoutingDataException e) {
+ throw new HelixException("Not able to connect on realm-aware mode", e);
+ }
+ } else {
+ _zkClient = DedicatedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
+ }
+
_zkClient.setZkSerializer(new ZkSerializer() {
@Override
public byte[] serialize(Object data) throws ZkMarshallingError {