This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch zooscalability in repository https://gitbox.apache.org/repos/asf/helix.git
commit 9a4f30811873d47a6ef3c298aa36a3cc8b1a631e Author: Hunter Lee <[email protected]> AuthorDate: Fri Mar 13 17:30:32 2020 -0700 Make ZkBucketDataAccessor realm-aware (#894) Because Helix Controller now uses the WAGED rebalancer as the default rebalancer, it tries to create an instance of ZkBucketDataAccessor. This will fail unless ZkBucketDataAccessor is also made realm-aware. We can simply use a FederatedZkClient here since BucketDataAccessor does not support ephemeral operations. --- .../helix/manager/zk/ZkBucketDataAccessor.java | 24 +++++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java index ffed9f3..1ebab28 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBucketDataAccessor.java @@ -34,9 +34,13 @@ import org.apache.helix.AccessOption; import org.apache.helix.BucketDataAccessor; import org.apache.helix.HelixException; import org.apache.helix.HelixProperty; +import org.apache.helix.SystemPropertyKeys; +import org.apache.helix.msdcommon.exception.InvalidRoutingDataException; import org.apache.helix.util.GZipCompressionUtil; import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.impl.client.FederatedZkClient; import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory; import org.apache.helix.zookeeper.zkclient.DataUpdater; import org.apache.helix.zookeeper.zkclient.exception.ZkMarshallingError; @@ -63,7 +67,7 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable { private final int _bucketSize; private final long _versionTTL; private ZkSerializer _zkSerializer; - private 
HelixZkClient _zkClient; + private RealmAwareZkClient _zkClient; private ZkBaseDataAccessor<byte[]> _zkBaseDataAccessor; /** @@ -73,8 +77,22 @@ public class ZkBucketDataAccessor implements BucketDataAccessor, AutoCloseable { * @param versionTTL in ms */ public ZkBucketDataAccessor(String zkAddr, int bucketSize, long versionTTL) { - _zkClient = DedicatedZkClientFactory.getInstance() - .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr)); + if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED)) { + try { + // Create realm-aware ZkClient. + RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig = + new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder().build(); + RealmAwareZkClient.RealmAwareZkClientConfig clientConfig = + new RealmAwareZkClient.RealmAwareZkClientConfig(); + _zkClient = new FederatedZkClient(connectionConfig, clientConfig); + } catch (IllegalArgumentException | IOException | InvalidRoutingDataException e) { + throw new HelixException("Not able to connect on realm-aware mode", e); + } + } else { + _zkClient = DedicatedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr)); + } + _zkClient.setZkSerializer(new ZkSerializer() { @Override public byte[] serialize(Object data) throws ZkMarshallingError {
