This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch controller_api_patch
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit f3472898ad5e81f0de3267c4b2333b629fc9d007
Author: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local>
AuthorDate: Tue Sep 15 19:56:11 2020 +0530

    Fetch configs from cache rather than zookeeper
---
 .../helix/core/PinotHelixResourceManager.java      | 70 +++++++++++++++++-----
 1 file changed, 54 insertions(+), 16 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 1536128..917a777 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -25,7 +25,6 @@ import com.google.common.cache.CacheLoader;
 import com.google.common.cache.LoadingCache;
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
-import com.google.common.collect.Lists;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -57,6 +56,8 @@ import org.apache.helix.InstanceType;
 import org.apache.helix.PropertyKey;
 import org.apache.helix.PropertyKey.Builder;
 import org.apache.helix.ZNRecord;
+import org.apache.helix.api.listeners.ExternalViewChangeListener;
+import org.apache.helix.api.listeners.LiveInstanceChangeListener;
 import org.apache.helix.model.CurrentState;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.HelixConfigScope;
@@ -103,6 +104,9 @@ import 
org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import 
org.apache.pinot.controller.helix.core.listener.ClusterExternalViewChangeListener;
+import 
org.apache.pinot.controller.helix.core.listener.ClusterInstanceConfigChangeListener;
+import 
org.apache.pinot.controller.helix.core.listener.ClusterLiveInstanceChangeListener;
 import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
@@ -160,6 +164,9 @@ public class PinotHelixResourceManager {
   private SegmentDeletionManager _segmentDeletionManager;
   private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager;
   private TableCache _tableCache;
+  private ClusterInstanceConfigChangeListener 
_clusterInstanceConfigChangeListener;
+  private ClusterLiveInstanceChangeListener _clusterLiveInstanceChangeListener;
+  private ClusterExternalViewChangeListener _clusterExternalViewChangeListener;
 
   public PinotHelixResourceManager(String zkURL, String helixClusterName, 
@Nullable String dataDir,
       long externalViewOnlineToOfflineTimeoutMillis, boolean 
isSingleTenantCluster, boolean enableBatchMessageMode,
@@ -223,6 +230,18 @@ public class PinotHelixResourceManager {
     boolean caseInsensitive = 
Boolean.parseBoolean(configs.get(Helix.ENABLE_CASE_INSENSITIVE_KEY)) || Boolean
         
.parseBoolean(configs.get(Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY));
     _tableCache = new TableCache(_propertyStore, caseInsensitive);
+
+    _clusterInstanceConfigChangeListener = new 
ClusterInstanceConfigChangeListener();
+    _clusterLiveInstanceChangeListener = new 
ClusterLiveInstanceChangeListener();
+    _clusterExternalViewChangeListener = new 
ClusterExternalViewChangeListener();
+    try {
+      addConfigListeners(_clusterInstanceConfigChangeListener);
+      addLiveInstanceListeners(_clusterLiveInstanceChangeListener);
+      addExternalViewListeners(_clusterExternalViewChangeListener);
+    } catch (Exception e) {
+      LOGGER.warn(
+          "Unable to add config listener in controller. This will result in 
incorrect response from controller's broker API");
+    }
   }
 
   /**
@@ -317,7 +336,7 @@ public class PinotHelixResourceManager {
    * Returns the config for all the Helix instances in the cluster.
    */
   public List<InstanceConfig> getAllHelixInstanceConfigs() {
-    return HelixHelper.getInstanceConfigs(_helixZkManager);
+    return _clusterInstanceConfigChangeListener.getInstanceConfigs();
   }
 
   /**
@@ -359,14 +378,15 @@ public class PinotHelixResourceManager {
         brokerTenantName = realtimeTableConfig.getTenantConfig().getBroker();
       }
     }
-    return HelixHelper.getInstancesWithTag(_helixZkManager, 
TagNameUtils.getBrokerTagForTenant(brokerTenantName));
+    return 
HelixHelper.getInstancesWithTag(_clusterInstanceConfigChangeListener.getInstanceConfigs(),
+        TagNameUtils.getBrokerTagForTenant(brokerTenantName));
   }
 
   /**
    * Get all instances with the given tag
    */
   public List<String> getInstancesWithTag(String tag) {
-    return HelixHelper.getInstancesWithTag(_helixZkManager, tag);
+    return 
HelixHelper.getInstancesWithTag(_clusterInstanceConfigChangeListener.getInstanceConfigs(),
 tag);
   }
 
   /**
@@ -833,7 +853,7 @@ public class PinotHelixResourceManager {
 
   public Set<String> getAllBrokerTenantNames() {
     Set<String> tenantSet = new HashSet<>();
-    List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs();
+    List<InstanceConfig> instanceConfigs = 
_clusterInstanceConfigChangeListener.getInstanceConfigs();
     for (InstanceConfig instanceConfig : instanceConfigs) {
       for (String tag : instanceConfig.getTags()) {
         if (TagNameUtils.isBrokerTag(tag)) {
@@ -969,7 +989,7 @@ public class PinotHelixResourceManager {
   }
 
   public Set<String> getAllInstancesForServerTenant(String tenantName) {
-    return 
getAllInstancesForServerTenant(HelixHelper.getInstanceConfigs(_helixZkManager), 
tenantName);
+    return 
getAllInstancesForServerTenant(_clusterInstanceConfigChangeListener.getInstanceConfigs(),
 tenantName);
   }
 
   /**
@@ -981,7 +1001,7 @@ public class PinotHelixResourceManager {
   }
 
   public Set<String> getAllInstancesForBrokerTenant(String tenantName) {
-    return 
getAllInstancesForBrokerTenant(HelixHelper.getInstanceConfigs(_helixZkManager), 
tenantName);
+    return 
getAllInstancesForBrokerTenant(_clusterInstanceConfigChangeListener.getInstanceConfigs(),
 tenantName);
   }
 
   /**
@@ -1798,7 +1818,8 @@ public class PinotHelixResourceManager {
         _helixZkManager.getMessagingService().send(recipientCriteria, 
routingTableRebuildMessage, null, -1);
     if (numMessagesSent > 0) {
       // TODO: Would be nice if we can get the name of the instances to which 
messages were sent
-      LOGGER.info("Sent {} routing table rebuild messages to brokers for 
table: {}", numMessagesSent, tableNameWithType);
+      LOGGER
+          .info("Sent {} routing table rebuild messages to brokers for table: 
{}", numMessagesSent, tableNameWithType);
     } else {
       LOGGER.warn("No routing table rebuild message sent to brokers for table: 
{}", tableNameWithType);
     }
@@ -2010,8 +2031,8 @@ public class PinotHelixResourceManager {
   public List<String> getBrokerInstancesForTable(String tableName, TableType 
tableType) {
     TableConfig tableConfig = getTableConfig(tableName, tableType);
     Preconditions.checkNotNull(tableConfig);
-    return HelixHelper
-        .getInstancesWithTag(_helixZkManager, 
TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig()));
+    return 
HelixHelper.getInstancesWithTag(_clusterInstanceConfigChangeListener.getInstanceConfigs(),
+        TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig()));
   }
 
   public PinotResourceManagerResponse enableInstance(String instanceName) {
@@ -2163,7 +2184,7 @@ public class PinotHelixResourceManager {
    */
   public List<String> getOnlineUnTaggedBrokerInstanceList() {
     List<String> instanceList = 
HelixHelper.getInstancesWithTag(_helixZkManager, 
Helix.UNTAGGED_BROKER_INSTANCE);
-    List<String> liveInstances = 
_helixDataAccessor.getChildNames(_keyBuilder.liveInstances());
+    List<String> liveInstances = getOnlineInstanceList();
     instanceList.retainAll(liveInstances);
     return instanceList;
   }
@@ -2174,13 +2195,29 @@ public class PinotHelixResourceManager {
    */
   public List<String> getOnlineUnTaggedServerInstanceList() {
     List<String> instanceList = 
HelixHelper.getInstancesWithTag(_helixZkManager, 
Helix.UNTAGGED_SERVER_INSTANCE);
-    List<String> liveInstances = 
_helixDataAccessor.getChildNames(_keyBuilder.liveInstances());
+    List<String> liveInstances = getOnlineInstanceList();
     instanceList.retainAll(liveInstances);
     return instanceList;
   }
 
   public List<String> getOnlineInstanceList() {
-    return _helixDataAccessor.getChildNames(_keyBuilder.liveInstances());
+    return 
_clusterLiveInstanceChangeListener.getLiveInstances().stream().map(LiveInstance::getInstanceName)
+        .collect(Collectors.toList());
+  }
+
+  public void addConfigListeners(ClusterInstanceConfigChangeListener 
clusterInstanceConfigChangeListener)
+      throws Exception {
+    
_helixZkManager.addInstanceConfigChangeListener(clusterInstanceConfigChangeListener);
+  }
+
+  public void addLiveInstanceListeners(LiveInstanceChangeListener 
liveInstanceChangeListener)
+      throws Exception {
+    _helixZkManager.addLiveInstanceChangeListener(liveInstanceChangeListener);
+  }
+
+  public void addExternalViewListeners(ExternalViewChangeListener 
externalViewChangeListener)
+      throws Exception {
+    _helixZkManager.addExternalViewChangeListener(externalViewChangeListener);
   }
 
   /**
@@ -2437,9 +2474,9 @@ public class PinotHelixResourceManager {
       }
       Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS);
     } while (System.currentTimeMillis() < endTimeMs);
-    throw new TimeoutException(String.format(
-        "Time out while waiting segments become ONLINE. (tableNameWithType = 
%s, segmentsToCheck = %s)",
-        tableNameWithType, segmentsToCheck));
+    throw new TimeoutException(String
+        .format("Time out while waiting segments become ONLINE. 
(tableNameWithType = %s, segmentsToCheck = %s)",
+            tableNameWithType, segmentsToCheck));
   }
 
   private Set<String> getOnlineSegmentsFromExternalView(String 
tableNameWithType) {
@@ -2458,6 +2495,7 @@ public class PinotHelixResourceManager {
     return onlineSegments;
   }
 
+
   /*
    * Uncomment and use for testing on a real cluster
   public static void main(String[] args) throws Exception {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org

Reply via email to