This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 38ad8fe44e Consuming Rebalance Summary (#15368)
38ad8fe44e is described below
commit 38ad8fe44ead669f35245d3bbf53b6555cb9df49
Author: Jhow <[email protected]>
AuthorDate: Mon Apr 7 10:32:33 2025 -0700
Consuming Rebalance Summary (#15368)
* Add consumingSegmentsToBeMoved to SegmentInfo
* test consumingSegmentToBeMoved
* Add consuming segment summary
* lint
* register connection manager instead of assign in constructor
* add integration test
* lint
* license
* fix JsonInclude
* rename byte to offset in the context of Kafka stream offset
* add segment age into consuming segment summary and reorg the info
* update tests
* docs: log msg and comment
* Update format
* variables renaming
* naming
* fix JsonInclude annotation
* lint
* variable naming
* variable naming
* lint: style
* handle illegal offset format return from stream
* move JsonInclude annotators to class level
* show null and -1 instead of drop the field in
ConsumingSegmentToBeMovedSummary
* docs: comment update
* lint: import
* docs: remove ambiguity
* update consuming segment logic
update import
* trigger CI
* pass consumingSegmentInfoReader into PinotHelixResourceManager
* variable renamed and bug fix
* remove extra constructor
* create ConsumingSegmentInfoReader in OfflineClusterIntegrationTest
* Use StreamMetadataProvider instead of ConsumingSegmentInfoReader
* remove resource
* update test
* docs: add detail for consumingSegmentsToBeMovedWithOldestAgeInMinutes
* docs:
---
.../core/rebalance/RebalanceSummaryResult.java | 142 +++++++++--
.../helix/core/rebalance/TableRebalancer.java | 230 +++++++++++++++++-
.../TableRebalancerClusterStatelessTest.java | 259 ++++++++++++++++++++-
.../impl/fakestream/FakeStreamConfigUtils.java | 2 +-
...mingSegmentToBeMovedSummaryIntegrationTest.java | 175 ++++++++++++++
.../tests/OfflineClusterIntegrationTest.java | 3 +
6 files changed, 782 insertions(+), 29 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
index 753d3d5dd4..82976da5d8 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
@@ -34,12 +34,8 @@ import javax.annotation.Nullable;
@JsonIgnoreProperties(ignoreUnknown = true)
@JsonInclude(JsonInclude.Include.NON_NULL)
public class RebalanceSummaryResult {
-
- @JsonInclude(JsonInclude.Include.NON_NULL)
private final ServerInfo _serverInfo;
- @JsonInclude(JsonInclude.Include.NON_NULL)
private final SegmentInfo _segmentInfo;
- @JsonInclude(JsonInclude.Include.NON_NULL)
private final List<TagInfo> _tagsInfo;
/**
@@ -71,6 +67,7 @@ public class RebalanceSummaryResult {
return _tagsInfo;
}
+ @JsonInclude(JsonInclude.Include.NON_NULL)
public static class ServerSegmentChangeInfo {
private final ServerStatus _serverStatus;
private final int _totalSegmentsAfterRebalance;
@@ -78,7 +75,6 @@ public class RebalanceSummaryResult {
private final int _segmentsAdded;
private final int _segmentsDeleted;
private final int _segmentsUnchanged;
- @JsonInclude(JsonInclude.Include.NON_NULL)
private final List<String> _tagList;
/**
@@ -227,19 +223,14 @@ public class RebalanceSummaryResult {
}
}
+ @JsonInclude(JsonInclude.Include.NON_NULL)
public static class ServerInfo {
private final int _numServersGettingNewSegments;
- @JsonInclude(JsonInclude.Include.NON_NULL)
private final RebalanceChangeInfo _numServers;
- @JsonInclude(JsonInclude.Include.NON_NULL)
private final Set<String> _serversAdded;
- @JsonInclude(JsonInclude.Include.NON_NULL)
private final Set<String> _serversRemoved;
- @JsonInclude(JsonInclude.Include.NON_NULL)
private final Set<String> _serversUnchanged;
- @JsonInclude(JsonInclude.Include.NON_NULL)
private final Set<String> _serversGettingNewSegments;
- @JsonInclude(JsonInclude.Include.NON_NULL)
private final Map<String, ServerSegmentChangeInfo>
_serverSegmentChangeInfo;
/**
@@ -306,18 +297,130 @@ public class RebalanceSummaryResult {
}
}
+ public static class ConsumingSegmentToBeMovedSummary {
+ private final int _numConsumingSegmentsToBeMoved;
+ private final int _numServersGettingConsumingSegmentsAdded;
+ private final Map<String, Integer>
_consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
+ private final Map<String, Integer>
_consumingSegmentsToBeMovedWithOldestAgeInMinutes;
+ private final Map<String, ConsumingSegmentSummaryPerServer>
_serverConsumingSegmentSummary;
+
+ /**
+ * Constructor for ConsumingSegmentToBeMovedSummary
+ * @param numConsumingSegmentsToBeMoved total number of consuming segments
to be moved as part of this rebalance
+ * @param numServersGettingConsumingSegmentsAdded number of servers
receiving newly added consuming segments as part of this rebalance
+ * @param consumingSegmentsToBeMovedWithMostOffsetsToCatchUp top consuming
segments to be moved to catch up.
+ * Map from
segment name to its number of offsets to
+ * catch up on
the new server. This is essentially the
+ * difference
between the latest offset of the stream
+ * and the
segment's start offset of the stream. The
+ * map is set to
null if the number of offsets to catch
+ * up could not
be determined for at least one
+ * consuming
segment
+ * @param consumingSegmentsToBeMovedWithOldestAgeInMinutes oldest
consuming segments to be moved to catch up. Map
+ * from segment
name to its age in minutes. The map is
+ * set to null if
ZK metadata is not available or the
+ * creation time
is not found for at least one consuming
+ * segment.
+ * The age of a
segment is determined by its creation
+ * time from ZK
metadata. Segment age is an approximation
+ * to data age for
a consuming segment. It may not reflect
+ * the actual
oldest age of data in the consuming segment.
+ * For various reasons, a
segment could consume events which date
+ * before the
segment was created. We collect segment age
+ * here as there
is no obvious way to get the age of the
+ * oldest data in
the stream for a specific consuming
+ * segment
+ * @param serverConsumingSegmentSummary ConsumingSegmentSummaryPerServer
per server
+ */
+ @JsonCreator
+ public ConsumingSegmentToBeMovedSummary(
+ @JsonProperty("numConsumingSegmentsToBeMoved") int
numConsumingSegmentsToBeMoved,
+ @JsonProperty("numServersGettingConsumingSegmentsAdded") int
numServersGettingConsumingSegmentsAdded,
+ @JsonProperty("consumingSegmentsToBeMovedWithMostOffsetsToCatchUp")
@Nullable
+ Map<String, Integer>
consumingSegmentsToBeMovedWithMostOffsetsToCatchUp,
+ @JsonProperty("consumingSegmentsToBeMovedWithOldestAgeInMinutes")
@Nullable
+ Map<String, Integer> consumingSegmentsToBeMovedWithOldestAgeInMinutes,
+ @JsonProperty("serverConsumingSegmentSummary") @Nullable
+ Map<String, ConsumingSegmentSummaryPerServer>
serverConsumingSegmentSummary) {
+ _numConsumingSegmentsToBeMoved = numConsumingSegmentsToBeMoved;
+ _numServersGettingConsumingSegmentsAdded =
numServersGettingConsumingSegmentsAdded;
+ _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp =
consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
+ _consumingSegmentsToBeMovedWithOldestAgeInMinutes =
consumingSegmentsToBeMovedWithOldestAgeInMinutes;
+ _serverConsumingSegmentSummary = serverConsumingSegmentSummary;
+ }
+
+ @JsonProperty
+ public int getNumConsumingSegmentsToBeMoved() {
+ return _numConsumingSegmentsToBeMoved;
+ }
+
+ @JsonProperty
+ public int getNumServersGettingConsumingSegmentsAdded() {
+ return _numServersGettingConsumingSegmentsAdded;
+ }
+
+ @JsonProperty
+ public Map<String, Integer>
getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp() {
+ return _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
+ }
+
+ @JsonProperty
+ public Map<String, Integer>
getConsumingSegmentsToBeMovedWithOldestAgeInMinutes() {
+ return _consumingSegmentsToBeMovedWithOldestAgeInMinutes;
+ }
+
+ @JsonProperty
+ public Map<String, ConsumingSegmentSummaryPerServer>
getServerConsumingSegmentSummary() {
+ return _serverConsumingSegmentSummary;
+ }
+
+ public static class ConsumingSegmentSummaryPerServer {
+ private final int _numConsumingSegmentsToBeAdded;
+ private final int _totalOffsetsToCatchUpAcrossAllConsumingSegments;
+
+ /**
+ * Constructor for ConsumingSegmentSummaryPerServer
+ * @param numConsumingSegmentsToBeAdded number of consuming segments to
be added to this server
+ * @param totalOffsetsToCatchUpAcrossAllConsumingSegments total number
of offsets to catch up across all consuming
+ * segments. The
number of offsets to catch up for a
+ * consuming
segment is essentially the difference
+ * between the
latest offset of the stream and the
+ * segment's
start offset of the stream. Set to -1 if
+ * the offsets
to catch up could not be determined for
+ * at least one
consuming segment
+ */
+ @JsonCreator
+ public ConsumingSegmentSummaryPerServer(
+ @JsonProperty("numConsumingSegmentsToBeAdded") int
numConsumingSegmentsToBeAdded,
+ @JsonProperty("totalOffsetsToCatchUpAcrossAllConsumingSegments")
+ int totalOffsetsToCatchUpAcrossAllConsumingSegments) {
+ _numConsumingSegmentsToBeAdded = numConsumingSegmentsToBeAdded;
+ _totalOffsetsToCatchUpAcrossAllConsumingSegments =
totalOffsetsToCatchUpAcrossAllConsumingSegments;
+ }
+
+ @JsonProperty
+ public int getNumConsumingSegmentsToBeAdded() {
+ return _numConsumingSegmentsToBeAdded;
+ }
+
+ @JsonProperty
+ public int getTotalOffsetsToCatchUpAcrossAllConsumingSegments() {
+ return _totalOffsetsToCatchUpAcrossAllConsumingSegments;
+ }
+ }
+ }
+
+ @JsonInclude(JsonInclude.Include.NON_NULL)
public static class SegmentInfo {
// TODO: Add a metric to estimate the total time it will take to rebalance
private final int _totalSegmentsToBeMoved;
private final int _maxSegmentsAddedToASingleServer;
private final long _estimatedAverageSegmentSizeInBytes;
private final long _totalEstimatedDataToBeMovedInBytes;
- @JsonInclude(JsonInclude.Include.NON_NULL)
private final RebalanceChangeInfo _replicationFactor;
- @JsonInclude(JsonInclude.Include.NON_NULL)
private final RebalanceChangeInfo _numSegmentsInSingleReplica;
- @JsonInclude(JsonInclude.Include.NON_NULL)
private final RebalanceChangeInfo _numSegmentsAcrossAllReplicas;
+ private final ConsumingSegmentToBeMovedSummary
_consumingSegmentToBeMovedSummary;
/**
* Constructor for SegmentInfo
@@ -328,6 +431,7 @@ public class RebalanceSummaryResult {
* @param replicationFactor replication factor before and after this
rebalance
* @param numSegmentsInSingleReplica number of segments in single replica
before and after this rebalance
* @param numSegmentsAcrossAllReplicas total number of segments across all
replicas before and after this rebalance
+ * @param consumingSegmentToBeMovedSummary consuming segment summary. Set
to null if the table is an offline table
*/
@JsonCreator
public SegmentInfo(@JsonProperty("totalSegmentsToBeMoved") int
totalSegmentsToBeMoved,
@@ -336,7 +440,9 @@ public class RebalanceSummaryResult {
@JsonProperty("totalEstimatedDataToBeMovedInBytes") long
totalEstimatedDataToBeMovedInBytes,
@JsonProperty("replicationFactor") @Nullable RebalanceChangeInfo
replicationFactor,
@JsonProperty("numSegmentsInSingleReplica") @Nullable
RebalanceChangeInfo numSegmentsInSingleReplica,
- @JsonProperty("numSegmentsAcrossAllReplicas") @Nullable
RebalanceChangeInfo numSegmentsAcrossAllReplicas) {
+ @JsonProperty("numSegmentsAcrossAllReplicas") @Nullable
RebalanceChangeInfo numSegmentsAcrossAllReplicas,
+ @JsonProperty("consumingSegmentToBeMovedSummary") @Nullable
+ ConsumingSegmentToBeMovedSummary consumingSegmentToBeMovedSummary) {
_totalSegmentsToBeMoved = totalSegmentsToBeMoved;
_maxSegmentsAddedToASingleServer = maxSegmentsAddedToASingleServer;
_estimatedAverageSegmentSizeInBytes = estimatedAverageSegmentSizeInBytes;
@@ -344,6 +450,7 @@ public class RebalanceSummaryResult {
_replicationFactor = replicationFactor;
_numSegmentsInSingleReplica = numSegmentsInSingleReplica;
_numSegmentsAcrossAllReplicas = numSegmentsAcrossAllReplicas;
+ _consumingSegmentToBeMovedSummary = consumingSegmentToBeMovedSummary;
}
@JsonProperty
@@ -380,6 +487,11 @@ public class RebalanceSummaryResult {
public RebalanceChangeInfo getNumSegmentsAcrossAllReplicas() {
return _numSegmentsAcrossAllReplicas;
}
+
+ @JsonProperty
+ public ConsumingSegmentToBeMovedSummary
getConsumingSegmentToBeMovedSummary() {
+ return _consumingSegmentToBeMovedSummary;
+ }
}
public enum ServerStatus {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index f26a7b8060..d29f2091b4 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -21,9 +21,12 @@ package org.apache.pinot.controller.helix.core.rebalance;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -33,6 +36,7 @@ import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.ToIntFunction;
+import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.tuple.Pair;
@@ -50,11 +54,14 @@ import
org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ControllerTimer;
import org.apache.pinot.common.tier.PinotServerTierStorage;
import org.apache.pinot.common.tier.Tier;
import org.apache.pinot.common.tier.TierFactory;
+import org.apache.pinot.common.utils.SegmentUtils;
import org.apache.pinot.common.utils.config.TableConfigUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.config.TierConfigUtils;
@@ -71,7 +78,16 @@ import org.apache.pinot.spi.config.table.TagOverrideConfig;
import org.apache.pinot.spi.config.table.TierConfig;
import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -120,6 +136,10 @@ import org.slf4j.LoggerFactory;
*/
public class TableRebalancer {
private static final Logger LOGGER =
LoggerFactory.getLogger(TableRebalancer.class);
+ private static final int TOP_N_IN_CONSUMING_SEGMENT_SUMMARY = 10;
+ // TODO: Consider making the timeoutMs values below configurable for the table rebalancer
+ private static final int TABLE_SIZE_READER_TIMEOUT_MS = 30_000;
+ private static final int STREAM_PARTITION_OFFSET_READ_TIMEOUT_MS = 10_000;
private final HelixManager _helixManager;
private final HelixDataAccessor _helixDataAccessor;
private final TableRebalanceObserver _tableRebalanceObserver;
@@ -618,9 +638,8 @@ public class TableRebalancer {
}
LOGGER.info("Fetching the table size for table: {}", tableNameWithType);
try {
- // TODO: Consider making the timeoutMs for fetching table size via table
rebalancer configurable
TableSizeReader.TableSubTypeSizeDetails sizeDetails =
- _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
+ _tableSizeReader.getTableSubtypeSize(tableNameWithType,
TABLE_SIZE_READER_TIMEOUT_MS);
LOGGER.info("Fetched the table size details for table: {}",
tableNameWithType);
return sizeDetails;
} catch (InvalidConfigException e) {
@@ -638,22 +657,42 @@ public class TableRebalancer {
TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails,
TableConfig tableConfig) {
LOGGER.info("Calculating rebalance summary for table: {} with
rebalanceJobId: {}",
tableNameWithType, rebalanceJobId);
+ boolean isOfflineTable =
TableNameBuilder.getTableTypeFromTableName(tableNameWithType) ==
TableType.OFFLINE;
int existingReplicationFactor = 0;
int newReplicationFactor = 0;
Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+ Map<String, Set<String>> existingServersToConsumingSegmentMap =
isOfflineTable ? null : new HashMap<>();
+ Map<String, Set<String>> newServersToConsumingSegmentMap = isOfflineTable
? null : new HashMap<>();
for (Map.Entry<String, Map<String, String>> entrySet :
currentAssignment.entrySet()) {
existingReplicationFactor = entrySet.getValue().size();
- for (String segmentKey : entrySet.getValue().keySet()) {
- existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new
HashSet<>()).add(entrySet.getKey());
+ String segmentName = entrySet.getKey();
+ Collection<String> segmentStates = entrySet.getValue().values();
+ boolean isSegmentConsuming = existingServersToConsumingSegmentMap !=
null && segmentStates.stream()
+ .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) &&
segmentStates.stream()
+ .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING));
+
+ for (String instanceName : entrySet.getValue().keySet()) {
+ existingServersToSegmentMap.computeIfAbsent(instanceName, k -> new
HashSet<>()).add(segmentName);
+ if (isSegmentConsuming) {
+ existingServersToConsumingSegmentMap.computeIfAbsent(instanceName, k
-> new HashSet<>()).add(segmentName);
+ }
}
}
for (Map.Entry<String, Map<String, String>> entrySet :
targetAssignment.entrySet()) {
newReplicationFactor = entrySet.getValue().size();
- for (String segmentKey : entrySet.getValue().keySet()) {
- newServersToSegmentMap.computeIfAbsent(segmentKey, k -> new
HashSet<>()).add(entrySet.getKey());
+ String segmentName = entrySet.getKey();
+ Collection<String> segmentStates = entrySet.getValue().values();
+ boolean isSegmentConsuming = existingServersToConsumingSegmentMap !=
null && segmentStates.stream()
+ .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) &&
segmentStates.stream()
+ .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING));
+ for (String instanceName : entrySet.getValue().keySet()) {
+ newServersToSegmentMap.computeIfAbsent(instanceName, k -> new
HashSet<>()).add(segmentName);
+ if (isSegmentConsuming) {
+ newServersToConsumingSegmentMap.computeIfAbsent(instanceName, k ->
new HashSet<>()).add(segmentName);
+ }
}
}
RebalanceSummaryResult.RebalanceChangeInfo replicationFactor
@@ -780,6 +819,15 @@ public class TableRebalancer {
}
}
+ if (existingServersToConsumingSegmentMap != null &&
newServersToConsumingSegmentMap != null) {
+ // turn the map into {server: added consuming segments}
+ for (Map.Entry<String, Set<String>> entry :
newServersToConsumingSegmentMap.entrySet()) {
+ String server = entry.getKey();
+
entry.getValue().removeAll(existingServersToConsumingSegmentMap.getOrDefault(server,
Collections.emptySet()));
+ }
+ newServersToConsumingSegmentMap.entrySet().removeIf(entry ->
entry.getValue().isEmpty());
+ }
+
RebalanceSummaryResult.RebalanceChangeInfo numSegmentsInSingleReplica
= new
RebalanceSummaryResult.RebalanceChangeInfo(currentAssignment.size(),
targetAssignment.size());
@@ -802,9 +850,11 @@ public class TableRebalancer {
serversGettingNewSegments, serverSegmentChangeInfoMap);
// TODO: Add a metric to estimate the total time it will take to
rebalance. Need some good heuristics on how
// rebalance time can vary with number of segments added
+ RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary
consumingSegmentToBeMovedSummary =
+ isOfflineTable ? null : getConsumingSegmentSummary(tableConfig,
newServersToConsumingSegmentMap);
RebalanceSummaryResult.SegmentInfo segmentInfo = new
RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
maxSegmentsAddedToServer, averageSegmentSizeInBytes,
totalEstimatedDataToBeMovedInBytes,
- replicationFactor, numSegmentsInSingleReplica,
numSegmentsAcrossAllReplicas);
+ replicationFactor, numSegmentsInSingleReplica,
numSegmentsAcrossAllReplicas, consumingSegmentToBeMovedSummary);
LOGGER.info("Calculated rebalance summary for table: {} with
rebalanceJobId: {}", tableNameWithType,
rebalanceJobId);
@@ -817,6 +867,172 @@ public class TableRebalancer {
return instanceConfig.getTags();
}
+ private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary
getConsumingSegmentSummary(TableConfig tableConfig,
+ Map<String, Set<String>> newServersToConsumingSegmentMap) {
+ String tableNameWithType = tableConfig.getTableName();
+ if (newServersToConsumingSegmentMap.isEmpty()) {
+ return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(0, 0,
new HashMap<>(), new HashMap<>(),
+ new HashMap<>());
+ }
+ int numConsumingSegmentsToBeMoved =
+ newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) ->
a + b.size(), Integer::sum);
+ Set<String> uniqueConsumingSegments =
+
newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
+ Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new
HashMap<>();
+ uniqueConsumingSegments.forEach(segment ->
consumingSegmentZKmetadata.put(segment,
+
ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(),
tableNameWithType, segment)));
+ Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
+ getConsumingSegmentsOffsetsToCatchUp(tableConfig,
consumingSegmentZKmetadata);
+ Map<String, Integer> consumingSegmentsAge =
getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);
+
+ Map<String, Integer> consumingSegmentsOffsetsToCatchUpTopN;
+ Map<String,
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
+ consumingSegmentSummaryPerServer = new HashMap<>();
+ if (consumingSegmentsOffsetsToCatchUp != null) {
+ consumingSegmentsOffsetsToCatchUpTopN =
+ getTopNConsumingSegmentWithValue(consumingSegmentsOffsetsToCatchUp,
TOP_N_IN_CONSUMING_SEGMENT_SUMMARY);
+ newServersToConsumingSegmentMap.forEach((server, segments) -> {
+ int totalOffsetsToCatchUp =
+
segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
+ consumingSegmentSummaryPerServer.put(server,
+ new
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+ segments.size(), totalOffsetsToCatchUp));
+ });
+ } else {
+ consumingSegmentsOffsetsToCatchUpTopN = null;
+ newServersToConsumingSegmentMap.forEach((server, segments) -> {
+ consumingSegmentSummaryPerServer.put(server,
+ new
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
+ segments.size(), -1));
+ });
+ }
+
+ Map<String, Integer> consumingSegmentsOldestTopN =
+ consumingSegmentsAge == null ? null
+ : getTopNConsumingSegmentWithValue(consumingSegmentsAge,
TOP_N_IN_CONSUMING_SEGMENT_SUMMARY);
+
+ return new
RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
+ newServersToConsumingSegmentMap.size(),
consumingSegmentsOffsetsToCatchUpTopN, consumingSegmentsOldestTopN,
+ consumingSegmentSummaryPerServer);
+ }
+
+ private static Map<String, Integer> getTopNConsumingSegmentWithValue(
+ Map<String, Integer> consumingSegmentsWithValue, @Nullable Integer topN)
{
+ Map<String, Integer> topNConsumingSegments = new LinkedHashMap<>();
+ consumingSegmentsWithValue.entrySet()
+ .stream()
+ .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
+ .limit(topN == null ? consumingSegmentsWithValue.size() : topN)
+ .forEach(entry -> topNConsumingSegments.put(entry.getKey(),
entry.getValue()));
+ return topNConsumingSegments;
+ }
+
+ /**
+ * Fetches the age of each consuming segment in minutes.
+ * The age of a consuming segment is the time since the segment was created
in ZK; it could be different from when
+ * the stream should start to be consumed for the segment.
+ * consumingSegmentZKMetadata is a map from consuming segments to be moved
to their ZK metadata. Returns a map from
+ * segment name to the age of that consuming segment. Return null if failed
to obtain info for any consuming segment.
+ */
+ @Nullable
+ private Map<String, Integer> getConsumingSegmentsAge(String
tableNameWithType,
+ Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+ Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+ long now = System.currentTimeMillis();
+ try {
+ consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
+ if (segmentZKMetadata == null) {
+ LOGGER.warn("SegmentZKMetadata is null for segment: {} in table:
{}", s, tableNameWithType);
+ throw new RuntimeException("SegmentZKMetadata is null");
+ }
+ long creationTime = segmentZKMetadata.getCreationTime();
+ if (creationTime < 0) {
+ LOGGER.warn("Creation time is not found for segment: {} in table:
{}", s, tableNameWithType);
+ throw new RuntimeException("Creation time is not found");
+ }
+ consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+ }));
+ } catch (Exception e) {
+ return null;
+ }
+ return consumingSegmentsAge;
+ }
+
+ /**
+ * Fetches the consuming segment info for the table and calculates the
number of offsets to catch up for each
+ * consuming segment. consumingSegmentZKMetadata is a map from consuming
segments to be moved to their ZK metadata.
+ * Returns a map from segment name to the number of offsets to catch up for
that consuming
+ * segment. Return null if failed to obtain info for any consuming segment.
+ */
+ @Nullable
+ private Map<String, Integer>
getConsumingSegmentsOffsetsToCatchUp(TableConfig tableConfig,
+ Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+ String tableNameWithType = tableConfig.getTableName();
+ Map<String, Integer> segmentToOffsetsToCatchUp = new HashMap<>();
+ try {
+ for (Map.Entry<String, SegmentZKMetadata> entry :
consumingSegmentZKMetadata.entrySet()) {
+ String segmentName = entry.getKey();
+ SegmentZKMetadata segmentZKMetadata = entry.getValue();
+ if (segmentZKMetadata == null) {
+ LOGGER.warn("Cannot find SegmentZKMetadata for segment: {} in table:
{}", segmentName, tableNameWithType);
+ return null;
+ }
+ String startOffset = segmentZKMetadata.getStartOffset();
+ if (startOffset == null) {
+ LOGGER.warn("Start offset is null for segment: {} in table: {}",
segmentName, tableNameWithType);
+ return null;
+ }
+ Integer partitionId =
SegmentUtils.getPartitionIdFromRealtimeSegmentName(segmentName);
+ // for simplicity, we skip reporting consuming segment info if the
partitionId cannot be determined from the segmentName
+ if (partitionId == null) {
+ LOGGER.warn("Cannot determine partition id for realtime segment: {}
in table: {}", segmentName,
+ tableNameWithType);
+ return null;
+ }
+ Integer latestOffset = getLatestOffsetOfStream(tableConfig,
partitionId);
+ if (latestOffset == null) {
+ return null;
+ }
+ int offsetsToCatchUp = latestOffset - Integer.parseInt(startOffset);
+ segmentToOffsetsToCatchUp.put(segmentName, offsetsToCatchUp);
+ }
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while trying to fetch consuming segment
info for table: {}", tableNameWithType, e);
+ return null;
+ }
+ LOGGER.info("Successfully fetched consuming segments info for table: {}",
tableNameWithType);
+ return segmentToOffsetsToCatchUp;
+ }
+
+ @VisibleForTesting
+ StreamPartitionMsgOffset fetchStreamPartitionOffset(TableConfig tableConfig,
int partitionId)
+ throws Exception {
+ StreamConsumerFactory streamConsumerFactory =
+ StreamConsumerFactoryProvider.create(new
StreamConfig(tableConfig.getTableName(),
+ IngestionConfigUtils.getStreamConfigMap(tableConfig,
partitionId)));
+ try (StreamMetadataProvider streamMetadataProvider =
streamConsumerFactory.createPartitionMetadataProvider(
+ TableRebalancer.class.getCanonicalName(), partitionId)) {
+ return
streamMetadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA,
+ STREAM_PARTITION_OFFSET_READ_TIMEOUT_MS);
+ }
+ }
+
+ @Nullable
+ private Integer getLatestOffsetOfStream(TableConfig tableConfig, int
partitionId) {
+ try {
+ StreamPartitionMsgOffset partitionMsgOffset =
fetchStreamPartitionOffset(tableConfig, partitionId);
+ if (!(partitionMsgOffset instanceof LongMsgOffset)) {
+ LOGGER.warn("Unsupported stream partition message offset type: {}",
partitionMsgOffset);
+ return null;
+ }
+ return (int) ((LongMsgOffset) partitionMsgOffset).getOffset();
+ } catch (Exception e) {
+ LOGGER.warn("Caught exception while trying to fetch stream partition of
partitionId: {}",
+ partitionId, e);
+ return null;
+ }
+ }
+
private void onReturnFailure(String errorMsg, Exception e) {
if (e != null) {
LOGGER.warn(errorMsg, e);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 5d23bac3e4..1d1ea56384 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -24,10 +24,12 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import org.apache.pinot.common.assignment.InstancePartitions;
import org.apache.pinot.common.assignment.InstancePartitionsUtils;
@@ -37,6 +39,7 @@ import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.helix.ControllerTest;
import
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
import org.apache.pinot.controller.validation.ResourceUtilizationInfo;
import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
@@ -50,8 +53,11 @@ import
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitio
import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
import org.apache.pinot.spi.config.tenant.Tenant;
import org.apache.pinot.spi.config.tenant.TenantRole;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.mockito.Mockito;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
@@ -113,8 +119,8 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
ExecutorService executorService = Executors.newFixedThreadPool(10);
DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
preChecker.init(_helixResourceManager, executorService, 1);
- TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null,
null, preChecker,
- _helixResourceManager.getTableSizeReader());
+ TableRebalancer tableRebalancer =
+ new TableRebalancer(_helixManager, null, null, preChecker,
_helixResourceManager.getTableSizeReader());
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
@@ -153,6 +159,7 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
assertNotNull(rebalanceSummaryResult.getServerInfo());
assertNotNull(rebalanceSummaryResult.getSegmentInfo());
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
0);
+
assertNull(rebalanceSummaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary());
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
3);
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
3);
assertNotNull(rebalanceSummaryResult.getTagsInfo());
@@ -728,8 +735,8 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
ExecutorService executorService = Executors.newFixedThreadPool(10);
DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
preChecker.init(_helixResourceManager, executorService, 0.5);
- TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null,
null, preChecker,
- _helixResourceManager.getTableSizeReader());
+ TableRebalancer tableRebalancer =
+ new TableRebalancer(_helixManager, null, null, preChecker,
_helixResourceManager.getTableSizeReader());
TableConfig tableConfig =
new
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
@@ -828,8 +835,8 @@ public class TableRebalancerClusterStatelessTest extends
ControllerTest {
ExecutorService executorService = Executors.newFixedThreadPool(10);
DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
preChecker.init(_helixResourceManager, executorService, 0.5);
- TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null,
null, preChecker,
- _helixResourceManager.getTableSizeReader());
+ TableRebalancer tableRebalancer =
+ new TableRebalancer(_helixManager, null, null, preChecker,
_helixResourceManager.getTableSizeReader());
TableConfig tableConfig =
new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
.setNumReplicas(2)
@@ -1656,6 +1663,246 @@ public class TableRebalancerClusterStatelessTest
extends ControllerTest {
}
}
+ @Test
+ public void testRebalanceConsumingSegmentSummary()
+ throws Exception {
+ int numServers = 3;
+ int numReplica = 3;
+
+ for (int i = 0; i < numServers; i++) {
+ String instanceId = "consumingSegmentSummary_" +
SERVER_INSTANCE_ID_PREFIX + i;
+ addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+ }
+
+ ConsumingSegmentInfoReader mockConsumingSegmentInfoReader =
Mockito.mock(ConsumingSegmentInfoReader.class);
+ TableRebalancer tableRebalancerOriginal =
+ new TableRebalancer(_helixManager, null, null, null,
_helixResourceManager.getTableSizeReader());
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(numReplica)
+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+ .build();
+
+ // Create the table
+ addDummySchema(RAW_TABLE_NAME);
+ _helixResourceManager.addTable(tableConfig);
+
+ // Generate mock ConsumingSegmentsInfoMap for the consuming segments
+ int mockOffsetSmall = 1000;
+ int mockOffsetBig = 2000;
+
+ TableRebalancer tableRebalancer = Mockito.spy(tableRebalancerOriginal);
+ Mockito.doReturn(new LongMsgOffset(mockOffsetBig))
+ .when(tableRebalancer)
+ .fetchStreamPartitionOffset(Mockito.any(), Mockito.eq(0));
+ Mockito.doReturn(new LongMsgOffset(mockOffsetSmall))
+ .when(tableRebalancer)
+ .fetchStreamPartitionOffset(Mockito.any(), Mockito.intThat(x -> x !=
0));
+
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+
+ // dry-run with default rebalance config
+ RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
rebalanceConfig, null);
+ RebalanceSummaryResult summaryResult =
rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(summaryResult.getSegmentInfo());
+ RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary
consumingSegmentToBeMovedSummary =
+ summaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary();
+ assertNotNull(consumingSegmentToBeMovedSummary);
+
assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(),
0);
+
assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(),
0);
+
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().size(),
0);
+
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().size(),
0);
+ assertEquals(consumingSegmentToBeMovedSummary
+ .getServerConsumingSegmentSummary()
+ .size(), 0);
+ assertTrue(consumingSegmentToBeMovedSummary
+ .getServerConsumingSegmentSummary()
+ .values()
+ .stream()
+ .allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments()
== 0));
+
+ rebalanceConfig.setIncludeConsuming(true);
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ summaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(summaryResult.getSegmentInfo());
+ consumingSegmentToBeMovedSummary =
summaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary();
+ assertNotNull(consumingSegmentToBeMovedSummary);
+
assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(),
0);
+
assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(),
0);
+
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().size(),
0);
+
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().size(),
0);
+ assertEquals(consumingSegmentToBeMovedSummary
+ .getServerConsumingSegmentSummary()
+ .size(), 0);
+ assertTrue(consumingSegmentToBeMovedSummary
+ .getServerConsumingSegmentSummary()
+ .values()
+ .stream()
+ .allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments()
== 0));
+
+ // Create new servers to replace the old servers
+ for (int i = numServers; i < numServers * 2; i++) {
+ String instanceId = "consumingSegmentSummary_" +
SERVER_INSTANCE_ID_PREFIX + i;
+ addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+ }
+ for (int i = 0; i < numServers; i++) {
+ _helixAdmin.removeInstanceTag(getHelixClusterName(),
"consumingSegmentSummary_" + SERVER_INSTANCE_ID_PREFIX + i,
+ TagNameUtils.getRealtimeTagForTenant(null));
+ }
+
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ summaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(summaryResult.getSegmentInfo());
+ consumingSegmentToBeMovedSummary =
summaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary();
+ assertNotNull(consumingSegmentToBeMovedSummary);
+
assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(),
+ FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS * numReplica);
+
assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(),
numServers);
+ Iterator<Integer> offsetToCatchUpIterator =
+
consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().values().iterator();
+ assertEquals(offsetToCatchUpIterator.next(), mockOffsetBig);
+ if (FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS > 1) {
+ assertEquals(offsetToCatchUpIterator.next(), mockOffsetSmall);
+ }
+
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().size(),
+ FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS);
+ assertEquals(consumingSegmentToBeMovedSummary
+ .getServerConsumingSegmentSummary()
+ .size(), numServers);
+ assertTrue(consumingSegmentToBeMovedSummary
+ .getServerConsumingSegmentSummary()
+ .values()
+ .stream()
+ .allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments()
+ == mockOffsetSmall * (FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS
- 1) + mockOffsetBig));
+
+ _helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME);
+
+ for (int i = 0; i < numServers * 2; i++) {
+ stopAndDropFakeInstance("consumingSegmentSummary_" +
SERVER_INSTANCE_ID_PREFIX + i);
+ }
+ }
+
+ @Test
+ public void testRebalanceConsumingSegmentSummaryFailure()
+ throws Exception {
+ int numServers = 3;
+ int numReplica = 3;
+
+ for (int i = 0; i < numServers; i++) {
+ String instanceId = "consumingSegmentSummaryFailure_" +
SERVER_INSTANCE_ID_PREFIX + i;
+ addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+ }
+
+ ConsumingSegmentInfoReader mockConsumingSegmentInfoReader =
Mockito.mock(ConsumingSegmentInfoReader.class);
+ TableRebalancer tableRebalancerOriginal =
+ new TableRebalancer(_helixManager, null, null, null,
_helixResourceManager.getTableSizeReader());
+ TableConfig tableConfig =
+ new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+ .setNumReplicas(numReplica)
+
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+ .build();
+
+ // Create the table
+ addDummySchema(RAW_TABLE_NAME);
+ _helixResourceManager.addTable(tableConfig);
+
+ // Generate mock ConsumingSegmentsInfoMap for the consuming segments
+ int mockOffsetSmall = 1000;
+ int mockOffsetBig = 2000;
+
+ TableRebalancer tableRebalancer = Mockito.spy(tableRebalancerOriginal);
+ Mockito.doReturn(new LongMsgOffset(mockOffsetBig))
+ .when(tableRebalancer)
+ .fetchStreamPartitionOffset(Mockito.any(), Mockito.eq(0));
+ Mockito.doReturn(new LongMsgOffset(mockOffsetSmall))
+ .when(tableRebalancer)
+ .fetchStreamPartitionOffset(Mockito.any(), Mockito.intThat(x -> x !=
0));
+
+ RebalanceConfig rebalanceConfig = new RebalanceConfig();
+ rebalanceConfig.setDryRun(true);
+ rebalanceConfig.setIncludeConsuming(true);
+
+ // Create new servers to replace the old servers
+ for (int i = numServers; i < numServers * 2; i++) {
+ String instanceId = "consumingSegmentSummaryFailure_" +
SERVER_INSTANCE_ID_PREFIX + i;
+ addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+ }
+ for (int i = 0; i < numServers; i++) {
+ _helixAdmin.removeInstanceTag(getHelixClusterName(),
+ "consumingSegmentSummaryFailure_" + SERVER_INSTANCE_ID_PREFIX + i,
+ TagNameUtils.getRealtimeTagForTenant(null));
+ }
+
+ RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig,
rebalanceConfig, null);
+ RebalanceSummaryResult summaryResult =
rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(summaryResult.getSegmentInfo());
+ RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary
consumingSegmentToBeMovedSummary =
+ summaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary();
+ assertNotNull(consumingSegmentToBeMovedSummary);
+
assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(),
numServers);
+
assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(),
+ FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS * numReplica);
+
assertNotNull(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes());
+
assertNotNull(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp());
+
assertNotNull(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary());
+
+ // Simulate not supported stream partition message type (e.g. Kinesis)
+ Mockito.doReturn((StreamPartitionMsgOffset) o -> 0)
+ .when(tableRebalancer)
+ .fetchStreamPartitionOffset(Mockito.any(), Mockito.eq(0));
+ Mockito.doReturn(new LongMsgOffset(mockOffsetSmall))
+ .when(tableRebalancer)
+ .fetchStreamPartitionOffset(Mockito.any(), Mockito.intThat(x -> x !=
0));
+
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ summaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(summaryResult.getSegmentInfo());
+ consumingSegmentToBeMovedSummary =
summaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary();
+ assertNotNull(consumingSegmentToBeMovedSummary);
+
assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(),
numServers);
+
assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(),
+ FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS * numReplica);
+
assertNotNull(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes());
+
assertNull(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp());
+
assertNotNull(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary());
+
assertTrue(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary()
+ .values()
+ .stream()
+ .allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments()
== -1));
+
+ // Simulate stream partition offset fetch failure
+ Mockito.doThrow(new TimeoutException("timeout"))
+ .when(tableRebalancer)
+ .fetchStreamPartitionOffset(Mockito.any(), Mockito.eq(0));
+ Mockito.doReturn(new LongMsgOffset(mockOffsetSmall))
+ .when(tableRebalancer)
+ .fetchStreamPartitionOffset(Mockito.any(), Mockito.intThat(x -> x !=
0));
+
+ rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig,
null);
+ summaryResult = rebalanceResult.getRebalanceSummaryResult();
+ assertNotNull(summaryResult.getSegmentInfo());
+ consumingSegmentToBeMovedSummary =
summaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary();
+ assertNotNull(consumingSegmentToBeMovedSummary);
+
assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(),
numServers);
+
assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(),
+ FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS * numReplica);
+
assertNotNull(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes());
+
assertNull(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp());
+
assertNotNull(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary());
+
assertTrue(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary()
+ .values()
+ .stream()
+ .allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments()
== -1));
+
+ _helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME);
+
+ for (int i = 0; i < numServers * 2; i++) {
+ stopAndDropFakeInstance("consumingSegmentSummaryFailure_" +
SERVER_INSTANCE_ID_PREFIX + i);
+ }
+ }
+
@AfterClass
public void tearDown() {
stopFakeInstances();
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
index 7028844169..d4b8cbfa48 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
@@ -56,7 +56,7 @@ public class FakeStreamConfigUtils {
private static final LongMsgOffset SMALLEST_OFFSET = new LongMsgOffset(0);
private static final LongMsgOffset LARGEST_OFFSET = new
LongMsgOffset(Integer.MAX_VALUE);
private static final String NUM_PARTITIONS_KEY = "num.partitions";
- private static final int DEFAULT_NUM_PARTITIONS = 2;
+ public static final int DEFAULT_NUM_PARTITIONS = 2;
private static final String STREAM_TYPE = "fakeStream";
private static final String TOPIC_NAME = "fakeTopic";
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConsumingSegmentToBeMovedSummaryIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConsumingSegmentToBeMovedSummaryIntegrationTest.java
new file mode 100644
index 0000000000..16999c887c
--- /dev/null
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConsumingSegmentToBeMovedSummaryIntegrationTest.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceSummaryResult;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
/**
 * Integration test for the consuming-segment section of the rebalance summary against a live
 * Kafka stream on a REALTIME table.
 *
 * <p>The setup keeps a consuming segment open by flushing on segment size instead of row count,
 * then the test triggers dry-run rebalances through the controller REST API and checks the
 * {@code ConsumingSegmentToBeMovedSummary} in three states: no server change, a new server added
 * (segments must move and offsets are populated), and Kafka stopped (offset fetch fails, so the
 * offsets-to-catch-up map is dropped).
 */
public class KafkaConsumingSegmentToBeMovedSummaryIntegrationTest extends BaseRealtimeClusterIntegrationTest {

  /**
   * Starts ZK, controller, broker, one server and Kafka, then creates a REALTIME table configured
   * to flush on segment size (not row count) so a consuming segment remains open for the test.
   */
  @Override
  @BeforeClass
  public void setUp()
      throws Exception {
    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);

    // Start the Pinot cluster
    startZk();
    startController();

    HelixConfigScope scope =
        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
            .build();
    // Set max segment preprocess parallelism to 8
    _helixManager.getConfigAccessor()
        .set(scope, CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM, Integer.toString(8));
    // Set max segment startree preprocess parallelism to 6
    _helixManager.getConfigAccessor()
        .set(scope, CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM, Integer.toString(6));

    startBroker();
    startServer();

    // Start Kafka
    startKafka();

    // Unpack the Avro files
    List<File> avroFiles = unpackAvroData(_tempDir);

    // Create and upload the schema and table config
    Schema schema = createSchema();
    addSchema(schema);
    TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
    Map<String, String> streamConfig = getStreamConfigs();
    // Flush on segment size rather than row count so the consuming segment stays open long enough
    // for the rebalance summary to observe it
    streamConfig.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE, "1000000");
    streamConfig.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
    indexingConfig.setStreamConfigs(streamConfig);
    tableConfig.setIndexingConfig(indexingConfig);
    addTableConfig(tableConfig);

    // Push data into Kafka
    pushAvroIntoKafka(avroFiles);

    // create segments and upload them to controller
    createSegmentsAndUpload(avroFiles, schema, tableConfig);

    // Set up the H2 connection
    setUpH2Connection(avroFiles);

    // Initialize the query generator
    setUpQueryGenerator(avroFiles);

    runValidationJob(600_000);

    // Wait for all documents loaded
    waitForAllDocsLoaded(600_000L);
  }

  /**
   * Exercises the consuming-segment summary returned by the controller's rebalance endpoint.
   *
   * <p>NOTE(review): the boolean arguments to forTableRebalance are assumed to be
   * (dryRun, downtime/reassign flags, includeConsuming, ...) based on which calls toggle the
   * consuming summary — confirm against ControllerRequestURLBuilder#forTableRebalance.
   */
  @Test
  public void testConsumingSegmentSummary()
      throws Exception {
    // No server change yet: dry-run rebalance with includeConsuming should report nothing to move
    String response = sendPostRequest(
        getControllerRequestURLBuilder().forTableRebalance(getTableName(), "REALTIME", true, false, true, false, -1));
    RebalanceResult result = JsonUtils.stringToObject(response, RebalanceResult.class);
    Assert.assertNotNull(result);
    Assert.assertNotNull(result.getRebalanceSummaryResult());
    Assert.assertNotNull(result.getRebalanceSummaryResult().getSegmentInfo());
    RebalanceSummaryResult.SegmentInfo segmentInfo = result.getRebalanceSummaryResult().getSegmentInfo();
    RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary consumingSegmentToBeMovedSummary =
        segmentInfo.getConsumingSegmentToBeMovedSummary();
    Assert.assertNotNull(consumingSegmentToBeMovedSummary);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(), 0);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(), 0);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary().size(),
        0);

    // Add a second server: the single consuming segment should now be reported as moving to it
    startServer();
    response = sendPostRequest(
        getControllerRequestURLBuilder().forTableRebalance(getTableName(), "REALTIME", true, false, true, false, -1));
    result = JsonUtils.stringToObject(response, RebalanceResult.class);
    Assert.assertNotNull(result);
    Assert.assertNotNull(result.getRebalanceSummaryResult());
    Assert.assertNotNull(result.getRebalanceSummaryResult().getSegmentInfo());
    segmentInfo = result.getRebalanceSummaryResult().getSegmentInfo();
    consumingSegmentToBeMovedSummary = segmentInfo.getConsumingSegmentToBeMovedSummary();
    Assert.assertNotNull(consumingSegmentToBeMovedSummary);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(), 1);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(), 1);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary().size(),
        1);
    // 57801 presumably equals the total number of messages pushed for this Avro data set — the
    // receiving server must catch up all of them, the other server none. TODO confirm if the
    // test data changes.
    Assert.assertTrue(consumingSegmentToBeMovedSummary
        .getServerConsumingSegmentSummary()
        .values()
        .stream()
        .allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments() == 57801
            || x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments() == 0));
    Assert.assertEquals(consumingSegmentToBeMovedSummary
        .getServerConsumingSegmentSummary()
        .values()
        .stream()
        .reduce(0, (a, b) -> a + b.getTotalOffsetsToCatchUpAcrossAllConsumingSegments(), Integer::sum), 57801);

    // set includeConsuming to false
    response = sendPostRequest(
        getControllerRequestURLBuilder().forTableRebalance(getTableName(), "REALTIME", true, false, false, false, -1));
    result = JsonUtils.stringToObject(response, RebalanceResult.class);
    Assert.assertNotNull(result);
    Assert.assertNotNull(result.getRebalanceSummaryResult());
    Assert.assertNotNull(result.getRebalanceSummaryResult().getSegmentInfo());
    segmentInfo = result.getRebalanceSummaryResult().getSegmentInfo();
    consumingSegmentToBeMovedSummary = segmentInfo.getConsumingSegmentToBeMovedSummary();
    Assert.assertNotNull(consumingSegmentToBeMovedSummary);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(), 0);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(), 0);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary().size(),
        0);

    // With Kafka down the offset fetch fails: counts stay populated but the offsets-to-catch-up
    // map is dropped (null)
    stopKafka();
    response = sendPostRequest(
        getControllerRequestURLBuilder().forTableRebalance(getTableName(), "REALTIME", true, false, true, false, -1));
    RebalanceResult resultNoInfo = JsonUtils.stringToObject(response, RebalanceResult.class);
    Assert.assertNotNull(resultNoInfo);
    Assert.assertNotNull(resultNoInfo.getRebalanceSummaryResult());
    Assert.assertNotNull(resultNoInfo.getRebalanceSummaryResult().getSegmentInfo());
    segmentInfo = resultNoInfo.getRebalanceSummaryResult().getSegmentInfo();
    consumingSegmentToBeMovedSummary = segmentInfo.getConsumingSegmentToBeMovedSummary();
    Assert.assertNotNull(consumingSegmentToBeMovedSummary);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(), 1);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(), 1);
    Assert.assertNotNull(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary());
    Assert.assertNull(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp());
  }
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 7332b9cce4..b4e73989a7 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -75,6 +75,7 @@ import
org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerResul
import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
import org.apache.pinot.controller.helix.core.rebalance.RebalanceSummaryResult;
import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
import org.apache.pinot.segment.spi.index.StandardIndexes;
@@ -298,6 +299,8 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
_executorService = Executors.newFixedThreadPool(10);
preChecker.init(_helixResourceManager, _executorService,
_controllerConfig.getDiskUtilizationThreshold());
+ ConsumingSegmentInfoReader consumingSegmentInfoReader =
+ new ConsumingSegmentInfoReader(_executorService, null,
_helixResourceManager);
_tableRebalancer = new
TableRebalancer(_resourceManager.getHelixZkManager(), null, null, preChecker,
_resourceManager.getTableSizeReader());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]