This is an automated email from the ASF dual-hosted git repository.
xbli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 31e94d3b18 Add a dry-run summary mode for TableRebalance which only returns a summary of the dry-run results (#15050)
31e94d3b18 is described below
commit 31e94d3b180a9a0aeda033040020a8beebacf9e6
Author: Sonam Mandal <[email protected]>
AuthorDate: Wed Feb 19 13:37:12 2025 -0800
Add a dry-run summary mode for TableRebalance which only returns a summary of the dry-run results (#15050)
* Add a dry-run summary mode for TableRebalance that returns only a summary of the dry-run results
* Address review: add server-level lists of added, removed, and unchanged servers, and of servers getting new segments
---
.../pinot/controller/BaseControllerStarter.java | 1 +
.../api/resources/PinotTableRestletResource.java | 10 +-
.../helix/core/PinotHelixResourceManager.java | 18 +-
.../core/rebalance/DefaultRebalancePreChecker.java | 3 +-
.../helix/core/rebalance/RebalanceConfig.java | 22 +-
.../helix/core/rebalance/RebalanceResult.java | 11 +-
.../core/rebalance/RebalanceSummaryResult.java | 325 ++++++++++++++++++++
.../helix/core/rebalance/TableRebalancer.java | 228 ++++++++++++--
.../rebalance/tenant/DefaultTenantRebalancer.java | 5 +-
.../rebalance/tenant/TenantRebalanceResult.java | 2 +-
.../TableRebalancerClusterStatelessTest.java | 342 ++++++++++++++++++++-
.../tests/OfflineClusterIntegrationTest.java | 208 ++++++++++++-
12 files changed, 1126 insertions(+), 49 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
index 5761cb27e5..9a4d080c7c 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/BaseControllerStarter.java
@@ -512,6 +512,7 @@ public abstract class BaseControllerStarter implements ServiceStartable {
_tableSizeReader =
new TableSizeReader(_executorService, _connectionManager,
_controllerMetrics, _helixResourceManager,
_leadControllerManager);
+ _helixResourceManager.registerTableSizeReader(_tableSizeReader);
_storageQuotaChecker = new StorageQuotaChecker(_tableSizeReader,
_controllerMetrics, _leadControllerManager,
_helixResourceManager, _config);
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 8930c2f643..8815d20d06 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -606,6 +606,9 @@ public class PinotTableRestletResource {
@ApiParam(value = "OFFLINE|REALTIME", required = true)
@QueryParam("type") String tableTypeStr,
@ApiParam(value = "Whether to rebalance table in dry-run mode")
@DefaultValue("false") @QueryParam("dryRun")
boolean dryRun,
+ @ApiParam(value = "Whether to return dry-run summary instead of full,
dry-run must be enabled to use this")
+ @DefaultValue("false") @QueryParam("summary")
+ boolean summary,
@ApiParam(value = "Whether to enable pre-checks for table, must be in
dry-run mode to enable")
@DefaultValue("false") @QueryParam("preChecks") boolean preChecks,
@ApiParam(value = "Whether to reassign instances before reassigning
segments") @DefaultValue("false")
@@ -648,6 +651,7 @@ public class PinotTableRestletResource {
String tableNameWithType = constructTableNameWithType(tableName,
tableTypeStr);
RebalanceConfig rebalanceConfig = new RebalanceConfig();
rebalanceConfig.setDryRun(dryRun);
+ rebalanceConfig.setSummary(summary);
rebalanceConfig.setPreChecks(preChecks);
rebalanceConfig.setReassignInstances(reassignInstances);
rebalanceConfig.setIncludeConsuming(includeConsuming);
@@ -668,7 +672,7 @@ public class PinotTableRestletResource {
String rebalanceJobId =
TableRebalancer.createUniqueRebalanceJobIdentifier();
try {
- if (dryRun || preChecks || downtime) {
+ if (dryRun || summary || preChecks || downtime) {
// For dry-run, preChecks or rebalance with downtime, directly return
the rebalance result as it should return
// immediately
return _pinotHelixResourceManager.rebalanceTable(tableNameWithType,
rebalanceConfig, rebalanceJobId, false);
@@ -689,7 +693,7 @@ public class PinotTableRestletResource {
String errorMsg = String.format("Caught exception/error while
rebalancing table: %s", tableNameWithType);
LOGGER.error(errorMsg, t);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED, errorMsg, null, null, null,
- null);
+ null, null);
}
});
boolean isJobIdPersisted = waitForRebalanceToPersist(
@@ -710,7 +714,7 @@ public class PinotTableRestletResource {
return new RebalanceResult(dryRunResult.getJobId(),
RebalanceResult.Status.IN_PROGRESS,
"In progress, check controller logs for updates",
dryRunResult.getInstanceAssignment(),
dryRunResult.getTierInstanceAssignment(),
dryRunResult.getSegmentAssignment(),
- dryRunResult.getPreChecksResult());
+ dryRunResult.getPreChecksResult(),
dryRunResult.getRebalanceSummaryResult());
} else {
// If dry-run failed or is no-op, return the dry-run result
return dryRunResult;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 2c31458fcd..fd36bf44ea 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -159,6 +159,7 @@ import org.apache.pinot.controller.helix.core.rebalance.TableRebalanceContext;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import
org.apache.pinot.controller.helix.core.rebalance.ZkBasedTableRebalanceObserver;
import org.apache.pinot.controller.helix.starter.HelixConfig;
+import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.segment.spi.SegmentMetadata;
import org.apache.pinot.spi.config.DatabaseConfig;
import org.apache.pinot.spi.config.instance.Instance;
@@ -241,6 +242,7 @@ public class PinotHelixResourceManager {
private TableCache _tableCache;
private final LineageManager _lineageManager;
private final RebalancePreChecker _rebalancePreChecker;
+ private TableSizeReader _tableSizeReader;
public PinotHelixResourceManager(String zkURL, String helixClusterName,
@Nullable String dataDir,
boolean isSingleTenantCluster, boolean enableBatchMessageMode, int
deletedSegmentsRetentionInDays,
@@ -449,6 +451,15 @@ public class PinotHelixResourceManager {
}
/**
+ * Get the table size reader.
+ *
+ * @return table size reader
+ */
+ public TableSizeReader getTableSizeReader() {
+ return _tableSizeReader;
+ }
+
+/**
* Instance related APIs
*/
@@ -1943,6 +1954,10 @@ public class PinotHelixResourceManager {
_pinotLLCRealtimeSegmentManager = pinotLLCRealtimeSegmentManager;
}
+ public void registerTableSizeReader(TableSizeReader tableSizeReader) {
+ _tableSizeReader = tableSizeReader;
+ }
+
private void assignInstances(TableConfig tableConfig, boolean override) {
String tableNameWithType = tableConfig.getTableName();
String rawTableName =
TableNameBuilder.extractRawTableName(tableNameWithType);
@@ -3612,7 +3627,8 @@ public class PinotHelixResourceManager {
tierToSegmentsMap = updateTargetTier(rebalanceJobId, tableNameWithType,
tableConfig);
}
TableRebalancer tableRebalancer =
- new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver,
_controllerMetrics, _rebalancePreChecker);
+ new TableRebalancer(_helixZkManager, zkBasedTableRebalanceObserver,
_controllerMetrics, _rebalancePreChecker,
+ _tableSizeReader);
return tableRebalancer.rebalance(tableConfig, rebalanceConfig,
rebalanceJobId, tierToSegmentsMap);
}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
index 945a98e48e..b6a89208ee 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/DefaultRebalancePreChecker.java
@@ -53,8 +53,7 @@ public class DefaultRebalancePreChecker implements RebalancePreChecker {
}
@Override
- public Map<String, String> check(String rebalanceJobId, String
tableNameWithType,
- TableConfig tableConfig) {
+ public Map<String, String> check(String rebalanceJobId, String
tableNameWithType, TableConfig tableConfig) {
LOGGER.info("Start pre-checks for table: {} with rebalanceJobId: {}",
tableNameWithType, rebalanceJobId);
Map<String, String> preCheckResult = new HashMap<>();
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
index e5bd39f0ec..06a9e171c3 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceConfig.java
@@ -34,6 +34,11 @@ public class RebalanceConfig {
@ApiModelProperty(example = "false")
private boolean _dryRun = false;
+ // Whether to return only dry-run summary instead of full dry-run output,
can only be used in dry-run mode
+ @JsonProperty("summary")
+ @ApiModelProperty(example = "false")
+ private boolean _summary = false;
+
// Whether to perform pre-checks for rebalance. This only returns the status
of each pre-check and does not fail
// rebalance
@JsonProperty("preChecks")
@@ -124,6 +129,14 @@ public class RebalanceConfig {
_dryRun = dryRun;
}
+ public boolean isSummary() {
+ return _summary;
+ }
+
+ public void setSummary(boolean summary) {
+ _summary = summary;
+ }
+
public boolean isPreChecks() {
return _preChecks;
}
@@ -246,10 +259,10 @@ public class RebalanceConfig {
@Override
public String toString() {
- return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", preChecks=" +
_preChecks + ", _reassignInstances="
- + _reassignInstances + ", _includeConsuming=" + _includeConsuming + ",
_bootstrap=" + _bootstrap
- + ", _downtime=" + _downtime + ", _minAvailableReplicas=" +
_minAvailableReplicas + ", _bestEfforts="
- + _bestEfforts + ", _externalViewCheckIntervalInMs=" +
_externalViewCheckIntervalInMs
+ return "RebalanceConfig{" + "_dryRun=" + _dryRun + ", _summary=" +
_summary + ", preChecks=" + _preChecks
+ + ", _reassignInstances=" + _reassignInstances + ",
_includeConsuming=" + _includeConsuming + ", _bootstrap="
+ + _bootstrap + ", _downtime=" + _downtime + ", _minAvailableReplicas="
+ _minAvailableReplicas
+ + ", _bestEfforts=" + _bestEfforts + ",
_externalViewCheckIntervalInMs=" + _externalViewCheckIntervalInMs
+ ", _externalViewStabilizationTimeoutInMs=" +
_externalViewStabilizationTimeoutInMs + ", _updateTargetTier="
+ _updateTargetTier + ", _heartbeatIntervalInMs=" +
_heartbeatIntervalInMs + ", _heartbeatTimeoutInMs="
+ _heartbeatTimeoutInMs + ", _maxAttempts=" + _maxAttempts + ",
_retryInitialDelayInMs="
@@ -259,6 +272,7 @@ public class RebalanceConfig {
public static RebalanceConfig copy(RebalanceConfig cfg) {
RebalanceConfig rc = new RebalanceConfig();
rc._dryRun = cfg._dryRun;
+ rc._summary = cfg._summary;
rc._preChecks = cfg._preChecks;
rc._reassignInstances = cfg._reassignInstances;
rc._includeConsuming = cfg._includeConsuming;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
index 2b81d8d78b..ac768a8c46 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceResult.java
@@ -42,6 +42,8 @@ public class RebalanceResult {
private final String _description;
@JsonInclude(JsonInclude.Include.NON_NULL)
private final Map<String, String> _preChecksResult;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final RebalanceSummaryResult _rebalanceSummaryResult;
@JsonCreator
public RebalanceResult(@JsonProperty(value = "jobId", required = true)
String jobId,
@@ -50,7 +52,8 @@ public class RebalanceResult {
@JsonProperty("instanceAssignment") @Nullable
Map<InstancePartitionsType, InstancePartitions> instanceAssignment,
@JsonProperty("tierInstanceAssignment") @Nullable Map<String,
InstancePartitions> tierInstanceAssignment,
@JsonProperty("segmentAssignment") @Nullable Map<String, Map<String,
String>> segmentAssignment,
- @JsonProperty("preChecksResult") @Nullable Map<String, String>
preChecksResult) {
+ @JsonProperty("preChecksResult") @Nullable Map<String, String>
preChecksResult,
+ @JsonProperty("rebalanceSummaryResult") @Nullable RebalanceSummaryResult
rebalanceSummaryResult) {
_jobId = jobId;
_status = status;
_description = description;
@@ -58,6 +61,7 @@ public class RebalanceResult {
_tierInstanceAssignment = tierInstanceAssignment;
_segmentAssignment = segmentAssignment;
_preChecksResult = preChecksResult;
+ _rebalanceSummaryResult = rebalanceSummaryResult;
}
@JsonProperty
@@ -95,6 +99,11 @@ public class RebalanceResult {
return _preChecksResult;
}
+ @JsonProperty
+ public RebalanceSummaryResult getRebalanceSummaryResult() {
+ return _rebalanceSummaryResult;
+ }
+
public enum Status {
// FAILED if the job has ended with known exceptions;
// ABORTED if the job is stopped by others but retry is still allowed;
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
new file mode 100644
index 0000000000..3169c9b367
--- /dev/null
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
@@ -0,0 +1,325 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.controller.helix.core.rebalance;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import javax.annotation.Nullable;
+
+
+/**
+ * Holds the summary data of the rebalance result
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class RebalanceSummaryResult {
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final ServerInfo _serverInfo;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final SegmentInfo _segmentInfo;
+
+ /**
+ * Constructor for RebalanceSummaryResult
+ * @param serverInfo server related summary information
+ * @param segmentInfo segment related summary information
+ */
+ @JsonCreator
+ public RebalanceSummaryResult(@JsonProperty("serverInfo") @Nullable
ServerInfo serverInfo,
+ @JsonProperty("segmentInfo") @Nullable SegmentInfo segmentInfo) {
+ _serverInfo = serverInfo;
+ _segmentInfo = segmentInfo;
+ }
+
+ @JsonProperty
+ public ServerInfo getServerInfo() {
+ return _serverInfo;
+ }
+
+ @JsonProperty
+ public SegmentInfo getSegmentInfo() {
+ return _segmentInfo;
+ }
+
+ public static class ServerSegmentChangeInfo {
+ private final ServerStatus _serverStatus;
+ private final int _totalSegmentsAfterRebalance;
+ private final int _totalSegmentsBeforeRebalance;
+ private final int _segmentsAdded;
+ private final int _segmentsDeleted;
+ private final int _segmentsUnchanged;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final List<String> _tagList;
+
+ /**
+ * Constructor for ServerSegmentChangeInfo
+ * @param serverStatus server status, whether it was added, removed, or
unchanged as part of this rebalance
+ * @param totalSegmentsAfterRebalance expected total segments on this
server after rebalance
+ * @param totalSegmentsBeforeRebalance current number of segments on this
server before rebalance
+ * @param segmentsAdded number of segments expected to be added as part of
this rebalance
+ * @param segmentsDeleted number of segments expected to be deleted as
part of this rebalance
+ * @param segmentsUnchanged number of segments that aren't moving from
this server as part of this rebalance
+ * @param tagList server tag list
+ */
+ @JsonCreator
+ public ServerSegmentChangeInfo(@JsonProperty("serverStatus") ServerStatus
serverStatus,
+ @JsonProperty("totalSegmentsAfterRebalance") int
totalSegmentsAfterRebalance,
+ @JsonProperty("totalSegmentsBeforeRebalance") int
totalSegmentsBeforeRebalance,
+ @JsonProperty("segmentsAdded") int segmentsAdded,
@JsonProperty("segmentsDeleted") int segmentsDeleted,
+ @JsonProperty("segmentsUnchanged") int segmentsUnchanged,
+ @JsonProperty("tagList") @Nullable List<String> tagList) {
+ _serverStatus = serverStatus;
+ _totalSegmentsAfterRebalance = totalSegmentsAfterRebalance;
+ _totalSegmentsBeforeRebalance = totalSegmentsBeforeRebalance;
+ _segmentsAdded = segmentsAdded;
+ _segmentsDeleted = segmentsDeleted;
+ _segmentsUnchanged = segmentsUnchanged;
+ _tagList = tagList;
+ }
+
+ @JsonProperty
+ public ServerStatus getServerStatus() {
+ return _serverStatus;
+ }
+
+ @JsonProperty
+ public int getTotalSegmentsAfterRebalance() {
+ return _totalSegmentsAfterRebalance;
+ }
+
+ @JsonProperty
+ public int getTotalSegmentsBeforeRebalance() {
+ return _totalSegmentsBeforeRebalance;
+ }
+
+ @JsonProperty
+ public int getSegmentsAdded() {
+ return _segmentsAdded;
+ }
+
+ @JsonProperty
+ public int getSegmentsDeleted() {
+ return _segmentsDeleted;
+ }
+
+ @JsonProperty
+ public int getSegmentsUnchanged() {
+ return _segmentsUnchanged;
+ }
+
+ @JsonProperty
+ public List<String> getTagList() {
+ return _tagList;
+ }
+ }
+
+ public static class RebalanceChangeInfo {
+ private final int _valueBeforeRebalance;
+ private final int _expectedValueAfterRebalance;
+
+ /**
+ * Constructor for RebalanceChangeInfo
+ * @param valueBeforeRebalance current value before rebalance
+ * @param expectedValueAfterRebalance expected value after rebalance
+ */
+ @JsonCreator
+ public RebalanceChangeInfo(@JsonProperty("valueBeforeRebalance") int
valueBeforeRebalance,
+ @JsonProperty("expectedValueAfterRebalance") int
expectedValueAfterRebalance) {
+ _valueBeforeRebalance = valueBeforeRebalance;
+ _expectedValueAfterRebalance = expectedValueAfterRebalance;
+ }
+
+ @JsonProperty
+ public int getValueBeforeRebalance() {
+ return _valueBeforeRebalance;
+ }
+
+ @JsonProperty
+ public int getExpectedValueAfterRebalance() {
+ return _expectedValueAfterRebalance;
+ }
+ }
+
+ public static class ServerInfo {
+ private final int _numServersGettingNewSegments;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final RebalanceChangeInfo _numServers;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final Set<String> _serversAdded;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final Set<String> _serversRemoved;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final Set<String> _serversUnchanged;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final Set<String> _serversGettingNewSegments;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final Map<String, ServerSegmentChangeInfo>
_serverSegmentChangeInfo;
+
+ /**
+ * Constructor for ServerInfo
+ * @param numServersGettingNewSegments total number of servers receiving
new segments as part of this rebalance
+ * @param numServers number of servers before and after this rebalance
+ * @param serversAdded set of servers getting added as part of this
rebalance
+ * @param serversRemoved set of servers getting removed as part of this
rebalance
+ * @param serversUnchanged set of servers existing both before and as part
of this rebalance
+ * @param serversGettingNewSegments set of servers getting segments added
+ * @param serverSegmentChangeInfo per server statistics for this rebalance
+ */
+ @JsonCreator
+ public ServerInfo(@JsonProperty("numServersGettingNewSegments") int
numServersGettingNewSegments,
+ @JsonProperty("numServers") @Nullable RebalanceChangeInfo numServers,
+ @JsonProperty("serversAdded") @Nullable Set<String> serversAdded,
+ @JsonProperty("serversRemoved") @Nullable Set<String> serversRemoved,
+ @JsonProperty("serversUnchanged") @Nullable Set<String>
serversUnchanged,
+ @JsonProperty("serversGettingNewSegments") @Nullable Set<String>
serversGettingNewSegments,
+ @JsonProperty("serverSegmentChangeInfo")
+ @Nullable Map<String, ServerSegmentChangeInfo>
serverSegmentChangeInfo) {
+ _numServersGettingNewSegments = numServersGettingNewSegments;
+ _numServers = numServers;
+ _serversAdded = serversAdded;
+ _serversRemoved = serversRemoved;
+ _serversUnchanged = serversUnchanged;
+ _serversGettingNewSegments = serversGettingNewSegments;
+ _serverSegmentChangeInfo = serverSegmentChangeInfo;
+ }
+
+ @JsonProperty
+ public int getNumServersGettingNewSegments() {
+ return _numServersGettingNewSegments;
+ }
+
+ @JsonProperty
+ public RebalanceChangeInfo getNumServers() {
+ return _numServers;
+ }
+
+ @JsonProperty
+ public Set<String> getServersAdded() {
+ return _serversAdded;
+ }
+
+ @JsonProperty
+ public Set<String> getServersRemoved() {
+ return _serversRemoved;
+ }
+
+ @JsonProperty
+ public Set<String> getServersUnchanged() {
+ return _serversUnchanged;
+ }
+
+ @JsonProperty
+ public Set<String> getServersGettingNewSegments() {
+ return _serversGettingNewSegments;
+ }
+
+ @JsonProperty
+ public Map<String, ServerSegmentChangeInfo> getServerSegmentChangeInfo() {
+ return _serverSegmentChangeInfo;
+ }
+ }
+
+ public static class SegmentInfo {
+ // TODO: Add a metric to estimate the total time it will take to rebalance
+ private final int _totalSegmentsToBeMoved;
+ private final int _maxSegmentsAddedToASingleServer;
+ private final long _estimatedAverageSegmentSizeInBytes;
+ private final long _totalEstimatedDataToBeMovedInBytes;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final RebalanceChangeInfo _replicationFactor;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final RebalanceChangeInfo _numSegmentsInSingleReplica;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final RebalanceChangeInfo _numSegmentsAcrossAllReplicas;
+
+ /**
+ * Constructor for SegmentInfo
+ * @param totalSegmentsToBeMoved total number of segments to be moved as
part of this rebalance
+ * @param maxSegmentsAddedToASingleServer maximum segments added to a
single server as part of this rebalance
+ * @param estimatedAverageSegmentSizeInBytes estimated average size of
segments in bytes
+ * @param totalEstimatedDataToBeMovedInBytes total estimated amount of
data to be moved as part of this rebalance
+ * @param replicationFactor replication factor before and after this
rebalance
+ * @param numSegmentsInSingleReplica number of segments in single replica
before and after this rebalance
+ * @param numSegmentsAcrossAllReplicas total number of segments across all
replicas before and after this rebalance
+ */
+ @JsonCreator
+ public SegmentInfo(@JsonProperty("totalSegmentsToBeMoved") int
totalSegmentsToBeMoved,
+ @JsonProperty("maxSegmentsAddedToASingleServer") int
maxSegmentsAddedToASingleServer,
+ @JsonProperty("estimatedAverageSegmentSizeInBytes") long
estimatedAverageSegmentSizeInBytes,
+ @JsonProperty("totalEstimatedDataToBeMovedInBytes") long
totalEstimatedDataToBeMovedInBytes,
+ @JsonProperty("replicationFactor") @Nullable RebalanceChangeInfo
replicationFactor,
+ @JsonProperty("numSegmentsInSingleReplica") @Nullable
RebalanceChangeInfo numSegmentsInSingleReplica,
+ @JsonProperty("numSegmentsAcrossAllReplicas") @Nullable
RebalanceChangeInfo numSegmentsAcrossAllReplicas) {
+ _totalSegmentsToBeMoved = totalSegmentsToBeMoved;
+ _maxSegmentsAddedToASingleServer = maxSegmentsAddedToASingleServer;
+ _estimatedAverageSegmentSizeInBytes = estimatedAverageSegmentSizeInBytes;
+ _totalEstimatedDataToBeMovedInBytes = totalEstimatedDataToBeMovedInBytes;
+ _replicationFactor = replicationFactor;
+ _numSegmentsInSingleReplica = numSegmentsInSingleReplica;
+ _numSegmentsAcrossAllReplicas = numSegmentsAcrossAllReplicas;
+ }
+
+ @JsonProperty
+ public int getTotalSegmentsToBeMoved() {
+ return _totalSegmentsToBeMoved;
+ }
+
+ @JsonProperty
+ public int getMaxSegmentsAddedToASingleServer() {
+ return _maxSegmentsAddedToASingleServer;
+ }
+
+ @JsonProperty
+ public long getEstimatedAverageSegmentSizeInBytes() {
+ return _estimatedAverageSegmentSizeInBytes;
+ }
+
+ @JsonProperty
+ public long getTotalEstimatedDataToBeMovedInBytes() {
+ return _totalEstimatedDataToBeMovedInBytes;
+ }
+
+ @JsonProperty
+ public RebalanceChangeInfo getReplicationFactor() {
+ return _replicationFactor;
+ }
+
+ @JsonProperty
+ public RebalanceChangeInfo getNumSegmentsInSingleReplica() {
+ return _numSegmentsInSingleReplica;
+ }
+
+ @JsonProperty
+ public RebalanceChangeInfo getNumSegmentsAcrossAllReplicas() {
+ return _numSegmentsAcrossAllReplicas;
+ }
+ }
+
+ public enum ServerStatus {
+ // ADDED if the server is newly added as part of rebalance;
+ // REMOVED if the server is removed as part of rebalance;
+ // UNCHANGED if the server status is unchanged as part of rebalance;
+ ADDED, REMOVED, UNCHANGED
+ }
+}
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index 275994e57a..b5d56f2ba6 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -43,11 +43,13 @@ import org.apache.helix.HelixManager;
import org.apache.helix.PropertyKey;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
+import org.apache.pinot.common.exception.InvalidConfigException;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ControllerTimer;
import org.apache.pinot.common.tier.PinotServerTierStorage;
@@ -60,6 +62,7 @@ import org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignme
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentFactory;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
import
org.apache.pinot.controller.helix.core.assignment.segment.StrictRealtimeSegmentAssignment;
+import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
@@ -120,9 +123,11 @@ public class TableRebalancer {
private final TableRebalanceObserver _tableRebalanceObserver;
private final ControllerMetrics _controllerMetrics;
private final RebalancePreChecker _rebalancePreChecker;
+ private final TableSizeReader _tableSizeReader;
public TableRebalancer(HelixManager helixManager, @Nullable
TableRebalanceObserver tableRebalanceObserver,
- @Nullable ControllerMetrics controllerMetrics, @Nullable
RebalancePreChecker rebalancePreChecker) {
+ @Nullable ControllerMetrics controllerMetrics, @Nullable
RebalancePreChecker rebalancePreChecker,
+ @Nullable TableSizeReader tableSizeReader) {
_helixManager = helixManager;
if (tableRebalanceObserver != null) {
_tableRebalanceObserver = tableRebalanceObserver;
@@ -132,10 +137,11 @@ public class TableRebalancer {
_helixDataAccessor = helixManager.getHelixDataAccessor();
_controllerMetrics = controllerMetrics;
_rebalancePreChecker = rebalancePreChecker;
+ _tableSizeReader = tableSizeReader;
}
public TableRebalancer(HelixManager helixManager) {
- this(helixManager, null, null, null);
+ this(helixManager, null, null, null, null);
}
public static String createUniqueRebalanceJobIdentifier() {
@@ -175,6 +181,7 @@ public class TableRebalancer {
rebalanceJobId = createUniqueRebalanceJobIdentifier();
}
boolean dryRun = rebalanceConfig.isDryRun();
+ boolean summary = rebalanceConfig.isSummary();
boolean preChecks = rebalanceConfig.isPreChecks();
boolean reassignInstances = rebalanceConfig.isReassignInstances();
boolean includeConsuming = rebalanceConfig.isIncludeConsuming();
@@ -189,14 +196,19 @@ public class TableRebalancer {
&&
RoutingConfig.STRICT_REPLICA_GROUP_INSTANCE_SELECTOR_TYPE.equalsIgnoreCase(
tableConfig.getRoutingConfig().getInstanceSelectorType());
LOGGER.info(
- "Start rebalancing table: {} with dryRun: {}, preChecks: {},
reassignInstances: {}, includeConsuming: {}, "
- + "bootstrap: {}, downtime: {}, minReplicasToKeepUpForNoDowntime:
{}, enableStrictReplicaGroup: {}, "
- + "lowDiskMode: {}, bestEfforts: {},
externalViewCheckIntervalInMs: {}, "
+ "Start rebalancing table: {} with dryRun: {}, summary: {}, preChecks:
{}, reassignInstances: {}, "
+ + "includeConsuming: {}, bootstrap: {}, downtime: {},
minReplicasToKeepUpForNoDowntime: {}, "
+ + "enableStrictReplicaGroup: {}, lowDiskMode: {}, bestEfforts: {},
externalViewCheckIntervalInMs: {}, "
+ "externalViewStabilizationTimeoutInMs: {}",
- tableNameWithType, dryRun, preChecks, reassignInstances,
includeConsuming, bootstrap, downtime,
+ tableNameWithType, dryRun, summary, preChecks, reassignInstances,
includeConsuming, bootstrap, downtime,
minReplicasToKeepUpForNoDowntime, enableStrictReplicaGroup,
lowDiskMode, bestEfforts,
externalViewCheckIntervalInMs, externalViewStabilizationTimeoutInMs);
+ if (summary && !dryRun) {
+ return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
+ "Must enable dry-run mode to use summary mode", null, null, null,
null, null);
+ }
+
// Perform pre-checks if enabled
Map<String, String> preChecksResult = null;
if (preChecks) {
@@ -205,7 +217,8 @@ public class TableRebalancer {
String errorMsg = String.format("Pre-checks can only be enabled in
dry-run mode, not triggering rebalance for "
+ "table: %s with rebalanceJobId: %s", tableNameWithType,
rebalanceJobId);
LOGGER.error(errorMsg);
- return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED, errorMsg, null, null, null, null);
+ return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED, errorMsg, null, null, null, null,
+ null);
}
if (_rebalancePreChecker != null) {
preChecksResult = _rebalancePreChecker.check(rebalanceJobId,
tableNameWithType, tableConfig);
@@ -222,21 +235,21 @@ public class TableRebalancer {
"For rebalanceId: %s, caught exception while fetching IdealState for
table: %s, aborting the rebalance",
rebalanceJobId, tableNameWithType), e);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
- "Caught exception while fetching IdealState: " + e, null, null,
null, preChecksResult);
+ "Caught exception while fetching IdealState: " + e, null, null,
null, preChecksResult, null);
}
if (currentIdealState == null) {
onReturnFailure(
String.format("For rebalanceId: %s, cannot find the IdealState for
table: %s, aborting the rebalance",
rebalanceJobId, tableNameWithType), null);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED, "Cannot find the IdealState for table",
- null, null, null, preChecksResult);
+ null, null, null, preChecksResult, null);
}
if (!currentIdealState.isEnabled() && !downtime) {
onReturnFailure(String.format(
"For rebalanceId: %s, cannot rebalance disabled table: %s without
downtime, aborting the rebalance",
rebalanceJobId, tableNameWithType), null);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
- "Cannot rebalance disabled table without downtime", null, null,
null, preChecksResult);
+ "Cannot rebalance disabled table without downtime", null, null,
null, preChecksResult, null);
}
LOGGER.info("For rebalanceId: {}, processing instance partitions for
table: {}", rebalanceJobId, tableNameWithType);
@@ -254,7 +267,8 @@ public class TableRebalancer {
"For rebalanceId: %s, caught exception while fetching/calculating
instance partitions for table: %s, "
+ "aborting the rebalance", rebalanceJobId, tableNameWithType),
e);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
- "Caught exception while fetching/calculating instance partitions: "
+ e, null, null, null, preChecksResult);
+ "Caught exception while fetching/calculating instance partitions: "
+ e, null, null, null, preChecksResult,
+ null);
}
// Calculate instance partitions for tiers if configured
@@ -273,7 +287,7 @@ public class TableRebalancer {
+ "aborting the rebalance", rebalanceJobId, tableNameWithType),
e);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Caught exception while fetching/calculating tier instance
partitions: " + e, null, null, null,
- preChecksResult);
+ preChecksResult, null);
}
LOGGER.info("For rebalanceId: {}, calculating the target assignment for
table: {}", rebalanceJobId,
@@ -291,7 +305,7 @@ public class TableRebalancer {
+ "rebalance", rebalanceJobId, tableNameWithType), e);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.FAILED,
"Caught exception while calculating target assignment: " + e,
instancePartitionsMap,
- tierToInstancePartitionsMap, null, preChecksResult);
+ tierToInstancePartitionsMap, null, preChecksResult, null);
}
boolean segmentAssignmentUnchanged =
currentAssignment.equals(targetAssignment);
@@ -299,6 +313,13 @@ public class TableRebalancer {
+ "segmentAssignmentUnchanged: {} for table: {}", rebalanceJobId,
instancePartitionsUnchanged,
tierInstancePartitionsUnchanged, segmentAssignmentUnchanged,
tableNameWithType);
+ RebalanceSummaryResult summaryResult = null;
+ if (summary) {
+ // Calculate summary here itself so that even if the table is already
balanced, the caller can verify whether that
+ // is expected or not based on the summary results
+ summaryResult = calculateDryRunSummary(currentAssignment,
targetAssignment, tableNameWithType, rebalanceJobId);
+ }
+
if (segmentAssignmentUnchanged) {
LOGGER.info("Table: {} is already balanced", tableNameWithType);
if (instancePartitionsUnchanged && tierInstancePartitionsUnchanged) {
@@ -306,28 +327,38 @@ public class TableRebalancer {
String.format("For rebalanceId: %s, instance unchanged and table:
%s is already balanced", rebalanceJobId,
tableNameWithType));
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.NO_OP, "Table is already balanced",
- instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment, preChecksResult);
+ summary ? null : instancePartitionsMap, summary ? null :
tierToInstancePartitionsMap,
+ summary ? null : targetAssignment, preChecksResult, summaryResult);
} else {
if (dryRun) {
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.DONE,
- "Instance reassigned in dry-run mode, table is already
balanced", instancePartitionsMap,
- tierToInstancePartitionsMap, targetAssignment, preChecksResult);
+ "Instance reassigned in dry-run mode, table is already balanced",
+ summary ? null : instancePartitionsMap, summary ? null :
tierToInstancePartitionsMap,
+ summary ? null : targetAssignment, preChecksResult,
summaryResult);
} else {
_tableRebalanceObserver.onSuccess(
String.format("For rebalanceId: %s, instance reassigned but
table: %s is already balanced",
rebalanceJobId, tableNameWithType));
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.DONE,
- "Instance reassigned, table is already balanced",
instancePartitionsMap, tierToInstancePartitionsMap,
- targetAssignment, preChecksResult);
+ "Instance reassigned, table is already balanced", summary ? null
: instancePartitionsMap,
+ summary ? null : tierToInstancePartitionsMap, summary ? null :
targetAssignment, preChecksResult,
+ summaryResult);
}
}
}
+ if (summary) {
+ LOGGER.info("For rebalanceId: {}, rebalancing table: {} in dry-run
summary mode, returning the summary only",
+ rebalanceJobId, tableNameWithType);
+ return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
"Dry-run summary mode", null,
+ null, null, preChecksResult, summaryResult);
+ }
+
if (dryRun) {
LOGGER.info("For rebalanceId: {}, rebalancing table: {} in dry-run mode,
returning the target assignment",
rebalanceJobId, tableNameWithType);
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
"Dry-run mode", instancePartitionsMap,
- tierToInstancePartitionsMap, targetAssignment, preChecksResult);
+ tierToInstancePartitionsMap, targetAssignment, preChecksResult,
summaryResult);
}
if (downtime) {
@@ -352,14 +383,14 @@ public class TableRebalancer {
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
"Success with downtime (replaced IdealState with the target
segment assignment, ExternalView might not "
+ "reach the target segment assignment yet)",
instancePartitionsMap, tierToInstancePartitionsMap,
- targetAssignment, preChecksResult);
+ targetAssignment, preChecksResult, summaryResult);
} catch (Exception e) {
onReturnFailure(String.format(
"For rebalanceId: %s, caught exception while updating IdealState
for table: %s, aborting the rebalance",
rebalanceJobId, tableNameWithType), e);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
"Caught exception while updating IdealState: " + e,
instancePartitionsMap, tierToInstancePartitionsMap,
- targetAssignment, preChecksResult);
+ targetAssignment, preChecksResult, summaryResult);
}
}
@@ -391,7 +422,7 @@ public class TableRebalancer {
minReplicasToKeepUpForNoDowntime, tableNameWithType, numReplicas),
null);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
"Illegal min available replicas config", instancePartitionsMap,
tierToInstancePartitionsMap,
- targetAssignment, preChecksResult);
+ targetAssignment, preChecksResult, summaryResult);
}
minAvailableReplicas = minReplicasToKeepUpForNoDowntime;
} else {
@@ -432,12 +463,12 @@ public class TableRebalancer {
if (_tableRebalanceObserver.isStopped()) {
return new RebalanceResult(rebalanceJobId,
_tableRebalanceObserver.getStopStatus(),
"Caught exception while waiting for ExternalView to converge: "
+ e, instancePartitionsMap,
- tierToInstancePartitionsMap, targetAssignment, preChecksResult);
+ tierToInstancePartitionsMap, targetAssignment, preChecksResult,
summaryResult);
}
_tableRebalanceObserver.onError(errorMsg);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
"Caught exception while waiting for ExternalView to converge: " +
e, instancePartitionsMap,
- tierToInstancePartitionsMap, targetAssignment, preChecksResult);
+ tierToInstancePartitionsMap, targetAssignment, preChecksResult,
summaryResult);
}
// Re-calculate the target assignment if IdealState changed while
waiting for ExternalView to converge
@@ -486,7 +517,7 @@ public class TableRebalancer {
+ "aborting the rebalance", rebalanceJobId,
tableNameWithType), e);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
"Caught exception while re-calculating the target assignment:
" + e, instancePartitionsMap,
- tierToInstancePartitionsMap, targetAssignment,
preChecksResult);
+ tierToInstancePartitionsMap, targetAssignment,
preChecksResult, summaryResult);
}
} else {
LOGGER.info("For rebalanceId:{}, no state change found for segments
to be moved, "
@@ -510,7 +541,7 @@ public class TableRebalancer {
return new RebalanceResult(rebalanceJobId, RebalanceResult.Status.DONE,
"Success with minAvailableReplicas: " + minAvailableReplicas
+ " (both IdealState and ExternalView should reach the target
segment assignment)",
- instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment, preChecksResult);
+ instancePartitionsMap, tierToInstancePartitionsMap,
targetAssignment, preChecksResult, summaryResult);
}
// Record change of current ideal state and the new target
@@ -519,7 +550,7 @@ public class TableRebalancer {
if (_tableRebalanceObserver.isStopped()) {
return new RebalanceResult(rebalanceJobId,
_tableRebalanceObserver.getStopStatus(),
"Rebalance has stopped already before updating the IdealState",
instancePartitionsMap,
- tierToInstancePartitionsMap, targetAssignment, preChecksResult);
+ tierToInstancePartitionsMap, targetAssignment, preChecksResult,
summaryResult);
}
Map<String, Map<String, String>> nextAssignment =
getNextAssignment(currentAssignment, targetAssignment,
minAvailableReplicas, enableStrictReplicaGroup,
@@ -550,7 +581,7 @@ public class TableRebalancer {
+ "aborting the rebalance", rebalanceJobId, tableNameWithType), e);
return new RebalanceResult(rebalanceJobId,
RebalanceResult.Status.FAILED,
"Caught exception while updating IdealState: " + e,
instancePartitionsMap, tierToInstancePartitionsMap,
- targetAssignment, preChecksResult);
+ targetAssignment, preChecksResult, summaryResult);
}
segmentsToMonitor = new HashSet<>(segmentsToMove);
@@ -559,6 +590,147 @@ public class TableRebalancer {
}
}
+  /**
+   * Best-effort lookup of the table's reported size per replica, used only to estimate data movement in the
+   * rebalance summary. Failures are logged and reported as -1 rather than propagated, so that computing the
+   * summary never aborts a dry-run.
+   *
+   * @param tableNameWithType table name with type suffix
+   * @return the reported size per replica in bytes as returned by the table size reader, or -1 if no table size
+   *         reader is configured or the size could not be fetched
+   */
+  private long calculateTableSizePerReplicaInBytes(String tableNameWithType) {
+    if (_tableSizeReader == null) {
+      LOGGER.warn("tableSizeReader is null, cannot calculate table size for table {}!", tableNameWithType);
+      return -1;
+    }
+    LOGGER.info("Fetching the table size for rebalance summary for table: {}", tableNameWithType);
+    // TODO: Consider making the timeoutMs for fetching table size via table rebalancer configurable
+    int timeoutMs = 30_000;
+    try {
+      TableSizeReader.TableSubTypeSizeDetails tableSizeDetails =
+          _tableSizeReader.getTableSubtypeSize(tableNameWithType, timeoutMs);
+      long tableSizePerReplicaInBytes = tableSizeDetails._reportedSizePerReplicaInBytes;
+      // Only report "fetched" when the fetch actually succeeded
+      LOGGER.info("Fetched the table size (per replica size: {}) for rebalance summary for table: {}",
+          tableSizePerReplicaInBytes, tableNameWithType);
+      return tableSizePerReplicaInBytes;
+    } catch (Exception e) {
+      // The size only feeds an advisory estimate: any failure (config or runtime) is reported as "unknown" (-1)
+      // instead of failing the whole dry-run summary
+      LOGGER.error("Caught exception while trying to fetch table size details for table: {}", tableNameWithType, e);
+      return -1;
+    }
+  }
+
+  /**
+   * Builds the dry-run rebalance summary by comparing the current and target segment assignments.
+   *
+   * <p>Both assignments map segment name to (server instance to state). The summary reports per-server segment
+   * changes, which servers are added / removed / kept, replica and segment counts before and after, and an
+   * estimate of the data to be moved based on the table's reported size per replica.
+   *
+   * @param currentAssignment current segment assignment (segment -> server -> state)
+   * @param targetAssignment target segment assignment (segment -> server -> state)
+   * @param tableNameWithType table name with type suffix
+   * @param rebalanceJobId rebalance job id, used for logging
+   * @return the rebalance summary
+   */
+  private RebalanceSummaryResult calculateDryRunSummary(Map<String, Map<String, String>> currentAssignment,
+      Map<String, Map<String, String>> targetAssignment, String tableNameWithType, String rebalanceJobId) {
+    LOGGER.info("Calculating rebalance summary for table: {} with rebalanceJobId: {}",
+        tableNameWithType, rebalanceJobId);
+
+    // Use the largest per-segment replica count so the reported replication factor does not depend on which
+    // segment happens to be iterated last
+    int existingReplicationFactor = currentAssignment.values().stream().mapToInt(Map::size).max().orElse(0);
+    int newReplicationFactor = targetAssignment.values().stream().mapToInt(Map::size).max().orElse(0);
+    RebalanceSummaryResult.RebalanceChangeInfo replicationFactor =
+        new RebalanceSummaryResult.RebalanceChangeInfo(existingReplicationFactor, newReplicationFactor);
+
+    Map<String, Set<String>> existingServersToSegmentMap = invertAssignmentForSummary(currentAssignment);
+    Map<String, Set<String>> newServersToSegmentMap = invertAssignmentForSummary(targetAssignment);
+    RebalanceSummaryResult.RebalanceChangeInfo numServers =
+        new RebalanceSummaryResult.RebalanceChangeInfo(existingServersToSegmentMap.size(),
+            newServersToSegmentMap.size());
+
+    List<InstanceConfig> instanceConfigs =
+        _helixDataAccessor.getChildValues(_helixDataAccessor.keyBuilder().instanceConfigs(), true);
+    Map<String, List<String>> instanceToTagsMap = new HashMap<>();
+    for (InstanceConfig instanceConfig : instanceConfigs) {
+      instanceToTagsMap.put(instanceConfig.getInstanceName(), instanceConfig.getTags());
+    }
+
+    Set<String> serversAdded = new HashSet<>();
+    Set<String> serversRemoved = new HashSet<>();
+    Set<String> serversUnchanged = new HashSet<>();
+    Set<String> serversGettingNewSegments = new HashSet<>();
+    Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo> serverSegmentChangeInfoMap = new HashMap<>();
+    int totalSegmentsToBeMoved = 0;
+    int maxSegmentsAddedToServer = 0;
+    for (Map.Entry<String, Set<String>> entry : newServersToSegmentMap.entrySet()) {
+      String server = entry.getKey();
+      Set<String> newSegmentSet = entry.getValue();
+      Set<String> existingSegmentSet = existingServersToSegmentMap.get(server);
+      RebalanceSummaryResult.ServerStatus serverStatus;
+      int totalExistingSegments = 0;
+      int segmentsUnchanged = 0;
+      if (existingSegmentSet == null) {
+        serverStatus = RebalanceSummaryResult.ServerStatus.ADDED;
+        serversAdded.add(server);
+      } else {
+        // The server hosts segments both before and after the rebalance (its segment set may still change)
+        serverStatus = RebalanceSummaryResult.ServerStatus.UNCHANGED;
+        serversUnchanged.add(server);
+        totalExistingSegments = existingSegmentSet.size();
+        for (String segment : newSegmentSet) {
+          if (existingSegmentSet.contains(segment)) {
+            segmentsUnchanged++;
+          }
+        }
+      }
+      int segmentsAdded = newSegmentSet.size() - segmentsUnchanged;
+      int segmentsDeleted = totalExistingSegments - segmentsUnchanged;
+      if (segmentsAdded > 0) {
+        serversGettingNewSegments.add(server);
+      }
+      // Every segment replica newly assigned to a server has to be moved there, so summing these gives the exact
+      // number of replicas to move even when segments have differing replica counts
+      totalSegmentsToBeMoved += segmentsAdded;
+      maxSegmentsAddedToServer = Math.max(maxSegmentsAddedToServer, segmentsAdded);
+      serverSegmentChangeInfoMap.put(server, new RebalanceSummaryResult.ServerSegmentChangeInfo(serverStatus,
+          newSegmentSet.size(), totalExistingSegments, segmentsAdded, segmentsDeleted, segmentsUnchanged,
+          instanceToTagsMap.get(server)));
+    }
+
+    // Servers present only in the current assignment lose all of their segments
+    for (Map.Entry<String, Set<String>> entry : existingServersToSegmentMap.entrySet()) {
+      String server = entry.getKey();
+      if (!newServersToSegmentMap.containsKey(server)) {
+        int numExistingSegments = entry.getValue().size();
+        serversRemoved.add(server);
+        serverSegmentChangeInfoMap.put(server, new RebalanceSummaryResult.ServerSegmentChangeInfo(
+            RebalanceSummaryResult.ServerStatus.REMOVED, 0, numExistingSegments, 0, numExistingSegments, 0,
+            instanceToTagsMap.get(server)));
+      }
+    }
+
+    RebalanceSummaryResult.RebalanceChangeInfo numSegmentsInSingleReplica =
+        new RebalanceSummaryResult.RebalanceChangeInfo(currentAssignment.size(), targetAssignment.size());
+    // Count actual replicas instead of multiplying by the replication factor, which is only exact when every
+    // segment has the same number of replicas
+    int existingNumberSegmentsTotal = currentAssignment.values().stream().mapToInt(Map::size).sum();
+    int newNumberSegmentsTotal = targetAssignment.values().stream().mapToInt(Map::size).sum();
+    RebalanceSummaryResult.RebalanceChangeInfo numSegmentsAcrossAllReplicas =
+        new RebalanceSummaryResult.RebalanceChangeInfo(existingNumberSegmentsTotal, newNumberSegmentsTotal);
+
+    long tableSizePerReplicaInBytes = calculateTableSizePerReplicaInBytes(tableNameWithType);
+    // A non-positive size (e.g. -1 when the size could not be fetched) is propagated as-is to signal "no estimate"
+    long averageSegmentSizeInBytes = tableSizePerReplicaInBytes;
+    long totalEstimatedDataToBeMovedInBytes = tableSizePerReplicaInBytes;
+    if (tableSizePerReplicaInBytes > 0) {
+      if (currentAssignment.isEmpty()) {
+        // No segments to average over: report unknown instead of dividing by zero
+        averageSegmentSizeInBytes = -1;
+        totalEstimatedDataToBeMovedInBytes = -1;
+      } else {
+        averageSegmentSizeInBytes = tableSizePerReplicaInBytes / currentAssignment.size();
+        totalEstimatedDataToBeMovedInBytes = (long) totalSegmentsToBeMoved * averageSegmentSizeInBytes;
+      }
+    }
+
+    // NOTE(review): the server sets passed below may be empty; whether empty sets are omitted from the serialized
+    // result is up to ServerInfo -- confirm
+    RebalanceSummaryResult.ServerInfo serverInfo = new RebalanceSummaryResult.ServerInfo(
+        serversGettingNewSegments.size(), numServers, serversAdded, serversRemoved, serversUnchanged,
+        serversGettingNewSegments, serverSegmentChangeInfoMap);
+    // TODO: Add a metric to estimate the total time it will take to rebalance. Need some good heuristics on how
+    //  rebalance time can vary with number of segments added
+    RebalanceSummaryResult.SegmentInfo segmentInfo = new RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
+        maxSegmentsAddedToServer, averageSegmentSizeInBytes, totalEstimatedDataToBeMovedInBytes,
+        replicationFactor, numSegmentsInSingleReplica, numSegmentsAcrossAllReplicas);
+
+    LOGGER.info("Calculated rebalance summary for table: {} with rebalanceJobId: {}", tableNameWithType,
+        rebalanceJobId);
+    return new RebalanceSummaryResult(serverInfo, segmentInfo);
+  }
+
+  /**
+   * Inverts a segment assignment (segment -> server -> state) into a map from each server to the set of segments
+   * assigned to it. Servers with no segments do not appear in the result.
+   */
+  private static Map<String, Set<String>> invertAssignmentForSummary(Map<String, Map<String, String>> assignment) {
+    Map<String, Set<String>> serverToSegments = new HashMap<>();
+    for (Map.Entry<String, Map<String, String>> segmentEntry : assignment.entrySet()) {
+      String segmentName = segmentEntry.getKey();
+      for (String server : segmentEntry.getValue().keySet()) {
+        serverToSegments.computeIfAbsent(server, k -> new HashSet<>()).add(segmentName);
+      }
+    }
+    return serverToSegments;
+  }
+
private void onReturnFailure(String errorMsg, Exception e) {
if (e != null) {
LOGGER.warn(errorMsg, e);
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
index e115476529..661fff70d9 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/DefaultTenantRebalancer.java
@@ -60,7 +60,7 @@ public class DefaultTenantRebalancer implements
TenantRebalancer {
false));
} catch (TableNotFoundException exception) {
rebalanceResult.put(table, new RebalanceResult(null,
RebalanceResult.Status.FAILED, exception.getMessage(),
- null, null, null, null));
+ null, null, null, null, null));
}
});
if (config.isDryRun()) {
@@ -71,7 +71,8 @@ public class DefaultTenantRebalancer implements
TenantRebalancer {
if (result.getStatus() == RebalanceResult.Status.DONE) {
rebalanceResult.put(table, new RebalanceResult(result.getJobId(),
RebalanceResult.Status.IN_PROGRESS,
"In progress, check controller task status for the",
result.getInstanceAssignment(),
- result.getTierInstanceAssignment(),
result.getSegmentAssignment(), result.getPreChecksResult()));
+ result.getTierInstanceAssignment(),
result.getSegmentAssignment(), result.getPreChecksResult(),
+ result.getRebalanceSummaryResult()));
}
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
index c96c25b250..ed0caf0f29 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/tenant/TenantRebalanceResult.java
@@ -36,7 +36,7 @@ public class TenantRebalanceResult {
_rebalanceTableResults = new HashMap<>();
rebalanceTableResults.forEach((table, result) -> {
_rebalanceTableResults.put(table, new
RebalanceResult(result.getJobId(), result.getStatus(),
- result.getDescription(), null, null, null, null));
+ result.getDescription(), null, null, null, null, null));
});
}
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index de83ec6eee..2f7ab350bc 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -24,6 +24,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.assignment.InstancePartitions;
@@ -91,15 +92,26 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
i, true);
}
+ ExecutorService executorService = Executors.newFixedThreadPool(10);
DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
- preChecker.init(_helixResourceManager, Executors.newFixedThreadPool(10));
- TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null,
null, preChecker);
+ preChecker.init(_helixResourceManager, executorService);
+ TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null,
null, preChecker,
+ _helixResourceManager.getTableSizeReader());
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
// Rebalance should fail without creating the table
RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
new RebalanceConfig(), null);
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
+ assertNull(rebalanceResult.getRebalanceSummaryResult());
+
+ // Rebalance with dry-run summary should fail without creating the table
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
+ assertNull(rebalanceResult.getRebalanceSummaryResult());
// Create the table
addDummySchema(RAW_TABLE_NAME);
@@ -114,6 +126,27 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
Map<String, Map<String, String>> oldSegmentAssignment =
_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields();
+ // Rebalance with dry-run summary should return NO_OP status
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ RebalanceSummaryResult rebalanceSummaryResult =
rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
0);
+ assertNull(rebalanceResult.getInstanceAssignment());
+ assertNull(rebalanceResult.getTierInstanceAssignment());
+ assertNull(rebalanceResult.getSegmentAssignment());
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
3);
+
+ // Dry-run mode should not change the IdealState
+
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
+ oldSegmentAssignment);
+
// Rebalance should return NO_OP status
rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig(), null);
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
@@ -137,8 +170,54 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
addFakeServerInstanceToAutoJoinHelixCluster(SERVER_INSTANCE_ID_PREFIX +
(numServers + i), true);
}
+ // Rebalance in dry-run summary mode with added servers
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
14);
+ assertNull(rebalanceResult.getInstanceAssignment());
+ assertNull(rebalanceResult.getTierInstanceAssignment());
+ assertNull(rebalanceResult.getSegmentAssignment());
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
6);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
3);
+
+ Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo>
serverSegmentChangeInfoMap =
+ rebalanceSummaryResult.getServerInfo().getServerSegmentChangeInfo();
+ assertNotNull(serverSegmentChangeInfoMap);
+ for (int i = 0; i < numServers; i++) {
+ // Original servers should be losing some segments
+ String newServer = SERVER_INSTANCE_ID_PREFIX + i;
+ RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange =
serverSegmentChangeInfoMap.get(newServer);
+ assertTrue(serverSegmentChange.getSegmentsDeleted() > 0);
+ assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0);
+ assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0);
+ assertTrue(serverSegmentChange.getTotalSegmentsAfterRebalance() > 0);
+ assertEquals(serverSegmentChange.getSegmentsAdded(), 0);
+ }
+ for (int i = 0; i < numServersToAdd; i++) {
+ // New servers should only get new segments
+ String newServer = SERVER_INSTANCE_ID_PREFIX + (numServers + i);
+ RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange =
serverSegmentChangeInfoMap.get(newServer);
+ assertTrue(serverSegmentChange.getSegmentsAdded() > 0);
+ assertEquals(serverSegmentChange.getTotalSegmentsBeforeRebalance(), 0);
+ assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(),
serverSegmentChange.getSegmentsAdded());
+ assertEquals(serverSegmentChange.getSegmentsDeleted(), 0);
+ assertEquals(serverSegmentChange.getSegmentsUnchanged(), 0);
+ }
+
+ // Dry-run mode should not change the IdealState
+
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
+ oldSegmentAssignment);
+
// Rebalance in dry-run mode
- RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig = new RebalanceConfig();
rebalanceConfig.setDryRun(true);
rebalanceConfig.setPreChecks(true);
rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
@@ -186,6 +265,26 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
oldSegmentAssignment);
+ // Rebalance dry-run summary with 3 min available replicas should not be
impacted since actual rebalance does not
+ // occur
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceConfig.setPreChecks(true);
+ rebalanceConfig.setMinAvailableReplicas(3);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+ assertNull(rebalanceResult.getInstanceAssignment());
+ assertNull(rebalanceResult.getTierInstanceAssignment());
+ assertNull(rebalanceResult.getSegmentAssignment());
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
6);
+ assertNotNull(rebalanceResult.getPreChecksResult());
+
// Rebalance with 3 min available replicas should fail as the table only
have 3 replicas
rebalanceConfig = new RebalanceConfig();
rebalanceConfig.setMinAvailableReplicas(3);
@@ -218,6 +317,41 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaGroupPartitionConfig, null, false)));
_helixResourceManager.updateTableConfig(tableConfig);
+ // Try dry-run summary mode
+ // No need to reassign instances because instances should be automatically
assigned when updating the table config
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
11);
+ assertNull(rebalanceResult.getInstanceAssignment());
+ assertNull(rebalanceResult.getTierInstanceAssignment());
+ assertNull(rebalanceResult.getSegmentAssignment());
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
6);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
6);
+
+ serverSegmentChangeInfoMap =
rebalanceSummaryResult.getServerInfo().getServerSegmentChangeInfo();
+ assertNotNull(serverSegmentChangeInfoMap);
+ for (int i = 0; i < numServers + numServersToAdd; i++) {
+ String newServer = SERVER_INSTANCE_ID_PREFIX + i;
+ RebalanceSummaryResult.ServerSegmentChangeInfo serverSegmentChange =
serverSegmentChangeInfoMap.get(newServer);
+ assertEquals(serverSegmentChange.getTotalSegmentsAfterRebalance(), 5);
+ // Ensure not all segments moved
+ assertTrue(serverSegmentChange.getSegmentsUnchanged() > 0);
+ // Ensure all segments has something assigned prior to rebalance
+ assertTrue(serverSegmentChange.getTotalSegmentsBeforeRebalance() > 0);
+ }
+
+ // Dry-run mode should not change the IdealState
+
assertEquals(_helixResourceManager.getTableIdealState(OFFLINE_TABLE_NAME).getRecord().getMapFields(),
+ newSegmentAssignment);
+
+ // Try actual rebalance
// No need to reassign instances because instances should be automatically
assigned when updating the table config
rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig(), null);
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
@@ -264,6 +398,25 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
tableConfig.setInstanceAssignmentConfigMap(null);
_helixResourceManager.updateTableConfig(tableConfig);
+ // Try dry-run summary mode without reassignment to ensure that existing
instance partitions are used
+ // no movement should occur
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+ assertNull(rebalanceResult.getInstanceAssignment());
+ assertNull(rebalanceResult.getTierInstanceAssignment());
+ assertNull(rebalanceResult.getSegmentAssignment());
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
0);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
6);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
6);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
0);
+
// Without instances reassignment, the rebalance should return status
NO_OP, and the existing instance partitions
// should be used
rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig(), null);
@@ -271,6 +424,26 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceResult.getInstanceAssignment(), instanceAssignment);
assertEquals(rebalanceResult.getSegmentAssignment(), newSegmentAssignment);
+ // Try dry-run summary mode with reassignment
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceConfig.setReassignInstances(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+ assertNull(rebalanceResult.getInstanceAssignment());
+ assertNull(rebalanceResult.getTierInstanceAssignment());
+ assertNull(rebalanceResult.getSegmentAssignment());
+ // No move expected since already balanced
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
0);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
6);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
6);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
0);
+
// With instances reassignment, the rebalance should return status DONE,
the existing instance partitions should be
// removed, and the default instance partitions should be used
rebalanceConfig = new RebalanceConfig();
@@ -300,6 +473,25 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
TagNameUtils.getOfflineTagForTenant(null));
}
+ // Try dry-run summary mode
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceConfig.setDowntime(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+ assertNull(rebalanceResult.getInstanceAssignment());
+ assertNull(rebalanceResult.getTierInstanceAssignment());
+ assertNull(rebalanceResult.getSegmentAssignment());
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
15);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
6);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
3);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
3);
+
// Rebalance with downtime should succeed
rebalanceConfig = new RebalanceConfig();
rebalanceConfig.setDowntime(true);
@@ -327,7 +519,18 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
}
}
+ // Try summary mode without dry-run set
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setSummary(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
+ assertNull(rebalanceResult.getRebalanceSummaryResult());
+ assertNull(rebalanceResult.getInstanceAssignment());
+ assertNull(rebalanceResult.getTierInstanceAssignment());
+ assertNull(rebalanceResult.getSegmentAssignment());
+
_helixResourceManager.deleteOfflineTable(RAW_TABLE_NAME);
+ executorService.shutdown();
}
/**
@@ -367,7 +570,27 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
_helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields();
TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
- RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
new RebalanceConfig(), null);
+
+ // Try dry-run summary mode
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
rebalanceConfig, null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ RebalanceSummaryResult rebalanceSummaryResult =
rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+ assertNull(rebalanceResult.getInstanceAssignment());
+ assertNull(rebalanceResult.getTierInstanceAssignment());
+ assertNull(rebalanceResult.getSegmentAssignment());
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
0);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
3);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
0);
+
+ // Run actual table rebalance
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig(), null);
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
// Segment assignment should not change
assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
@@ -382,6 +605,24 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
}
_helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER,
TIER_B_NAME, 3, 3, 0));
+ // Try dry-run summary mode, should be no-op
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+ assertNull(rebalanceResult.getInstanceAssignment());
+ assertNull(rebalanceResult.getTierInstanceAssignment());
+ assertNull(rebalanceResult.getSegmentAssignment());
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
0);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
3);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
0);
+
// rebalance is NOOP and no change in assignment caused by new instances
rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig(), null);
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
@@ -400,6 +641,24 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
TierFactory.PINOT_SERVER_STORAGE_TYPE, NO_TIER_NAME + "_OFFLINE",
null, null)));
_helixResourceManager.updateTableConfig(tableConfig);
+ // Try dry-run summary mode, some moves should occur
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+ assertNull(rebalanceResult.getInstanceAssignment());
+ assertNull(rebalanceResult.getTierInstanceAssignment());
+ assertNull(rebalanceResult.getSegmentAssignment());
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
15);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
9);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
6);
+
// rebalance should change assignment
rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig(), null);
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
@@ -458,7 +717,26 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
_helixResourceManager.getTableIdealState(OFFLINE_TIERED_TABLE_NAME).getRecord().getMapFields();
TableRebalancer tableRebalancer = new TableRebalancer(_helixManager);
- RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
new RebalanceConfig(), null);
+
+ // Try dry-run summary mode
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
rebalanceConfig, null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ RebalanceSummaryResult rebalanceSummaryResult =
rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+ assertNull(rebalanceResult.getInstanceAssignment());
+ assertNull(rebalanceResult.getTierInstanceAssignment());
+ assertNull(rebalanceResult.getSegmentAssignment());
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
0);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
3);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
0);
+
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig(), null);
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
// Segment assignment should not change
assertEquals(rebalanceResult.getSegmentAssignment(), oldSegmentAssignment);
@@ -469,6 +747,25 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
"replicaAssignment" + TIER_A_NAME + "_" + SERVER_INSTANCE_ID_PREFIX
+ i, false);
}
_helixResourceManager.createServerTenant(new Tenant(TenantRole.SERVER,
"replicaAssignment" + TIER_A_NAME, 6, 6, 0));
+
+ // Try dry-run summary mode
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceResult.getRebalanceSummaryResult());
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+ assertNull(rebalanceResult.getInstanceAssignment());
+ assertNull(rebalanceResult.getTierInstanceAssignment());
+ assertNull(rebalanceResult.getSegmentAssignment());
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
0);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
3);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
0);
+
// rebalance is NOOP and no change in assignment caused by new instances
rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig(), null);
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
@@ -481,6 +778,24 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
TierFactory.PINOT_SERVER_STORAGE_TYPE, "replicaAssignment" +
TIER_A_NAME + "_OFFLINE", null, null)));
_helixResourceManager.updateTableConfig(tableConfig);
+ // Try dry-run summary mode
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceResult.getRebalanceSummaryResult());
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+ assertNull(rebalanceResult.getInstanceAssignment());
+ assertNull(rebalanceResult.getTierInstanceAssignment());
+ assertNull(rebalanceResult.getSegmentAssignment());
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
30);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
6);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
6);
+
// rebalance should change assignment
rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig(), null);
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
@@ -504,6 +819,23 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
new InstanceAssignmentConfig(tagPoolConfig, null,
replicaGroupPartitionConfig, null, false)));
_helixResourceManager.updateTableConfig(tableConfig);
+ // Try dry-run summary mode
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+ rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+ assertNull(rebalanceResult.getInstanceAssignment());
+ assertNull(rebalanceResult.getTierInstanceAssignment());
+ assertNull(rebalanceResult.getSegmentAssignment());
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
13);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
6);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
6);
+
rebalanceResult = tableRebalancer.rebalance(tableConfig, new
RebalanceConfig(), null);
assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
assertTrue(rebalanceResult.getTierInstanceAssignment().containsKey(TIER_A_NAME));
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 6750a7340d..7b8e4168a7 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -24,6 +24,8 @@ import com.google.common.collect.ImmutableList;
import java.io.File;
import java.io.IOException;
import java.net.URI;
+import java.net.URISyntaxException;
+import java.net.URL;
import java.sql.ResultSet;
import java.sql.Statement;
import java.sql.Timestamp;
@@ -40,6 +42,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@@ -64,10 +67,12 @@ import
org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.SimpleHttpResponse;
import org.apache.pinot.common.utils.http.HttpClient;
+import
org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
import
org.apache.pinot.controller.helix.core.rebalance.DefaultRebalancePreChecker;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceConfig;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceSummaryResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
@@ -176,6 +181,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
private PinotHelixResourceManager _resourceManager;
private TableRebalancer _tableRebalancer;
+ private ExecutorService _executorService;
protected int getNumBrokers() {
return NUM_BROKERS;
@@ -287,8 +293,10 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
_resourceManager = _controllerStarter.getHelixResourceManager();
DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
- preChecker.init(_helixResourceManager, Executors.newFixedThreadPool(10));
- _tableRebalancer = new
TableRebalancer(_resourceManager.getHelixZkManager(), null, null, preChecker);
+ _executorService = Executors.newFixedThreadPool(10);
+ preChecker.init(_helixResourceManager, _executorService);
+ _tableRebalancer = new
TableRebalancer(_resourceManager.getHelixZkManager(), null, null, preChecker,
+ _resourceManager.getTableSizeReader());
}
private void reloadAllSegments(String testQuery, boolean forceDownload, long
numTotalDocs)
@@ -3062,6 +3070,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
stopController();
stopZk();
FileUtils.deleteDirectory(_tempDir);
+ _executorService.shutdown();
}
private void testInstanceDecommission()
@@ -4018,4 +4027,199 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
assertEquals(result.get("clientRequestId").asText(), clientRequestId);
}
+
+ @Test
+ public void testRebalanceDryRunSummary()
+ throws Exception {
+ // setup the rebalance config
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+
+ TableConfig tableConfig = getOfflineTableConfig();
+
+ // Ensure summary status is null if not enabled
+ RebalanceResult rebalanceResult = _tableRebalancer.rebalance(tableConfig,
rebalanceConfig, null);
+ assertNull(rebalanceResult.getRebalanceSummaryResult());
+
+ // Enable summary, nothing is set
+ rebalanceConfig.setSummary(true);
+ rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ checkRebalanceDryRunSummary(rebalanceResult, RebalanceResult.Status.NO_OP,
false, getNumServers(), getNumServers(),
+ tableConfig.getReplication());
+
+ // Add a new server (to force change in instance assignment) and enable
reassignInstances
+ BaseServerStarter serverStarter1 = startOneServer(NUM_SERVERS);
+ rebalanceConfig.setReassignInstances(true);
+ rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ checkRebalanceDryRunSummary(rebalanceResult, RebalanceResult.Status.DONE,
true, getNumServers(),
+ getNumServers() + 1, tableConfig.getReplication());
+
+ // Disable dry-run, summary still enabled
+ rebalanceConfig.setDryRun(false);
+ rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertNull(rebalanceResult.getRebalanceSummaryResult());
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.FAILED);
+
+ // Disable summary along with dry-run to do a real rebalance
+ rebalanceConfig.setSummary(false);
+ rebalanceConfig.setDowntime(true);
+ rebalanceConfig.setReassignInstances(true);
+ rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertNull(rebalanceResult.getRebalanceSummaryResult());
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
+ waitForRebalanceToComplete(rebalanceResult, 600_000L);
+
+ // Untag the added server
+ _resourceManager.updateInstanceTags(serverStarter1.getInstanceId(), "",
false);
+
+ // Re-enable dry-run
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ checkRebalanceDryRunSummary(rebalanceResult, RebalanceResult.Status.DONE,
true, getNumServers() + 1,
+ getNumServers(), tableConfig.getReplication());
+
+ // Disable dry-run and summary to do a real rebalance
+ rebalanceConfig.setDryRun(false);
+ rebalanceConfig.setSummary(false);
+ rebalanceConfig.setDowntime(true);
+ rebalanceConfig.setReassignInstances(true);
+ rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertNull(rebalanceResult.getRebalanceSummaryResult());
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.DONE);
+
+ waitForRebalanceToComplete(rebalanceResult, 600_000L);
+
+ // Stop the server
+ serverStarter1.stop();
+ TestUtils.waitForCondition(aVoid ->
_resourceManager.dropInstance(serverStarter1.getInstanceId()).isSuccessful(),
+ 60_000L, "Failed to drop added server");
+
+ // Try dry-run with summary again
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setSummary(true);
+ rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ checkRebalanceDryRunSummary(rebalanceResult, RebalanceResult.Status.NO_OP,
false, getNumServers(), getNumServers(),
+ tableConfig.getReplication());
+
+ // Try dry-run with summary and pre-checks
+ rebalanceConfig.setPreChecks(true);
+ rebalanceResult = _tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ checkRebalanceDryRunSummary(rebalanceResult, RebalanceResult.Status.NO_OP,
false, getNumServers(), getNumServers(),
+ tableConfig.getReplication());
+ assertNotNull(rebalanceResult.getPreChecksResult());
+
assertTrue(rebalanceResult.getPreChecksResult().containsKey(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS));
+
assertTrue(rebalanceResult.getPreChecksResult().containsKey(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT));
+
assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.NEEDS_RELOAD_STATUS),
"false");
+
assertEquals(rebalanceResult.getPreChecksResult().get(DefaultRebalancePreChecker.IS_MINIMIZE_DATA_MOVEMENT),
+ "false");
+ }
+
+ private void checkRebalanceDryRunSummary(RebalanceResult rebalanceResult,
RebalanceResult.Status expectedStatus,
+ boolean isSegmentsToBeMoved, int existingNumServers, int newNumServers,
int replicationFactor) {
+ assertEquals(rebalanceResult.getStatus(), expectedStatus);
+ RebalanceSummaryResult summaryResult =
rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(summaryResult);
+ assertNotNull(summaryResult.getServerInfo());
+ assertNotNull(summaryResult.getSegmentInfo());
+
assertEquals(summaryResult.getSegmentInfo().getReplicationFactor().getValueBeforeRebalance(),
replicationFactor,
+ "Existing replication factor doesn't match expected");
+
assertEquals(summaryResult.getSegmentInfo().getReplicationFactor().getValueBeforeRebalance(),
+
summaryResult.getSegmentInfo().getReplicationFactor().getExpectedValueAfterRebalance(),
+ "Existing and new replication factor doesn't match");
+
assertEquals(summaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
existingNumServers,
+ "Existing number of servers don't match");
+
assertEquals(summaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
newNumServers,
+ "New number of servers don't match");
+ if (_tableSize > 0) {
+
assertTrue(summaryResult.getSegmentInfo().getEstimatedAverageSegmentSizeInBytes()
> 0L,
+ "Avg segment size expected to be > 0 but found to be 0");
+ }
+
assertEquals(summaryResult.getServerInfo().getNumServersGettingNewSegments(),
+ summaryResult.getServerInfo().getServersGettingNewSegments().size());
+ if (existingNumServers != newNumServers) {
+
assertTrue(summaryResult.getServerInfo().getNumServersGettingNewSegments() > 0,
+ "Expected number of servers should be > 0");
+ } else {
+
assertEquals(summaryResult.getServerInfo().getNumServersGettingNewSegments(), 0,
+ "Expected number of servers getting new segments should be 0");
+ }
+
+ if (isSegmentsToBeMoved) {
+ assertTrue(summaryResult.getSegmentInfo().getTotalSegmentsToBeMoved() >
0,
+ "Segments to be moved should be > 0");
+
assertEquals(summaryResult.getSegmentInfo().getTotalEstimatedDataToBeMovedInBytes(),
+ summaryResult.getSegmentInfo().getTotalSegmentsToBeMoved()
+ *
summaryResult.getSegmentInfo().getEstimatedAverageSegmentSizeInBytes(),
+ "Estimated data to be moved in bytes doesn't match");
+
assertTrue(summaryResult.getSegmentInfo().getMaxSegmentsAddedToASingleServer()
> 0,
+ "Estimated max number of segments to move in a single server should
be > 0");
+ } else {
+ assertEquals(summaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
0, "Segments to be moved should be 0");
+
assertEquals(summaryResult.getSegmentInfo().getTotalEstimatedDataToBeMovedInBytes(),
0L,
+ "Estimated data to be moved in bytes should be 0");
+
assertEquals(summaryResult.getSegmentInfo().getMaxSegmentsAddedToASingleServer(),
0,
+ "Estimated max number of segments to move in a single server should
be 0");
+ }
+
+ // Validate server status stats with numServers information
+ Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo>
serverSegmentChangeInfoMap =
+ summaryResult.getServerInfo().getServerSegmentChangeInfo();
+ int numServersAdded = 0;
+ int numServersRemoved = 0;
+ int numServersUnchanged = 0;
+ for (RebalanceSummaryResult.ServerSegmentChangeInfo
serverSegmentChangeInfo : serverSegmentChangeInfoMap.values()) {
+ switch (serverSegmentChangeInfo.getServerStatus()) {
+ case UNCHANGED:
+ numServersUnchanged++;
+ break;
+ case ADDED:
+ numServersAdded++;
+ break;
+ case REMOVED:
+ numServersRemoved++;
+ break;
+ default:
+ Assert.fail(String.format("Unknown server status encountered: %s",
+ serverSegmentChangeInfo.getServerStatus()));
+ break;
+ }
+ }
+
+
Assert.assertEquals(summaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
+ numServersRemoved + numServersUnchanged);
+
Assert.assertEquals(summaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
+ numServersAdded + numServersUnchanged);
+
+ assertEquals(numServersAdded,
summaryResult.getServerInfo().getServersAdded().size());
+ assertEquals(numServersRemoved,
summaryResult.getServerInfo().getServersRemoved().size());
+ assertEquals(numServersUnchanged,
summaryResult.getServerInfo().getServersUnchanged().size());
+ }
+
+ protected void waitForRebalanceToComplete(RebalanceResult rebalanceResult,
long timeoutMs) {
+ String jobId = rebalanceResult.getJobId();
+ if (rebalanceResult.getStatus() != RebalanceResult.Status.IN_PROGRESS) {
+ return;
+ }
+
+ TestUtils.waitForCondition(aVoid -> {
+ try {
+ String requestUrl =
getControllerRequestURLBuilder().forTableRebalanceStatus(jobId);
+ try {
+ SimpleHttpResponse httpResponse =
+
HttpClient.wrapAndThrowHttpException(getHttpClient().sendGetRequest(new
URL(requestUrl).toURI(), null));
+
+ ServerRebalanceJobStatusResponse serverRebalanceJobStatusResponse =
+ JsonUtils.stringToObject(httpResponse.getResponse(),
ServerRebalanceJobStatusResponse.class);
+ RebalanceResult.Status status =
serverRebalanceJobStatusResponse.getTableRebalanceProgressStats().getStatus();
+ return status != RebalanceResult.Status.IN_PROGRESS;
+ } catch (HttpErrorStatusException | URISyntaxException e) {
+ throw new IOException(e);
+ }
+ } catch (Exception e) {
+ return null;
+ }
+ }, 1000L, timeoutMs, "Failed to load all segments after rebalance");
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]