This is an automated email from the ASF dual-hosted git repository.
somandal pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ecb1783d1b Fix needReload to fetch status from servers in ExternalView
(#15637)
ecb1783d1b is described below
commit ecb1783d1ba334433dda46dbc3772750a1293423
Author: Sonam Mandal <[email protected]>
AuthorDate: Thu Apr 24 16:11:01 2025 -0700
Fix needReload to fetch status from servers in ExternalView (#15637)
---
.../core/rebalance/DefaultRebalancePreChecker.java | 26 +++-----------
.../pinot/controller/util/TableMetadataReader.java | 42 ++++++++++++++--------
2 files changed, 32 insertions(+), 36 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
index 7ef7d53c00..2c8812d799 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
@@ -83,8 +83,7 @@ public class DefaultRebalancePreChecker implements
RebalancePreChecker {
Map<String, RebalancePreCheckerResult> preCheckResult = new HashMap<>();
// Check for reload status
- preCheckResult.put(NEEDS_RELOAD_STATUS,
- checkReloadNeededOnServers(tableNameWithType,
preCheckContext.getCurrentAssignment(), tableRebalanceLogger));
+ preCheckResult.put(NEEDS_RELOAD_STATUS,
checkReloadNeededOnServers(tableNameWithType, tableRebalanceLogger));
// Check whether minimizeDataMovement is set in TableConfig
preCheckResult.put(IS_MINIMIZE_DATA_MOVEMENT,
checkIsMinimizeDataMovement(tableConfig, rebalanceConfig,
tableRebalanceLogger));
@@ -114,8 +113,7 @@ public class DefaultRebalancePreChecker implements
RebalancePreChecker {
* TODO: Add an API to check for whether segments in deep store are up to
date with the table configs and schema
* and add a pre-check here to call that API.
*/
- private RebalancePreCheckerResult checkReloadNeededOnServers(String
tableNameWithType,
- Map<String, Map<String, String>> currentAssignment, Logger
tableRebalanceLogger) {
+ private RebalancePreCheckerResult checkReloadNeededOnServers(String
tableNameWithType, Logger tableRebalanceLogger) {
tableRebalanceLogger.info("Fetching whether reload is needed");
Boolean needsReload = null;
if (_executorService == null) {
@@ -125,18 +123,12 @@ public class DefaultRebalancePreChecker implements
RebalancePreChecker {
try (PoolingHttpClientConnectionManager connectionManager = new
PoolingHttpClientConnectionManager()) {
TableMetadataReader metadataReader = new
TableMetadataReader(_executorService, connectionManager,
_pinotHelixResourceManager);
- // Only send needReload request to servers that are part of the current
assignment. The tagged server list may
- // include new servers which are part of target assignment but not
current assignment. needReload throws an
- // exception for servers that don't contain segments for the given table
- Set<String> currentlyAssignedServers =
getCurrentlyAssignedServers(currentAssignment);
TableMetadataReader.TableReloadJsonResponse needsReloadMetadataPair =
-
metadataReader.getServerSetCheckSegmentsReloadMetadata(tableNameWithType,
30_000, currentlyAssignedServers);
+
metadataReader.getServerCheckSegmentsReloadMetadata(tableNameWithType, 30_000);
Map<String, JsonNode> needsReloadMetadata =
needsReloadMetadataPair.getServerReloadJsonResponses();
int failedResponses = needsReloadMetadataPair.getNumFailedResponses();
- tableRebalanceLogger.info(
- "Received {} needs reload responses and {} failed responses from
servers with "
- + "number of servers queried: {}", needsReloadMetadata.size(),
failedResponses,
- currentlyAssignedServers.size());
+ tableRebalanceLogger.info("Received {} needs reload responses and {}
failed responses from servers assigned "
+ + "to table", needsReloadMetadata.size(), failedResponses);
needsReload = needsReloadMetadata.values().stream().anyMatch(value ->
value.get("needReload").booleanValue());
if (!needsReload && failedResponses > 0) {
tableRebalanceLogger.warn(
@@ -269,14 +261,6 @@ public class DefaultRebalancePreChecker implements
RebalancePreChecker {
return RebalancePreCheckerResult.error("Got exception when fetching
instance assignment, check manually");
}
- private Set<String> getCurrentlyAssignedServers(Map<String, Map<String,
String>> currentAssignment) {
- Set<String> servers = new HashSet<>();
- for (Map<String, String> serverStateMap : currentAssignment.values()) {
- servers.addAll(serverStateMap.keySet());
- }
- return servers;
- }
-
private RebalancePreCheckerResult checkDiskUtilization(Map<String,
Map<String, String>> currentAssignment,
Map<String, Map<String, String>> targetAssignment,
TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, double
threshold, boolean worstCase) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
index e6318cb05b..26958034f2 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/util/TableMetadataReader.java
@@ -30,12 +30,14 @@ import java.util.Set;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.apache.hc.client5.http.io.HttpClientConnectionManager;
+import org.apache.helix.model.ExternalView;
import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.restlet.resources.TableMetadataInfo;
import org.apache.pinot.common.restlet.resources.ValidDocIdsMetadataInfo;
import org.apache.pinot.controller.api.resources.TableStaleSegmentResponse;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -60,7 +62,7 @@ public class TableMetadataReader {
}
/**
- * Check if segments need a reload on any servers
+ * Check if segments need a reload on any servers. Server list is obtained
from the ExternalView of the table
* @return response containing a) number of failed responses, b) reload
responses returned
*/
public TableReloadJsonResponse getServerCheckSegmentsReloadMetadata(String
tableNameWithType,
@@ -71,25 +73,35 @@ public class TableMetadataReader {
return processSegmentMetadataReloadResponse(segmentsMetadataResponse);
}
+ /**
+ * Only send needReload request to servers that are part of the
ExternalView. The tagged server list should not be
+ * used as it may be outdated and may not handle scenarios like tiered
storage and COMPLETED segments.
+ * needReload throws an exception for servers that don't contain segments
for the given table
+ */
public ServerSegmentMetadataReader.TableReloadResponse
getReloadCheckResponses(String tableNameWithType,
int timeoutMs) throws InvalidConfigException {
- TableType tableType =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType);
- List<String> serverInstances =
_pinotHelixResourceManager.getServerInstancesForTable(tableNameWithType,
tableType);
- Set<String> serverInstanceSet = new HashSet<>(serverInstances);
+ ExternalView externalView =
_pinotHelixResourceManager.getTableExternalView(tableNameWithType);
+ Set<String> serverInstanceSet = new HashSet<>();
+ if (externalView != null) {
+ serverInstanceSet =
getCurrentlyAssignedServersFromExternalView(externalView);
+ }
return getServerSetReloadCheckResponses(tableNameWithType, timeoutMs,
serverInstanceSet);
}
- /**
- * Check if segments need a reload on any servers based on a provided server
set (useful for rebalance where the
- * currently assigned servers may not match the currently tagged server list)
- * @return response containing a) number of failed responses, b) reload
responses returned
- */
- public TableReloadJsonResponse
getServerSetCheckSegmentsReloadMetadata(String tableNameWithType,
- int timeoutMs, Set<String> serverSet)
- throws InvalidConfigException, IOException {
- ServerSegmentMetadataReader.TableReloadResponse segmentsMetadataResponse =
getServerSetReloadCheckResponses(
- tableNameWithType, timeoutMs, serverSet);
- return processSegmentMetadataReloadResponse(segmentsMetadataResponse);
+ private Set<String> getCurrentlyAssignedServersFromExternalView(ExternalView
externalView) {
+ Map<String, Map<String, String>> assignment =
externalView.getRecord().getMapFields();
+ Set<String> servers = new HashSet<>();
+ for (Map<String, String> serverStateMap : assignment.values()) {
+ for (Map.Entry<String, String> entry : serverStateMap.entrySet()) {
+ String state = entry.getValue();
+ // Skip adding the server if the segment is in ERROR or OFFLINE state
+ if
(CommonConstants.Helix.StateModel.SegmentStateModel.ONLINE.equals(state)
+ ||
CommonConstants.Helix.StateModel.SegmentStateModel.CONSUMING.equals(state)) {
+ servers.add(entry.getKey());
+ }
+ }
+ }
+ return servers;
}
public ServerSegmentMetadataReader.TableReloadResponse
getServerSetReloadCheckResponses(String tableNameWithType,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@pinot.apache.org
For additional commands, e-mail: commits-help@pinot.apache.org