This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 0075ad7d53 Add Tenants Info to Rebalance API summary (#15284)
0075ad7d53 is described below
commit 0075ad7d5346bb3c2b74aeca896f47007a9f87fc
Author: Jhow <[email protected]>
AuthorDate: Fri Mar 28 14:57:24 2025 -0700
Add Tenants Info to Rebalance API summary (#15284)
---
.../core/rebalance/RebalanceSummaryResult.java | 68 ++++++-
.../helix/core/rebalance/TableRebalancer.java | 70 ++++++-
.../RebalanceServerRebalanceSummaryResponse.tsx | 4 +
.../TableRebalancerClusterStatelessTest.java | 202 +++++++++++++++++++++
.../tests/OfflineClusterIntegrationTest.java | 14 ++
5 files changed, 354 insertions(+), 4 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
index 3169c9b367..753d3d5dd4 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
@@ -39,6 +39,8 @@ public class RebalanceSummaryResult {
private final ServerInfo _serverInfo;
@JsonInclude(JsonInclude.Include.NON_NULL)
private final SegmentInfo _segmentInfo;
+ @JsonInclude(JsonInclude.Include.NON_NULL)
+ private final List<TagInfo> _tagsInfo;
/**
* Constructor for RebalanceSummaryResult
@@ -47,9 +49,11 @@ public class RebalanceSummaryResult {
*/
@JsonCreator
public RebalanceSummaryResult(@JsonProperty("serverInfo") @Nullable
ServerInfo serverInfo,
- @JsonProperty("segmentInfo") @Nullable SegmentInfo segmentInfo) {
+ @JsonProperty("segmentInfo") @Nullable SegmentInfo segmentInfo,
+ @JsonProperty("tagsInfo") @Nullable List<TagInfo> tagsInfo) {
_serverInfo = serverInfo;
_segmentInfo = segmentInfo;
+ _tagsInfo = tagsInfo;
}
@JsonProperty
@@ -62,6 +66,11 @@ public class RebalanceSummaryResult {
return _segmentInfo;
}
+ @JsonProperty
+ public List<TagInfo> getTagsInfo() {
+ return _tagsInfo;
+ }
+
public static class ServerSegmentChangeInfo {
private final ServerStatus _serverStatus;
private final int _totalSegmentsAfterRebalance;
@@ -161,6 +170,63 @@ public class RebalanceSummaryResult {
}
}
+ public static class TagInfo {
+ public static final String TAG_FOR_OUTDATED_SERVERS = "OUTDATED_SERVERS";
+ private final String _tagName;
+ private int _numSegmentsUnchanged;
+ private int _numSegmentsToDownload;
+ private int _numServerParticipants;
+
+ @JsonCreator
+ public TagInfo(
+ @JsonProperty("tagName") String tagName,
+ @JsonProperty("numSegmentsToDownload") int numSegmentsToDownload,
+ @JsonProperty("numSegmentsUnchanged") int numSegmentsUnchanged,
+ @JsonProperty("numServerParticipants") int numServerParticipants
+ ) {
+ _tagName = tagName;
+ _numSegmentsUnchanged = numSegmentsUnchanged;
+ _numSegmentsToDownload = numSegmentsToDownload;
+ _numServerParticipants = numServerParticipants;
+ }
+
+ public TagInfo(String tagName) {
+ this(tagName, 0, 0, 0);
+ }
+
+ @JsonProperty
+ public String getTagName() {
+ return _tagName;
+ }
+
+ @JsonProperty
+ public int getNumSegmentsUnchanged() {
+ return _numSegmentsUnchanged;
+ }
+
+ @JsonProperty
+ public int getNumSegmentsToDownload() {
+ return _numSegmentsToDownload;
+ }
+
+ @JsonProperty
+ public int getNumServerParticipants() {
+ return _numServerParticipants;
+ }
+
+ public void increaseNumSegmentsUnchanged(int numSegments) {
+ _numSegmentsUnchanged += numSegments;
+ }
+
+ public void increaseNumSegmentsToDownload(int numSegments) {
+ _numSegmentsToDownload += numSegments;
+ }
+
+ public void increaseNumServerParticipants(int numServers) {
+ _numServerParticipants += numServers;
+ }
+ }
+
public static class ServerInfo {
private final int _numServersGettingNewSegments;
@JsonInclude(JsonInclude.Include.NON_NULL)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index d91e2db103..3e7dc3eab3 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -56,6 +56,7 @@ import org.apache.pinot.common.tier.PinotServerTierStorage;
import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.common.tier.TierFactory;
import org.apache.pinot.common.utils.config.TableConfigUtils;
+import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.config.TierConfigUtils;
import
org.apache.pinot.controller.helix.core.assignment.instance.InstanceAssignmentDriver;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignment;
@@ -66,6 +67,7 @@ import org.apache.pinot.controller.util.TableSizeReader;
import org.apache.pinot.spi.config.table.RoutingConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.TagOverrideConfig;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
@@ -334,7 +336,7 @@ public class TableRebalancer {
// is expected or not based on the summary results
RebalanceSummaryResult summaryResult =
calculateDryRunSummary(currentAssignment, targetAssignment,
tableNameWithType, rebalanceJobId,
- tableSubTypeSizeDetails);
+ tableSubTypeSizeDetails, tableConfig);
if (segmentAssignmentUnchanged) {
LOGGER.info("Table: {} is already balanced", tableNameWithType);
@@ -622,7 +624,7 @@ public class TableRebalancer {
private RebalanceSummaryResult calculateDryRunSummary(Map<String,
Map<String, String>> currentAssignment,
Map<String, Map<String, String>> targetAssignment, String
tableNameWithType, String rebalanceJobId,
- TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails) {
+ TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails,
TableConfig tableConfig) {
LOGGER.info("Calculating rebalance summary for table: {} with
rebalanceJobId: {}",
tableNameWithType, rebalanceJobId);
int existingReplicationFactor = 0;
@@ -662,6 +664,38 @@ public class TableRebalancer {
Set<String> serversRemoved = new HashSet<>();
Set<String> serversUnchanged = new HashSet<>();
Set<String> serversGettingNewSegments = new HashSet<>();
+ Map<String, RebalanceSummaryResult.TagInfo> tagsInfoMap = new HashMap<>();
+ String serverTenantName = tableConfig.getTenantConfig().getServer();
+ if (serverTenantName != null) {
+ String serverTenantTag =
+ TagNameUtils.getServerTagForTenant(serverTenantName,
tableConfig.getTableType());
+ tagsInfoMap.put(serverTenantTag,
+ new RebalanceSummaryResult.TagInfo(serverTenantTag));
+ }
+ TagOverrideConfig tagOverrideConfig =
tableConfig.getTenantConfig().getTagOverrideConfig();
+ if (tagOverrideConfig != null) {
+ String completedTag = tagOverrideConfig.getRealtimeCompleted();
+ String consumingTag = tagOverrideConfig.getRealtimeConsuming();
+ if (completedTag != null) {
+ tagsInfoMap.put(completedTag, new
RebalanceSummaryResult.TagInfo(completedTag));
+ }
+ if (consumingTag != null) {
+ tagsInfoMap.put(consumingTag, new
RebalanceSummaryResult.TagInfo(consumingTag));
+ }
+ }
+ if (tableConfig.getInstanceAssignmentConfigMap() != null) {
+ // for simplicity, including all segment types present in
instanceAssignmentConfigMap
+
tableConfig.getInstanceAssignmentConfigMap().values().forEach(instanceAssignmentConfig
-> {
+ String tag = instanceAssignmentConfig.getTagPoolConfig().getTag();
+ tagsInfoMap.put(tag, new RebalanceSummaryResult.TagInfo(tag));
+ });
+ }
+ if (tableConfig.getTierConfigsList() != null) {
+ tableConfig.getTierConfigsList().forEach(tierConfig -> {
+ String tierTag = tierConfig.getServerTag();
+ tagsInfoMap.put(tierTag, new RebalanceSummaryResult.TagInfo(tierTag));
+ });
+ }
Map<String, RebalanceSummaryResult.ServerSegmentChangeInfo>
serverSegmentChangeInfoMap = new HashMap<>();
int segmentsNotMoved = 0;
int maxSegmentsAddedToServer = 0;
@@ -699,6 +733,30 @@ public class TableRebalancer {
serverSegmentChangeInfoMap.put(server, new
RebalanceSummaryResult.ServerSegmentChangeInfo(serverStatus,
totalNewSegments, totalExistingSegments, segmentsAdded,
segmentsDeleted, segmentsUnchanged,
instanceToTagsMap.getOrDefault(server, null)));
+ List<String> serverTags = getServerTag(server);
+ Set<String> relevantTags = new HashSet<>(serverTags);
+ relevantTags.retainAll(tagsInfoMap.keySet());
+ // Segments that remain unchanged or need to be downloaded are counted toward every tag associated with this
+ // server instance
+ if (relevantTags.isEmpty()) {
+ // this could happen when the server's tags changed but reassignInstances=false in the rebalance config
+ LOGGER.warn("Server: {} was assigned to table: {} but does not have
any relevant tags", server,
+ tableNameWithType);
+
+ RebalanceSummaryResult.TagInfo tagsInfo =
+
tagsInfoMap.computeIfAbsent(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS,
+ RebalanceSummaryResult.TagInfo::new);
+ tagsInfo.increaseNumSegmentsUnchanged(segmentsUnchanged);
+ tagsInfo.increaseNumSegmentsToDownload(segmentsAdded);
+ tagsInfo.increaseNumServerParticipants(1);
+ } else {
+ for (String tag : relevantTags) {
+ RebalanceSummaryResult.TagInfo tagsInfo = tagsInfoMap.get(tag);
+ tagsInfo.increaseNumSegmentsUnchanged(segmentsUnchanged);
+ tagsInfo.increaseNumSegmentsToDownload(segmentsAdded);
+ tagsInfo.increaseNumServerParticipants(1);
+ }
+ }
}
for (Map.Entry<String, Set<String>> entry :
existingServersToSegmentMap.entrySet()) {
@@ -739,7 +797,13 @@ public class TableRebalancer {
LOGGER.info("Calculated rebalance summary for table: {} with
rebalanceJobId: {}", tableNameWithType,
rebalanceJobId);
- return new RebalanceSummaryResult(serverInfo, segmentInfo);
+ return new RebalanceSummaryResult(serverInfo, segmentInfo, new
ArrayList<>(tagsInfoMap.values()));
+ }
+
+ private List<String> getServerTag(String serverName) {
+ InstanceConfig instanceConfig =
+
_helixDataAccessor.getProperty(_helixDataAccessor.keyBuilder().instanceConfig(serverName));
+ return instanceConfig.getTags();
}
private void onReturnFailure(String errorMsg, Exception e) {
diff --git
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/RebalanceServer/RebalanceServerResponses/RebalanceServerRebalanceSummaryResponse.tsx
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/RebalanceServer/RebalanceServerResponses/RebalanceServerRebalanceSummaryResponse.tsx
index ba79b18167..7744058237 100644
---
a/pinot-controller/src/main/resources/app/components/Homepage/Operations/RebalanceServer/RebalanceServerResponses/RebalanceServerRebalanceSummaryResponse.tsx
+++
b/pinot-controller/src/main/resources/app/components/Homepage/Operations/RebalanceServer/RebalanceServerResponses/RebalanceServerRebalanceSummaryResponse.tsx
@@ -32,6 +32,10 @@ export const RebalanceServerRebalanceSummaryResponse = ({
response }) => {
{
name: 'II. Segment Information',
key: 'segmentInfo'
+ },
+ {
+ name: 'III. Server Tags Information',
+ key: 'tagsInfo'
}
];
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index e86c8263f0..1a8c221899 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.restlet.resources.DiskUsageInfo;
@@ -153,6 +154,13 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
0);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
3);
+ assertNotNull(rebalanceSummaryResult.getTagsInfo());
+ assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+ assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+ TagNameUtils.getOfflineTagForTenant(null));
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
0);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
numSegments * NUM_REPLICAS);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
numServers);
assertNotNull(rebalanceResult.getInstanceAssignment());
assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -202,6 +210,15 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
6);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
3);
+ assertNotNull(rebalanceSummaryResult.getTagsInfo());
+ assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+ assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+ TagNameUtils.getOfflineTagForTenant(null));
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
14);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+ numSegments * NUM_REPLICAS - 14);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+ numServers + numServersToAdd);
assertNotNull(rebalanceResult.getInstanceAssignment());
assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -371,6 +388,15 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
11);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
6);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
6);
+ assertNotNull(rebalanceSummaryResult.getTagsInfo());
+ assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+ assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+ TagNameUtils.getOfflineTagForTenant(null));
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
11);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+ numSegments * NUM_REPLICAS - 11);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+ numServers + numServersToAdd);
assertNotNull(rebalanceResult.getInstanceAssignment());
assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -451,6 +477,15 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
6);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
6);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
0);
+ assertNotNull(rebalanceSummaryResult.getTagsInfo());
+ assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+ assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+ TagNameUtils.getOfflineTagForTenant(null));
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
0);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+ numSegments * NUM_REPLICAS);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+ numServers + numServersToAdd);
assertNotNull(rebalanceResult.getInstanceAssignment());
assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -476,6 +511,15 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
6);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
6);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
0);
+ assertNotNull(rebalanceSummaryResult.getTagsInfo());
+ assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+ assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+ TagNameUtils.getOfflineTagForTenant(null));
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
0);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+ numSegments * NUM_REPLICAS);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+ numServers + numServersToAdd);
assertNotNull(rebalanceResult.getInstanceAssignment());
assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -522,6 +566,15 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
6);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
3);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
3);
+ assertNotNull(rebalanceSummaryResult.getTagsInfo());
+ assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+ assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+ TagNameUtils.getOfflineTagForTenant(null));
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
15);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+ numSegments * NUM_REPLICAS - 15);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+ numServers);
assertNotNull(rebalanceResult.getInstanceAssignment());
assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -826,6 +879,15 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
3);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
0);
+ assertNotNull(rebalanceSummaryResult.getTagsInfo());
+ assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+ assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+ TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME));
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
0);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+ numSegments * NUM_REPLICAS);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+ numServers);
assertNotNull(rebalanceResult.getInstanceAssignment());
assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -858,6 +920,15 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
3);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
0);
+ assertNotNull(rebalanceSummaryResult.getTagsInfo());
+ assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+ assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+ TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME));
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
0);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+ numSegments * NUM_REPLICAS);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+ numServers);
assertNotNull(rebalanceResult.getInstanceAssignment());
assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -892,6 +963,27 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
9);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
6);
+ assertNotNull(rebalanceSummaryResult.getTagsInfo());
+ assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 3);
+ Map<String, RebalanceSummaryResult.TagInfo> tenantInfoMap =
rebalanceSummaryResult.getTagsInfo()
+ .stream()
+ .collect(Collectors.toMap(RebalanceSummaryResult.TagInfo::getTagName,
info -> info));
+
assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME)));
+
assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant(TIER_A_NAME)));
+
assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant(TIER_B_NAME)));
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME)).getNumSegmentsToDownload(),
0);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME)).getNumSegmentsUnchanged(),
+ 5 * NUM_REPLICAS);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(NO_TIER_NAME)).getNumServerParticipants(),
+ numServers);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(TIER_A_NAME)).getNumSegmentsToDownload(),
+ NUM_REPLICAS);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(TIER_A_NAME)).getNumSegmentsUnchanged(),
0);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(TIER_A_NAME)).getNumServerParticipants(),
3);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(TIER_B_NAME)).getNumSegmentsToDownload(),
+ 4 * NUM_REPLICAS);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(TIER_B_NAME)).getNumSegmentsUnchanged(),
0);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant(TIER_B_NAME)).getNumServerParticipants(),
3);
assertNotNull(rebalanceResult.getInstanceAssignment());
assertNotNull(rebalanceResult.getTierInstanceAssignment());
assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -969,6 +1061,15 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
3);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
0);
+ assertNotNull(rebalanceSummaryResult.getTagsInfo());
+ assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+ assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+ TagNameUtils.getOfflineTagForTenant("replicaAssignment" +
NO_TIER_NAME));
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
0);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+ numSegments * NUM_REPLICAS);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+ numServers);
assertNotNull(rebalanceResult.getInstanceAssignment());
assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -997,6 +1098,15 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
3);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
0);
+ assertNotNull(rebalanceSummaryResult.getTagsInfo());
+ assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 1);
+ assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getTagName(),
+ TagNameUtils.getOfflineTagForTenant("replicaAssignment" +
NO_TIER_NAME));
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsToDownload(),
0);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged(),
+ numSegments * NUM_REPLICAS);
+
assertEquals(rebalanceSummaryResult.getTagsInfo().get(0).getNumServerParticipants(),
+ numServers);
assertNotNull(rebalanceResult.getInstanceAssignment());
assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -1025,6 +1135,28 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
6);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServersGettingNewSegments(),
6);
+ assertNotNull(rebalanceSummaryResult.getTagsInfo());
+ assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 2);
+ Map<String, RebalanceSummaryResult.TagInfo> tenantInfoMap =
rebalanceSummaryResult.getTagsInfo()
+ .stream()
+ .collect(Collectors.toMap(RebalanceSummaryResult.TagInfo::getTagName,
info -> info));
+
assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME)));
+
assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME)));
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME))
+ .getNumSegmentsToDownload(), 0);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME))
+ .getNumSegmentsUnchanged(),
+ 0);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME))
+ .getNumServerParticipants(),
+ 0);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME))
+ .getNumSegmentsToDownload(),
+ numSegments * NUM_REPLICAS);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME))
+ .getNumSegmentsUnchanged(), 0);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME))
+ .getNumServerParticipants(), 6);
assertNotNull(rebalanceResult.getInstanceAssignment());
assertNotNull(rebalanceResult.getTierInstanceAssignment());
assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -1064,6 +1196,28 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
13);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
6);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
6);
+ assertNotNull(rebalanceSummaryResult.getTagsInfo());
+ assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 2);
+ tenantInfoMap = rebalanceSummaryResult.getTagsInfo()
+ .stream()
+ .collect(Collectors.toMap(RebalanceSummaryResult.TagInfo::getTagName,
info -> info));
+
assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME)));
+
assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME)));
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME))
+ .getNumSegmentsToDownload(), 0);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME))
+ .getNumSegmentsUnchanged(),
+ 0);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME))
+ .getNumServerParticipants(),
+ 0);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME))
+ .getNumSegmentsToDownload(),
+ 13);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME))
+ .getNumSegmentsUnchanged(), numSegments * NUM_REPLICAS - 13);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME))
+ .getNumServerParticipants(), 6);
assertNotNull(rebalanceResult.getInstanceAssignment());
assertNotNull(rebalanceResult.getTierInstanceAssignment());
assertNotNull(rebalanceResult.getSegmentAssignment());
@@ -1115,6 +1269,54 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
}
assertEquals(numSegmentsOnServer0, numSegments / 2);
+ _helixResourceManager.deleteOfflineServerTenantFor("replicaAssignment" +
TIER_A_NAME);
+ rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setReassignInstances(false);
+
+ // when rebalancing with reassignInstances=false, the assigned servers would not have any relevant tags
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ assertEquals(rebalanceResult.getStatus(), RebalanceResult.Status.NO_OP);
+ rebalanceSummaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(rebalanceSummaryResult);
+ assertNotNull(rebalanceSummaryResult.getServerInfo());
+ assertNotNull(rebalanceSummaryResult.getSegmentInfo());
+
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
0);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
6);
+
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
6);
+ assertNotNull(rebalanceSummaryResult.getTagsInfo());
+ assertEquals(rebalanceSummaryResult.getTagsInfo().size(), 3);
+ tenantInfoMap = rebalanceSummaryResult.getTagsInfo()
+ .stream()
+ .collect(Collectors.toMap(RebalanceSummaryResult.TagInfo::getTagName,
info -> info));
+
assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME)));
+
assertTrue(tenantInfoMap.containsKey(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME)));
+
assertTrue(tenantInfoMap.containsKey(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS));
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME))
+ .getNumSegmentsToDownload(), 0);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME))
+ .getNumSegmentsUnchanged(),
+ 0);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ NO_TIER_NAME))
+ .getNumServerParticipants(),
+ 0);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME))
+ .getNumSegmentsToDownload(),
+ 0);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME))
+ .getNumSegmentsUnchanged(), 0);
+
assertEquals(tenantInfoMap.get(TagNameUtils.getOfflineTagForTenant("replicaAssignment"
+ TIER_A_NAME))
+ .getNumServerParticipants(), 0);
+ assertEquals(
+
tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumSegmentsToDownload(),
+ 0);
+ assertEquals(
+
tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumSegmentsUnchanged(),
+ numSegments * NUM_REPLICAS);
+ assertEquals(
+
tenantInfoMap.get(RebalanceSummaryResult.TagInfo.TAG_FOR_OUTDATED_SERVERS).getNumServerParticipants(),
+ 6);
+
_helixResourceManager.deleteOfflineTable(TIERED_TABLE_NAME);
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index f4eb559f68..eb575799e3 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -65,6 +65,7 @@ import
org.apache.pinot.common.response.server.TableIndexMetadataResponse;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.SimpleHttpResponse;
+import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.http.HttpClient;
import
org.apache.pinot.controller.api.resources.ServerRebalanceJobStatusResponse;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
@@ -4294,6 +4295,19 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
"Existing number of servers don't match");
assertEquals(summaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
newNumServers,
"New number of servers don't match");
+ // In this cluster integration test, servers are tagged with DefaultTenant
only
+ assertEquals(summaryResult.getTagsInfo().size(), 1);
+ assertEquals(summaryResult.getTagsInfo().get(0).getTagName(),
+ TagNameUtils.getOfflineTagForTenant(getServerTenant()));
+
assertEquals(summaryResult.getTagsInfo().get(0).getNumServerParticipants(),
newNumServers);
+ assertEquals(summaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
+ summaryResult.getTagsInfo().get(0).getNumSegmentsToDownload());
+ // For this single tenant, the number of unchanged segments and the number of segments to download should add up
+ // to the total number of segments present after the rebalance
+
assertEquals(summaryResult.getSegmentInfo().getNumSegmentsAcrossAllReplicas().getExpectedValueAfterRebalance(),
+ summaryResult.getTagsInfo().get(0).getNumSegmentsUnchanged() +
summaryResult.getTagsInfo()
+ .get(0)
+ .getNumSegmentsToDownload());
if (_tableSize > 0) {
assertTrue(summaryResult.getSegmentInfo().getEstimatedAverageSegmentSizeInBytes()
> 0L,
"Avg segment size expected to be > 0 but found to be 0");
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]