This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch controller_api_patch in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit f3472898ad5e81f0de3267c4b2333b629fc9d007 Author: Kartik Khare <kharekartik@Kartiks-MacBook-Pro.local> AuthorDate: Tue Sep 15 19:56:11 2020 +0530 Fetch configs from cache rather than zookeeper --- .../helix/core/PinotHelixResourceManager.java | 70 +++++++++++++++++----- 1 file changed, 54 insertions(+), 16 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 1536128..917a777 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -25,7 +25,6 @@ import com.google.common.cache.CacheLoader; import com.google.common.cache.LoadingCache; import com.google.common.collect.BiMap; import com.google.common.collect.HashBiMap; -import com.google.common.collect.Lists; import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; @@ -57,6 +56,8 @@ import org.apache.helix.InstanceType; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; import org.apache.helix.ZNRecord; +import org.apache.helix.api.listeners.ExternalViewChangeListener; +import org.apache.helix.api.listeners.LiveInstanceChangeListener; import org.apache.helix.model.CurrentState; import org.apache.helix.model.ExternalView; import org.apache.helix.model.HelixConfigScope; @@ -103,6 +104,9 @@ import org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssign import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory; import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils; +import org.apache.pinot.controller.helix.core.listener.ClusterExternalViewChangeListener; 
+import org.apache.pinot.controller.helix.core.listener.ClusterInstanceConfigChangeListener; +import org.apache.pinot.controller.helix.core.listener.ClusterLiveInstanceChangeListener; import org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager; import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult; import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer; @@ -160,6 +164,9 @@ public class PinotHelixResourceManager { private SegmentDeletionManager _segmentDeletionManager; private PinotLLCRealtimeSegmentManager _pinotLLCRealtimeSegmentManager; private TableCache _tableCache; + private ClusterInstanceConfigChangeListener _clusterInstanceConfigChangeListener; + private ClusterLiveInstanceChangeListener _clusterLiveInstanceChangeListener; + private ClusterExternalViewChangeListener _clusterExternalViewChangeListener; public PinotHelixResourceManager(String zkURL, String helixClusterName, @Nullable String dataDir, long externalViewOnlineToOfflineTimeoutMillis, boolean isSingleTenantCluster, boolean enableBatchMessageMode, @@ -223,6 +230,18 @@ public class PinotHelixResourceManager { boolean caseInsensitive = Boolean.parseBoolean(configs.get(Helix.ENABLE_CASE_INSENSITIVE_KEY)) || Boolean .parseBoolean(configs.get(Helix.DEPRECATED_ENABLE_CASE_INSENSITIVE_KEY)); _tableCache = new TableCache(_propertyStore, caseInsensitive); + + _clusterInstanceConfigChangeListener = new ClusterInstanceConfigChangeListener(); + _clusterLiveInstanceChangeListener = new ClusterLiveInstanceChangeListener(); + _clusterExternalViewChangeListener = new ClusterExternalViewChangeListener(); + try { + addConfigListeners(_clusterInstanceConfigChangeListener); + addLiveInstanceListeners(_clusterLiveInstanceChangeListener); + addExternalViewListeners(_clusterExternalViewChangeListener); + } catch (Exception e) { + LOGGER.warn( + "Unable to add config listener in controller. 
This will result in incorrect response from controller's broker API"); + } } /** @@ -317,7 +336,7 @@ public class PinotHelixResourceManager { * Returns the config for all the Helix instances in the cluster. */ public List<InstanceConfig> getAllHelixInstanceConfigs() { - return HelixHelper.getInstanceConfigs(_helixZkManager); + return _clusterInstanceConfigChangeListener.getInstanceConfigs(); } /** @@ -359,14 +378,15 @@ public class PinotHelixResourceManager { brokerTenantName = realtimeTableConfig.getTenantConfig().getBroker(); } } - return HelixHelper.getInstancesWithTag(_helixZkManager, TagNameUtils.getBrokerTagForTenant(brokerTenantName)); + return HelixHelper.getInstancesWithTag(_clusterInstanceConfigChangeListener.getInstanceConfigs(), + TagNameUtils.getBrokerTagForTenant(brokerTenantName)); } /** * Get all instances with the given tag */ public List<String> getInstancesWithTag(String tag) { - return HelixHelper.getInstancesWithTag(_helixZkManager, tag); + return HelixHelper.getInstancesWithTag(_clusterInstanceConfigChangeListener.getInstanceConfigs(), tag); } /** @@ -833,7 +853,7 @@ public class PinotHelixResourceManager { public Set<String> getAllBrokerTenantNames() { Set<String> tenantSet = new HashSet<>(); - List<InstanceConfig> instanceConfigs = getAllHelixInstanceConfigs(); + List<InstanceConfig> instanceConfigs = _clusterInstanceConfigChangeListener.getInstanceConfigs(); for (InstanceConfig instanceConfig : instanceConfigs) { for (String tag : instanceConfig.getTags()) { if (TagNameUtils.isBrokerTag(tag)) { @@ -969,7 +989,7 @@ public class PinotHelixResourceManager { } public Set<String> getAllInstancesForServerTenant(String tenantName) { - return getAllInstancesForServerTenant(HelixHelper.getInstanceConfigs(_helixZkManager), tenantName); + return getAllInstancesForServerTenant(_clusterInstanceConfigChangeListener.getInstanceConfigs(), tenantName); } /** @@ -981,7 +1001,7 @@ public class PinotHelixResourceManager { } public Set<String> 
getAllInstancesForBrokerTenant(String tenantName) { - return getAllInstancesForBrokerTenant(HelixHelper.getInstanceConfigs(_helixZkManager), tenantName); + return getAllInstancesForBrokerTenant(_clusterInstanceConfigChangeListener.getInstanceConfigs(), tenantName); } /** @@ -1798,7 +1818,8 @@ public class PinotHelixResourceManager { _helixZkManager.getMessagingService().send(recipientCriteria, routingTableRebuildMessage, null, -1); if (numMessagesSent > 0) { // TODO: Would be nice if we can get the name of the instances to which messages were sent - LOGGER.info("Sent {} routing table rebuild messages to brokers for table: {}", numMessagesSent, tableNameWithType); + LOGGER + .info("Sent {} routing table rebuild messages to brokers for table: {}", numMessagesSent, tableNameWithType); } else { LOGGER.warn("No routing table rebuild message sent to brokers for table: {}", tableNameWithType); } @@ -2010,8 +2031,8 @@ public class PinotHelixResourceManager { public List<String> getBrokerInstancesForTable(String tableName, TableType tableType) { TableConfig tableConfig = getTableConfig(tableName, tableType); Preconditions.checkNotNull(tableConfig); - return HelixHelper - .getInstancesWithTag(_helixZkManager, TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig())); + return HelixHelper.getInstancesWithTag(_clusterInstanceConfigChangeListener.getInstanceConfigs(), + TagNameUtils.extractBrokerTag(tableConfig.getTenantConfig())); } public PinotResourceManagerResponse enableInstance(String instanceName) { @@ -2163,7 +2184,7 @@ public class PinotHelixResourceManager { */ public List<String> getOnlineUnTaggedBrokerInstanceList() { List<String> instanceList = HelixHelper.getInstancesWithTag(_helixZkManager, Helix.UNTAGGED_BROKER_INSTANCE); - List<String> liveInstances = _helixDataAccessor.getChildNames(_keyBuilder.liveInstances()); + List<String> liveInstances = getOnlineInstanceList(); instanceList.retainAll(liveInstances); return instanceList; } @@ -2174,13 +2195,29 @@ 
public class PinotHelixResourceManager { */ public List<String> getOnlineUnTaggedServerInstanceList() { List<String> instanceList = HelixHelper.getInstancesWithTag(_helixZkManager, Helix.UNTAGGED_SERVER_INSTANCE); - List<String> liveInstances = _helixDataAccessor.getChildNames(_keyBuilder.liveInstances()); + List<String> liveInstances = getOnlineInstanceList(); instanceList.retainAll(liveInstances); return instanceList; } public List<String> getOnlineInstanceList() { - return _helixDataAccessor.getChildNames(_keyBuilder.liveInstances()); + return _clusterLiveInstanceChangeListener.getLiveInstances().stream().map(LiveInstance::getInstanceName) + .collect(Collectors.toList()); + } + + public void addConfigListeners(ClusterInstanceConfigChangeListener clusterInstanceConfigChangeListener) + throws Exception { + _helixZkManager.addInstanceConfigChangeListener(clusterInstanceConfigChangeListener); + } + + public void addLiveInstanceListeners(LiveInstanceChangeListener liveInstanceChangeListener) + throws Exception { + _helixZkManager.addLiveInstanceChangeListener(liveInstanceChangeListener); + } + + public void addExternalViewListeners(ExternalViewChangeListener externalViewChangeListener) + throws Exception { + _helixZkManager.addExternalViewChangeListener(externalViewChangeListener); } /** @@ -2437,9 +2474,9 @@ public class PinotHelixResourceManager { } Thread.sleep(EXTERNAL_VIEW_CHECK_INTERVAL_MS); } while (System.currentTimeMillis() < endTimeMs); - throw new TimeoutException(String.format( - "Time out while waiting segments become ONLINE. (tableNameWithType = %s, segmentsToCheck = %s)", - tableNameWithType, segmentsToCheck)); + throw new TimeoutException(String + .format("Time out while waiting segments become ONLINE. 
(tableNameWithType = %s, segmentsToCheck = %s)", + tableNameWithType, segmentsToCheck)); } private Set<String> getOnlineSegmentsFromExternalView(String tableNameWithType) { @@ -2458,6 +2495,7 @@ public class PinotHelixResourceManager { return onlineSegments; } + /* * Uncomment and use for testing on a real cluster public static void main(String[] args) throws Exception { --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@pinot.apache.org For additional commands, e-mail: commits-h...@pinot.apache.org