This is an automated email from the ASF dual-hosted git repository.

hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new acaf13f  Make byte array data accessor use RealmAwareZkClient (#1480)
acaf13f is described below

commit acaf13fbfa37a3bd5a80d32cd15d90057ae342b6
Author: Hunter Lee <[email protected]>
AuthorDate: Wed Oct 21 21:08:58 2020 -0700

    Make byte array data accessor use RealmAwareZkClient (#1480)
    
    This change was left out of the ZooScalability migration of helix-rest, 
making ZooKeeperAccessor endpoints fail in a multi-zk setting. This change 
fixes this.
---
 .../apache/helix/rest/server/ServerContext.java    | 166 +++++++++++++--------
 1 file changed, 106 insertions(+), 60 deletions(-)

diff --git 
a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java 
b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index 9bb3098..231284a 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -31,7 +31,6 @@ import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixException;
 import org.apache.helix.InstanceType;
 import org.apache.helix.SystemPropertyKeys;
-import org.apache.helix.manager.zk.ByteArraySerializer;
 import org.apache.helix.manager.zk.ZKHelixAdmin;
 import org.apache.helix.manager.zk.ZKHelixDataAccessor;
 import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -43,6 +42,7 @@ import org.apache.helix.zookeeper.api.client.HelixZkClient;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
 import org.apache.helix.zookeeper.constant.RoutingDataReaderType;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ByteArraySerializer;
 import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
 import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.impl.client.ZkClient;
@@ -52,6 +52,7 @@ import org.apache.helix.zookeeper.routing.RoutingDataManager;
 import org.apache.helix.zookeeper.zkclient.IZkChildListener;
 import org.apache.helix.zookeeper.zkclient.IZkDataListener;
 import org.apache.helix.zookeeper.zkclient.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
 import org.apache.zookeeper.Watcher;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -64,6 +65,7 @@ public class ServerContext implements IZkDataListener, 
IZkChildListener, IZkStat
   private boolean _isMultiZkEnabled;
   private final String _msdsEndpoint;
   private volatile RealmAwareZkClient _zkClient;
+  private volatile RealmAwareZkClient _byteArrayZkClient;
 
   private volatile ZKHelixAdmin _zkHelixAdmin;
   private volatile ClusterSetup _clusterSetup;
@@ -81,7 +83,7 @@ public class ServerContext implements IZkDataListener, 
IZkChildListener, IZkStat
    */
   private ZkMetadataStoreDirectory _zkMetadataStoreDirectory;
   // Create a dedicated ZkClient for listening to data changes in routing data
-  private RealmAwareZkClient _zkClientForListener;
+  private RealmAwareZkClient _zkClientForRoutingDataListener;
 
   public ServerContext(String zkAddr) {
     this(zkAddr, false, null);
@@ -109,54 +111,99 @@ public class ServerContext implements IZkDataListener, 
IZkChildListener, IZkStat
     _zkMetadataStoreDirectory = ZkMetadataStoreDirectory.getInstance();
   }
 
+  /**
+   * Lazy initialization of RealmAwareZkClient used throughout the REST server.
+   * @return
+   */
   public RealmAwareZkClient getRealmAwareZkClient() {
     if (_zkClient == null) {
       synchronized (this) {
         if (_zkClient == null) {
-          // If the multi ZK config is enabled, use FederatedZkClient on 
multi-realm mode
-          if (_isMultiZkEnabled || Boolean
-              
.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
-            try {
-              // Make sure the ServerContext is subscribed to routing data 
change so that it knows
-              // when to reset ZkClient and Helix APIs
-              if (_zkClientForListener == null) {
-                _zkClientForListener = DedicatedZkClientFactory.getInstance()
-                    .buildZkClient(new 
HelixZkClient.ZkConnectionConfig(_zkAddr),
-                        new HelixZkClient.ZkClientConfig()
-                            .setZkSerializer(new ZNRecordSerializer()));
-              }
-              // Refresh data subscription
-              _zkClientForListener.unsubscribeAll();
-              _zkClientForListener.subscribeRoutingDataChanges(this, this);
-              LOG.info("ServerContext: subscribed to routing data in routing 
ZK at {}!", _zkAddr);
-
-              RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder 
connectionConfigBuilder =
-                  new 
RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
-              // If MSDS endpoint is set for this namespace, use that instead.
-              if (_msdsEndpoint != null && !_msdsEndpoint.isEmpty()) {
-                
connectionConfigBuilder.setRoutingDataSourceEndpoint(_msdsEndpoint)
-                    
.setRoutingDataSourceType(RoutingDataReaderType.HTTP.name());
-              }
-              _zkClient = new 
FederatedZkClient(connectionConfigBuilder.build(),
-                  new RealmAwareZkClient.RealmAwareZkClientConfig()
-                      .setZkSerializer(new ZNRecordSerializer()));
-              LOG.info("ServerContext: FederatedZkClient created 
successfully!");
-            } catch (InvalidRoutingDataException | IllegalStateException e) {
-              throw new HelixException("Failed to create FederatedZkClient!", 
e);
-            }
-          } else {
-            // If multi ZK config is not set, just connect to the ZK address 
given
-            HelixZkClient.ZkClientConfig clientConfig = new 
HelixZkClient.ZkClientConfig();
-            clientConfig.setZkSerializer(new ZNRecordSerializer());
-            _zkClient = SharedZkClientFactory.getInstance()
-                .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr), 
clientConfig);
-          }
+          _zkClient = createRealmAwareZkClient(_zkClient, true, new 
ZNRecordSerializer());
         }
       }
     }
     return _zkClient;
   }
 
+  /**
+   * Returns a RealmAwareZkClient with ByteArraySerializer with double-checked 
locking.
+   * NOTE: this is different from getRealmAwareZkClient in that it does not 
reset listeners for
+   * _zkClientForRoutingDataListener because this RealmAwareZkClient is independent from 
routing data changes.
+   * @return
+   */
+  public RealmAwareZkClient getByteArrayRealmAwareZkClient() {
+    if (_byteArrayZkClient == null) {
+      synchronized (this) {
+        if (_byteArrayZkClient == null) {
+          _byteArrayZkClient =
+              createRealmAwareZkClient(_byteArrayZkClient, false, new 
ByteArraySerializer());
+        }
+      }
+    }
+    return _byteArrayZkClient;
+  }
+
+  /**
+   * Main creation logic for RealmAwareZkClient.
+   * @param realmAwareZkClient
+   * @param shouldSubscribeToRoutingDataChange if true, it will initialize zk 
client to listen on
+   *                                           routing data change and refresh 
change subscription
+   * @param zkSerializer the type of ZkSerializer to use
+   * @return
+   */
+  private RealmAwareZkClient createRealmAwareZkClient(RealmAwareZkClient 
realmAwareZkClient,
+      boolean shouldSubscribeToRoutingDataChange, ZkSerializer zkSerializer) {
+    // If the multi ZK config is enabled, use FederatedZkClient on multi-realm 
mode
+    if (_isMultiZkEnabled || Boolean
+        
.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
+      try {
+        if (shouldSubscribeToRoutingDataChange) {
+          initializeZkClientForRoutingData();
+        }
+        RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder 
connectionConfigBuilder =
+            new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
+        // If MSDS endpoint is set for this namespace, use that instead.
+        if (_msdsEndpoint != null && !_msdsEndpoint.isEmpty()) {
+          connectionConfigBuilder.setRoutingDataSourceEndpoint(_msdsEndpoint)
+              .setRoutingDataSourceType(RoutingDataReaderType.HTTP.name());
+        }
+        realmAwareZkClient = new 
FederatedZkClient(connectionConfigBuilder.build(),
+            new 
RealmAwareZkClient.RealmAwareZkClientConfig().setZkSerializer(zkSerializer));
+        LOG.info("ServerContext: FederatedZkClient created successfully!");
+      } catch (InvalidRoutingDataException | IllegalStateException e) {
+        throw new HelixException("Failed to create FederatedZkClient!", e);
+      }
+    } else {
+      // If multi ZK config is not set, just connect to the ZK address given
+      HelixZkClient.ZkClientConfig clientConfig = new 
HelixZkClient.ZkClientConfig();
+      clientConfig.setZkSerializer(zkSerializer);
+      realmAwareZkClient = SharedZkClientFactory.getInstance()
+          .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr), 
clientConfig);
+    }
+    return realmAwareZkClient;
+  }
+
+  /**
+   * Initialization logic for ZkClient for routing data listener.
+   * NOTE: The initialization lifecycle of zkClientForRoutingDataListener is 
tied to the private
+   * volatile zkClient.
+   */
+  private void initializeZkClientForRoutingData() {
+    // Make sure the ServerContext is subscribed to routing data change so 
that it knows
+    // when to reset ZkClient and Helix APIs
+    if (_zkClientForRoutingDataListener == null) {
+      // Routing data is always in the ZNRecord format
+      _zkClientForRoutingDataListener = DedicatedZkClientFactory.getInstance()
+          .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr),
+              new HelixZkClient.ZkClientConfig().setZkSerializer(new 
ZNRecordSerializer()));
+    }
+    // Refresh data subscription
+    _zkClientForRoutingDataListener.unsubscribeAll();
+    _zkClientForRoutingDataListener.subscribeRoutingDataChanges(this, this);
+    LOG.info("ServerContext: subscribed to routing data in routing ZK at {}!", 
_zkAddr);
+  }
+
   @Deprecated
   public ZkClient getZkClient() {
     return (ZkClient) getRealmAwareZkClient();
@@ -232,8 +279,7 @@ public class ServerContext implements IZkDataListener, 
IZkChildListener, IZkStat
     if (_byteArrayZkBaseDataAccessor == null) {
       synchronized (this) {
         if (_byteArrayZkBaseDataAccessor == null) {
-          _byteArrayZkBaseDataAccessor =
-              new ZkBaseDataAccessor<>(_zkAddr, new ByteArraySerializer());
+          _byteArrayZkBaseDataAccessor = new 
ZkBaseDataAccessor<>(getByteArrayRealmAwareZkClient());
         }
       }
     }
@@ -247,25 +293,25 @@ public class ServerContext implements IZkDataListener, 
IZkChildListener, IZkStat
     if (_zkMetadataStoreDirectory != null) {
       _zkMetadataStoreDirectory.close();
     }
-    if (_zkClientForListener != null) {
-      _zkClientForListener.close();
+    if (_zkClientForRoutingDataListener != null) {
+      _zkClientForRoutingDataListener.close();
     }
   }
 
   @Override
   public void handleChildChange(String parentPath, List<String> currentChilds) 
{
-    if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+    if (_zkClientForRoutingDataListener == null || 
_zkClientForRoutingDataListener.isClosed()) {
       return;
     }
     // Resubscribe
-    _zkClientForListener.unsubscribeAll();
-    _zkClientForListener.subscribeRoutingDataChanges(this, this);
+    _zkClientForRoutingDataListener.unsubscribeAll();
+    _zkClientForRoutingDataListener.subscribeRoutingDataChanges(this, this);
     resetZkResources();
   }
 
   @Override
   public void handleDataChange(String dataPath, Object data) {
-    if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+    if (_zkClientForRoutingDataListener == null || 
_zkClientForRoutingDataListener.isClosed()) {
       return;
     }
     resetZkResources();
@@ -273,45 +319,45 @@ public class ServerContext implements IZkDataListener, 
IZkChildListener, IZkStat
 
   @Override
   public void handleDataDeleted(String dataPath) {
-    if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+    if (_zkClientForRoutingDataListener == null || 
_zkClientForRoutingDataListener.isClosed()) {
       return;
     }
     // Resubscribe
-    _zkClientForListener.unsubscribeAll();
-    _zkClientForListener.subscribeRoutingDataChanges(this, this);
+    _zkClientForRoutingDataListener.unsubscribeAll();
+    _zkClientForRoutingDataListener.subscribeRoutingDataChanges(this, this);
     resetZkResources();
   }
 
   @Override
   public void handleStateChanged(Watcher.Event.KeeperState state) {
-    if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+    if (_zkClientForRoutingDataListener == null || 
_zkClientForRoutingDataListener.isClosed()) {
       return;
     }
     // Resubscribe
-    _zkClientForListener.unsubscribeAll();
-    _zkClientForListener.subscribeRoutingDataChanges(this, this);
+    _zkClientForRoutingDataListener.unsubscribeAll();
+    _zkClientForRoutingDataListener.subscribeRoutingDataChanges(this, this);
     resetZkResources();
   }
 
   @Override
   public void handleNewSession(String sessionId) {
-    if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+    if (_zkClientForRoutingDataListener == null || 
_zkClientForRoutingDataListener.isClosed()) {
       return;
     }
     // Resubscribe
-    _zkClientForListener.unsubscribeAll();
-    _zkClientForListener.subscribeRoutingDataChanges(this, this);
+    _zkClientForRoutingDataListener.unsubscribeAll();
+    _zkClientForRoutingDataListener.subscribeRoutingDataChanges(this, this);
     resetZkResources();
   }
 
   @Override
   public void handleSessionEstablishmentError(Throwable error) {
-    if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+    if (_zkClientForRoutingDataListener == null || 
_zkClientForRoutingDataListener.isClosed()) {
       return;
     }
     // Resubscribe
-    _zkClientForListener.unsubscribeAll();
-    _zkClientForListener.subscribeRoutingDataChanges(this, this);
+    _zkClientForRoutingDataListener.unsubscribeAll();
+    _zkClientForRoutingDataListener.subscribeRoutingDataChanges(this, this);
     resetZkResources();
   }
 

Reply via email to