richardstartin commented on code in PR #8491:
URL: https://github.com/apache/pinot/pull/8491#discussion_r847752986
##########
pinot-broker/src/main/java/org/apache/pinot/broker/routing/BrokerRoutingManager.java:
##########
@@ -214,87 +219,178 @@ private void processInstanceConfigChange() {
long startTimeMs = System.currentTimeMillis();
List<ZNRecord> instanceConfigZNRecords =
- _zkDataAccessor.getChildren(_instanceConfigsPath, null,
AccessOption.PERSISTENT,
- CommonConstants.Helix.ZkClient.RETRY_COUNT,
CommonConstants.Helix.ZkClient.RETRY_INTERVAL_MS);
+ _zkDataAccessor.getChildren(_instanceConfigsPath, null,
AccessOption.PERSISTENT, Helix.ZkClient.RETRY_COUNT,
+ Helix.ZkClient.RETRY_INTERVAL_MS);
long fetchInstanceConfigsEndTimeMs = System.currentTimeMillis();
- // Calculate new enabled and disabled instances
- Set<String> enabledInstances = new HashSet<>();
- List<String> newEnabledInstances = new ArrayList<>();
+ // Calculate new enabled and disabled servers
+ Set<String> enabledServers = new HashSet<>();
+ List<String> newEnabledServers = new ArrayList<>();
for (ZNRecord instanceConfigZNRecord : instanceConfigZNRecords) {
- String instance = instanceConfigZNRecord.getId();
- if (isInstanceEnabled(instanceConfigZNRecord)) {
- enabledInstances.add(instance);
+ String instanceId = instanceConfigZNRecord.getId();
+ if (isEnabledServer(instanceConfigZNRecord)) {
+ enabledServers.add(instanceId);
// Always refresh the server instance with the latest instance config
in case it changes
ServerInstance serverInstance = new ServerInstance(new
InstanceConfig(instanceConfigZNRecord));
- if (_enabledServerInstanceMap.put(instance, serverInstance) == null) {
- newEnabledInstances.add(instance);
+ if (_enabledServerInstanceMap.put(instanceId, serverInstance) == null)
{
+ newEnabledServers.add(instanceId);
+
+ // NOTE: Remove new enabled server from excluded servers because the
server is likely being restarted
+ if (_excludedServers.remove(instanceId)) {
+ LOGGER.info("Got excluded server: {} re-enabled, including it into
the routing", instanceId);
+ }
}
}
}
- List<String> newDisabledInstances = new ArrayList<>();
+ List<String> newDisabledServers = new ArrayList<>();
for (String instance : _enabledServerInstanceMap.keySet()) {
- if (!enabledInstances.contains(instance)) {
- newDisabledInstances.add(instance);
+ if (!enabledServers.contains(instance)) {
+ newDisabledServers.add(instance);
}
}
- List<String> changedInstances = new ArrayList<>(newEnabledInstances.size()
+ newDisabledInstances.size());
- changedInstances.addAll(newEnabledInstances);
- changedInstances.addAll(newDisabledInstances);
- long calculateChangedInstancesEndTimeMs = System.currentTimeMillis();
-
- // Early terminate if there is no instance changed
- if (changedInstances.isEmpty()) {
- LOGGER.info(
- "Processed instance config change in {}ms (fetch {} instance
configs: {}ms, calculate changed instances: "
- + "{}ms) without instance change",
calculateChangedInstancesEndTimeMs - startTimeMs,
- instanceConfigZNRecords.size(), fetchInstanceConfigsEndTimeMs -
startTimeMs,
- calculateChangedInstancesEndTimeMs - fetchInstanceConfigsEndTimeMs);
+
+ // Calculate the routable servers and the changed routable servers
+ List<String> changedServers = new ArrayList<>(newEnabledServers.size() +
newDisabledServers.size());
+ if (_excludedServers.isEmpty()) {
+ _routableServers = enabledServers;
+ changedServers.addAll(newEnabledServers);
+ changedServers.addAll(newDisabledServers);
+ } else {
+ enabledServers.removeAll(_excludedServers);
+ _routableServers = enabledServers;
+ // NOTE: All new enabled servers are routable
+ changedServers.addAll(newEnabledServers);
+ for (String newDisabledServer : newDisabledServers) {
+ if (_excludedServers.contains(newDisabledServer)) {
+ changedServers.add(newDisabledServer);
+ }
+ }
+ }
+ long calculateChangedServersEndTimeMs = System.currentTimeMillis();
+
+ // Early terminate if there is no changed servers
+ if (changedServers.isEmpty()) {
+ LOGGER.info("Processed instance config change in {}ms "
+ + "(fetch {} instance configs: {}ms, calculate changed servers:
{}ms) without instance change",
+ calculateChangedServersEndTimeMs - startTimeMs,
instanceConfigZNRecords.size(),
+ fetchInstanceConfigsEndTimeMs - startTimeMs,
+ calculateChangedServersEndTimeMs - fetchInstanceConfigsEndTimeMs);
return;
}
// Update routing entry for all tables
for (RoutingEntry routingEntry : _routingEntryMap.values()) {
try {
- routingEntry.onInstancesChange(enabledInstances, changedInstances);
+ routingEntry.onInstancesChange(_routableServers, changedServers);
} catch (Exception e) {
LOGGER.error("Caught unexpected exception while updating routing entry
on instances change for table: {}",
routingEntry.getTableNameWithType(), e);
}
}
long updateRoutingEntriesEndTimeMs = System.currentTimeMillis();
- // Remove new disabled instances from _enabledServerInstanceMap after
updating all routing entries to ensure it
- // always contains the selected instances
- for (String newDisabledInstance : newDisabledInstances) {
+ // Remove new disabled servers from _enabledServerInstanceMap after
updating all routing entries to ensure it
+ // always contains the selected servers
+ for (String newDisabledInstance : newDisabledServers) {
_enabledServerInstanceMap.remove(newDisabledInstance);
}
- LOGGER.info(
- "Processed instance config change in {}ms (fetch {} instance configs:
{}ms, calculate changed instances: "
- + "{}ms, update {} routing entries: {}ms), new enabled instances:
{}, new disabled instances: {}",
+ LOGGER.info("Processed instance config change in {}ms "
+ + "(fetch {} instance configs: {}ms, calculate changed servers:
{}ms, update {} routing entries: {}ms), "
+ + "new enabled servers: {}, new disabled servers: {}, excluded
servers: {}",
updateRoutingEntriesEndTimeMs - startTimeMs,
instanceConfigZNRecords.size(),
- fetchInstanceConfigsEndTimeMs - startTimeMs,
calculateChangedInstancesEndTimeMs - fetchInstanceConfigsEndTimeMs,
- _routingEntryMap.size(), updateRoutingEntriesEndTimeMs -
calculateChangedInstancesEndTimeMs,
- newEnabledInstances, newDisabledInstances);
+ fetchInstanceConfigsEndTimeMs - startTimeMs,
calculateChangedServersEndTimeMs - fetchInstanceConfigsEndTimeMs,
+ _routingEntryMap.size(), updateRoutingEntriesEndTimeMs -
calculateChangedServersEndTimeMs, newEnabledServers,
+ newDisabledServers, _excludedServers);
}
- private static boolean isInstanceEnabled(ZNRecord instanceConfigZNRecord) {
+ private static boolean isEnabledServer(ZNRecord instanceConfigZNRecord) {
+ String instanceId = instanceConfigZNRecord.getId();
+ if (!instanceId.startsWith(Helix.PREFIX_OF_SERVER_INSTANCE)) {
+ // NOTE: Some legacy configs might not contain the instance type prefix
+ if (instanceId.startsWith(Helix.PREFIX_OF_CONTROLLER_INSTANCE) ||
instanceId.startsWith(
+ Helix.PREFIX_OF_BROKER_INSTANCE) ||
instanceId.startsWith(Helix.PREFIX_OF_MINION_INSTANCE)) {
+ return false;
+ }
+ }
if ("false".equalsIgnoreCase(
instanceConfigZNRecord.getSimpleField(InstanceConfig.InstanceConfigProperty.HELIX_ENABLED.name())))
{
return false;
}
- if
("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(CommonConstants.Helix.IS_SHUTDOWN_IN_PROGRESS)))
{
+ if
("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(Helix.IS_SHUTDOWN_IN_PROGRESS)))
{
return false;
}
//noinspection RedundantIfStatement
- if
("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(CommonConstants.Helix.QUERIES_DISABLED)))
{
+ if
("true".equalsIgnoreCase(instanceConfigZNRecord.getSimpleField(Helix.QUERIES_DISABLED)))
{
Review Comment:
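Not introduced here, but prefer `Boolean.parseBoolean(...)` over `"true".equalsIgnoreCase(...)` for the `IS_SHUTDOWN_IN_PROGRESS` and `QUERIES_DISABLED` checks. It is null-safe and case-insensitive, so the behavior is the same. (It is not a drop-in for the `"false".equalsIgnoreCase(...)` check on `HELIX_ENABLED`, where a missing field must be treated as enabled.)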
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]