This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 38ad8fe44e Consuming Rebalance Summary (#15368)
38ad8fe44e is described below

commit 38ad8fe44ead669f35245d3bbf53b6555cb9df49
Author: Jhow <[email protected]>
AuthorDate: Mon Apr 7 10:32:33 2025 -0700

    Consuming Rebalance Summary (#15368)
    
    * Add consumingSegmentsToBeMoved to SegmentInfo
    
    * test consumingSegmentToBeMoved
    
    * Add consuming segment summary
    
    * lint
    
    * register connection manager instead of assign in constructor
    
    * add integration test
    
    * lint
    
    * license
    
    * fix JsonInclude
    
    * rename byte to offset in the context of Kafka stream offset
    
    * add segment age into consuming segment summary and reorg the info
    
    * update tests
    
    * docs: log msg and comment
    
    * Update format
    
    * variables renaming
    
    * naming
    
    * fix JsonInclude annotation
    
    * lint
    
    * variable naming
    
    * variable naming
    
    * lint: style
    
    * handle illegal offset format return from stream
    
    * move JsonInclude annotators to class level
    
    * show null and -1 instead of dropping the field in ConsumingSegmentToBeMovedSummary
    
    * docs: comment update
    
    * lint: import
    
    * docs: remove ambiguity
    
    * update consuming segment logic
    
    update import
    
    * trigger CI
    
    * pass consumingSegmentInfoReader into PinotHelixResourceManager
    
    * variable renamed and bug fix
    
    * remove extra constructor
    
    * create ConsumingSegmentInfoReader in OfflineClusterIntegrationTest
    
    * Use StreamMetadataProvider instead of ConsumingSegmentInfoReader
    
    * remove resource
    
    * update test
    
    * docs: add detail for consumingSegmentsToBeMovedWithOldestAgeInMinutes
    
    * docs:
---
 .../core/rebalance/RebalanceSummaryResult.java     | 142 +++++++++--
 .../helix/core/rebalance/TableRebalancer.java      | 230 +++++++++++++++++-
 .../TableRebalancerClusterStatelessTest.java       | 259 ++++++++++++++++++++-
 .../impl/fakestream/FakeStreamConfigUtils.java     |   2 +-
 ...mingSegmentToBeMovedSummaryIntegrationTest.java | 175 ++++++++++++++
 .../tests/OfflineClusterIntegrationTest.java       |   3 +
 6 files changed, 782 insertions(+), 29 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
index 753d3d5dd4..82976da5d8 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/RebalanceSummaryResult.java
@@ -34,12 +34,8 @@ import javax.annotation.Nullable;
 @JsonIgnoreProperties(ignoreUnknown = true)
 @JsonInclude(JsonInclude.Include.NON_NULL)
 public class RebalanceSummaryResult {
-
-  @JsonInclude(JsonInclude.Include.NON_NULL)
   private final ServerInfo _serverInfo;
-  @JsonInclude(JsonInclude.Include.NON_NULL)
   private final SegmentInfo _segmentInfo;
-  @JsonInclude(JsonInclude.Include.NON_NULL)
   private final List<TagInfo> _tagsInfo;
 
   /**
@@ -71,6 +67,7 @@ public class RebalanceSummaryResult {
     return _tagsInfo;
   }
 
+  @JsonInclude(JsonInclude.Include.NON_NULL)
   public static class ServerSegmentChangeInfo {
     private final ServerStatus _serverStatus;
     private final int _totalSegmentsAfterRebalance;
@@ -78,7 +75,6 @@ public class RebalanceSummaryResult {
     private final int _segmentsAdded;
     private final int _segmentsDeleted;
     private final int _segmentsUnchanged;
-    @JsonInclude(JsonInclude.Include.NON_NULL)
     private final List<String> _tagList;
 
     /**
@@ -227,19 +223,14 @@ public class RebalanceSummaryResult {
     }
   }
 
+  @JsonInclude(JsonInclude.Include.NON_NULL)
   public static class ServerInfo {
     private final int _numServersGettingNewSegments;
-    @JsonInclude(JsonInclude.Include.NON_NULL)
     private final RebalanceChangeInfo _numServers;
-    @JsonInclude(JsonInclude.Include.NON_NULL)
     private final Set<String> _serversAdded;
-    @JsonInclude(JsonInclude.Include.NON_NULL)
     private final Set<String> _serversRemoved;
-    @JsonInclude(JsonInclude.Include.NON_NULL)
     private final Set<String> _serversUnchanged;
-    @JsonInclude(JsonInclude.Include.NON_NULL)
     private final Set<String> _serversGettingNewSegments;
-    @JsonInclude(JsonInclude.Include.NON_NULL)
     private final Map<String, ServerSegmentChangeInfo> 
_serverSegmentChangeInfo;
 
     /**
@@ -306,18 +297,130 @@ public class RebalanceSummaryResult {
     }
   }
 
  /**
   * Summary of the consuming segments that would be moved by this rebalance, plus the per-server
   * impact of those moves.
   *
   * <p>NOTE: unlike the sibling summary classes, this class deliberately has no class-level
   * {@code @JsonInclude(NON_NULL)}: null maps and -1 totals are serialized as-is so readers can
   * distinguish "statistic could not be determined" from "no data".
   */
  public static class ConsumingSegmentToBeMovedSummary {
    private final int _numConsumingSegmentsToBeMoved;
    private final int _numServersGettingConsumingSegmentsAdded;
    // Top consuming segments by number of stream offsets left to catch up; null when unknown
    private final Map<String, Integer> _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
    // Oldest consuming segments by age in minutes (from ZK creation time); null when unknown
    private final Map<String, Integer> _consumingSegmentsToBeMovedWithOldestAgeInMinutes;
    private final Map<String, ConsumingSegmentSummaryPerServer> _serverConsumingSegmentSummary;

    /**
     * Constructor for ConsumingSegmentToBeMovedSummary
     * @param numConsumingSegmentsToBeMoved total number of consuming segments to be moved as part of this rebalance
     * @param numServersGettingConsumingSegmentsAdded number of servers that get at least one consuming segment added
     *                                                as part of this rebalance
     * @param consumingSegmentsToBeMovedWithMostOffsetsToCatchUp top consuming segments to be moved to catch up.
     *                                                           Map from segment name to its number of offsets to
     *                                                           catch up on the new server — essentially the
     *                                                           difference between the latest offset of the stream
     *                                                           and the segment's start offset of the stream. The
     *                                                           map is set to null if the number of offsets to catch
     *                                                           up could not be determined for at least one
     *                                                           consuming segment
     * @param consumingSegmentsToBeMovedWithOldestAgeInMinutes oldest consuming segments to be moved to catch up. Map
     *                                                         from segment name to its age in minutes, where age is
     *                                                         derived from the segment's creation time in ZK
     *                                                         metadata. Segment age is only an approximation of data
     *                                                         age: a segment may consume events that predate the
     *                                                         segment's creation, and there is no obvious way to get
     *                                                         the age of the oldest data in the stream for a
     *                                                         specific consuming segment. The map is set to null if
     *                                                         ZK metadata is not available or the creation time is
     *                                                         not found for at least one consuming segment
     * @param serverConsumingSegmentSummary ConsumingSegmentSummaryPerServer per server
     */
    @JsonCreator
    public ConsumingSegmentToBeMovedSummary(
        @JsonProperty("numConsumingSegmentsToBeMoved") int numConsumingSegmentsToBeMoved,
        @JsonProperty("numServersGettingConsumingSegmentsAdded") int numServersGettingConsumingSegmentsAdded,
        @JsonProperty("consumingSegmentsToBeMovedWithMostOffsetsToCatchUp") @Nullable
        Map<String, Integer> consumingSegmentsToBeMovedWithMostOffsetsToCatchUp,
        @JsonProperty("consumingSegmentsToBeMovedWithOldestAgeInMinutes") @Nullable
        Map<String, Integer> consumingSegmentsToBeMovedWithOldestAgeInMinutes,
        @JsonProperty("serverConsumingSegmentSummary") @Nullable
        Map<String, ConsumingSegmentSummaryPerServer> serverConsumingSegmentSummary) {
      _numConsumingSegmentsToBeMoved = numConsumingSegmentsToBeMoved;
      _numServersGettingConsumingSegmentsAdded = numServersGettingConsumingSegmentsAdded;
      _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp = consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
      _consumingSegmentsToBeMovedWithOldestAgeInMinutes = consumingSegmentsToBeMovedWithOldestAgeInMinutes;
      _serverConsumingSegmentSummary = serverConsumingSegmentSummary;
    }

    @JsonProperty
    public int getNumConsumingSegmentsToBeMoved() {
      return _numConsumingSegmentsToBeMoved;
    }

    @JsonProperty
    public int getNumServersGettingConsumingSegmentsAdded() {
      return _numServersGettingConsumingSegmentsAdded;
    }

    @JsonProperty
    public Map<String, Integer> getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp() {
      return _consumingSegmentsToBeMovedWithMostOffsetsToCatchUp;
    }

    @JsonProperty
    public Map<String, Integer> getConsumingSegmentsToBeMovedWithOldestAgeInMinutes() {
      return _consumingSegmentsToBeMovedWithOldestAgeInMinutes;
    }

    @JsonProperty
    public Map<String, ConsumingSegmentSummaryPerServer> getServerConsumingSegmentSummary() {
      return _serverConsumingSegmentSummary;
    }

    /** Per-server view of the consuming segments this rebalance adds to one server. */
    public static class ConsumingSegmentSummaryPerServer {
      private final int _numConsumingSegmentsToBeAdded;
      // -1 means the offsets to catch up could not be determined for at least one consuming segment
      private final int _totalOffsetsToCatchUpAcrossAllConsumingSegments;

      /**
       * Constructor for ConsumingSegmentSummaryPerServer
       * @param numConsumingSegmentsToBeAdded number of consuming segments to be added to this server
       * @param totalOffsetsToCatchUpAcrossAllConsumingSegments total number of offsets to catch up across all
       *                                                         consuming segments added to this server. Per
       *                                                         segment, the offsets to catch up is the difference
       *                                                         between the latest offset of the stream and the
       *                                                         segment's start offset of the stream. Set to -1 if
       *                                                         the offsets to catch up could not be determined for
       *                                                         at least one consuming segment
       */
      @JsonCreator
      public ConsumingSegmentSummaryPerServer(
          @JsonProperty("numConsumingSegmentsToBeAdded") int numConsumingSegmentsToBeAdded,
          @JsonProperty("totalOffsetsToCatchUpAcrossAllConsumingSegments")
          int totalOffsetsToCatchUpAcrossAllConsumingSegments) {
        _numConsumingSegmentsToBeAdded = numConsumingSegmentsToBeAdded;
        _totalOffsetsToCatchUpAcrossAllConsumingSegments = totalOffsetsToCatchUpAcrossAllConsumingSegments;
      }

      @JsonProperty
      public int getNumConsumingSegmentsToBeAdded() {
        return _numConsumingSegmentsToBeAdded;
      }

      @JsonProperty
      public int getTotalOffsetsToCatchUpAcrossAllConsumingSegments() {
        return _totalOffsetsToCatchUpAcrossAllConsumingSegments;
      }
    }
  }
+
+  @JsonInclude(JsonInclude.Include.NON_NULL)
   public static class SegmentInfo {
     // TODO: Add a metric to estimate the total time it will take to rebalance
     private final int _totalSegmentsToBeMoved;
     private final int _maxSegmentsAddedToASingleServer;
     private final long _estimatedAverageSegmentSizeInBytes;
     private final long _totalEstimatedDataToBeMovedInBytes;
-    @JsonInclude(JsonInclude.Include.NON_NULL)
     private final RebalanceChangeInfo _replicationFactor;
-    @JsonInclude(JsonInclude.Include.NON_NULL)
     private final RebalanceChangeInfo _numSegmentsInSingleReplica;
-    @JsonInclude(JsonInclude.Include.NON_NULL)
     private final RebalanceChangeInfo _numSegmentsAcrossAllReplicas;
+    private final ConsumingSegmentToBeMovedSummary 
_consumingSegmentToBeMovedSummary;
 
     /**
      * Constructor for SegmentInfo
@@ -328,6 +431,7 @@ public class RebalanceSummaryResult {
      * @param replicationFactor replication factor before and after this 
rebalance
      * @param numSegmentsInSingleReplica number of segments in single replica 
before and after this rebalance
      * @param numSegmentsAcrossAllReplicas total number of segments across all 
replicas before and after this rebalance
+     * @param consumingSegmentToBeMovedSummary consuming segment summary. Set 
to null if the table is an offline table
      */
     @JsonCreator
     public SegmentInfo(@JsonProperty("totalSegmentsToBeMoved") int 
totalSegmentsToBeMoved,
@@ -336,7 +440,9 @@ public class RebalanceSummaryResult {
         @JsonProperty("totalEstimatedDataToBeMovedInBytes") long 
totalEstimatedDataToBeMovedInBytes,
         @JsonProperty("replicationFactor") @Nullable RebalanceChangeInfo 
replicationFactor,
         @JsonProperty("numSegmentsInSingleReplica") @Nullable 
RebalanceChangeInfo numSegmentsInSingleReplica,
-        @JsonProperty("numSegmentsAcrossAllReplicas") @Nullable 
RebalanceChangeInfo numSegmentsAcrossAllReplicas) {
+        @JsonProperty("numSegmentsAcrossAllReplicas") @Nullable 
RebalanceChangeInfo numSegmentsAcrossAllReplicas,
+        @JsonProperty("consumingSegmentToBeMovedSummary") @Nullable
+        ConsumingSegmentToBeMovedSummary consumingSegmentToBeMovedSummary) {
       _totalSegmentsToBeMoved = totalSegmentsToBeMoved;
       _maxSegmentsAddedToASingleServer = maxSegmentsAddedToASingleServer;
       _estimatedAverageSegmentSizeInBytes = estimatedAverageSegmentSizeInBytes;
@@ -344,6 +450,7 @@ public class RebalanceSummaryResult {
       _replicationFactor = replicationFactor;
       _numSegmentsInSingleReplica = numSegmentsInSingleReplica;
       _numSegmentsAcrossAllReplicas = numSegmentsAcrossAllReplicas;
+      _consumingSegmentToBeMovedSummary = consumingSegmentToBeMovedSummary;
     }
 
     @JsonProperty
@@ -380,6 +487,11 @@ public class RebalanceSummaryResult {
     public RebalanceChangeInfo getNumSegmentsAcrossAllReplicas() {
       return _numSegmentsAcrossAllReplicas;
     }
+
+    @JsonProperty
+    public ConsumingSegmentToBeMovedSummary 
getConsumingSegmentToBeMovedSummary() {
+      return _consumingSegmentToBeMovedSummary;
+    }
   }
 
   public enum ServerStatus {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
index f26a7b8060..d29f2091b4 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancer.java
@@ -21,9 +21,12 @@ package org.apache.pinot.controller.helix.core.rebalance;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -33,6 +36,7 @@ import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.function.ToIntFunction;
+import java.util.stream.Collectors;
 import javax.annotation.Nullable;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.tuple.Pair;
@@ -50,11 +54,14 @@ import 
org.apache.pinot.common.assignment.InstanceAssignmentConfigUtils;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.assignment.InstancePartitionsUtils;
 import org.apache.pinot.common.exception.InvalidConfigException;
+import org.apache.pinot.common.metadata.ZKMetadataProvider;
+import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.common.metrics.ControllerTimer;
 import org.apache.pinot.common.tier.PinotServerTierStorage;
 import org.apache.pinot.common.tier.Tier;
 import org.apache.pinot.common.tier.TierFactory;
+import org.apache.pinot.common.utils.SegmentUtils;
 import org.apache.pinot.common.utils.config.TableConfigUtils;
 import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.common.utils.config.TierConfigUtils;
@@ -71,7 +78,16 @@ import org.apache.pinot.spi.config.table.TagOverrideConfig;
 import org.apache.pinot.spi.config.table.TierConfig;
 import org.apache.pinot.spi.config.table.assignment.InstanceAssignmentConfig;
 import org.apache.pinot.spi.config.table.assignment.InstancePartitionsType;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamConsumerFactory;
+import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadataProvider;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.SegmentStateModel;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -120,6 +136,10 @@ import org.slf4j.LoggerFactory;
  */
 public class TableRebalancer {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(TableRebalancer.class);
+  private static final int TOP_N_IN_CONSUMING_SEGMENT_SUMMARY = 10;
+  // TODO: Consider making the timeoutMs below table rebalancer configurable
+  private static final int TABLE_SIZE_READER_TIMEOUT_MS = 30_000;
+  private static final int STREAM_PARTITION_OFFSET_READ_TIMEOUT_MS = 10_000;
   private final HelixManager _helixManager;
   private final HelixDataAccessor _helixDataAccessor;
   private final TableRebalanceObserver _tableRebalanceObserver;
@@ -618,9 +638,8 @@ public class TableRebalancer {
     }
     LOGGER.info("Fetching the table size for table: {}", tableNameWithType);
     try {
-      // TODO: Consider making the timeoutMs for fetching table size via table 
rebalancer configurable
       TableSizeReader.TableSubTypeSizeDetails sizeDetails =
-          _tableSizeReader.getTableSubtypeSize(tableNameWithType, 30_000);
+          _tableSizeReader.getTableSubtypeSize(tableNameWithType, 
TABLE_SIZE_READER_TIMEOUT_MS);
       LOGGER.info("Fetched the table size details for table: {}", 
tableNameWithType);
       return sizeDetails;
     } catch (InvalidConfigException e) {
@@ -638,22 +657,42 @@ public class TableRebalancer {
       TableSizeReader.TableSubTypeSizeDetails tableSubTypeSizeDetails, 
TableConfig tableConfig) {
     LOGGER.info("Calculating rebalance summary for table: {} with 
rebalanceJobId: {}",
         tableNameWithType, rebalanceJobId);
+    boolean isOfflineTable = 
TableNameBuilder.getTableTypeFromTableName(tableNameWithType) == 
TableType.OFFLINE;
     int existingReplicationFactor = 0;
     int newReplicationFactor = 0;
     Map<String, Set<String>> existingServersToSegmentMap = new HashMap<>();
     Map<String, Set<String>> newServersToSegmentMap = new HashMap<>();
+    Map<String, Set<String>> existingServersToConsumingSegmentMap = 
isOfflineTable ? null : new HashMap<>();
+    Map<String, Set<String>> newServersToConsumingSegmentMap = isOfflineTable 
? null : new HashMap<>();
 
     for (Map.Entry<String, Map<String, String>> entrySet : 
currentAssignment.entrySet()) {
       existingReplicationFactor = entrySet.getValue().size();
-      for (String segmentKey : entrySet.getValue().keySet()) {
-        existingServersToSegmentMap.computeIfAbsent(segmentKey, k -> new 
HashSet<>()).add(entrySet.getKey());
+      String segmentName = entrySet.getKey();
+      Collection<String> segmentStates = entrySet.getValue().values();
+      boolean isSegmentConsuming = existingServersToConsumingSegmentMap != 
null && segmentStates.stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && 
segmentStates.stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING));
+
+      for (String instanceName : entrySet.getValue().keySet()) {
+        existingServersToSegmentMap.computeIfAbsent(instanceName, k -> new 
HashSet<>()).add(segmentName);
+        if (isSegmentConsuming) {
+          existingServersToConsumingSegmentMap.computeIfAbsent(instanceName, k 
-> new HashSet<>()).add(segmentName);
+        }
       }
     }
 
     for (Map.Entry<String, Map<String, String>> entrySet : 
targetAssignment.entrySet()) {
       newReplicationFactor = entrySet.getValue().size();
-      for (String segmentKey : entrySet.getValue().keySet()) {
-        newServersToSegmentMap.computeIfAbsent(segmentKey, k -> new 
HashSet<>()).add(entrySet.getKey());
+      String segmentName = entrySet.getKey();
+      Collection<String> segmentStates = entrySet.getValue().values();
+      boolean isSegmentConsuming = existingServersToConsumingSegmentMap != 
null && segmentStates.stream()
+          .noneMatch(state -> state.equals(SegmentStateModel.ONLINE)) && 
segmentStates.stream()
+          .anyMatch(state -> state.equals(SegmentStateModel.CONSUMING));
+      for (String instanceName : entrySet.getValue().keySet()) {
+        newServersToSegmentMap.computeIfAbsent(instanceName, k -> new 
HashSet<>()).add(segmentName);
+        if (isSegmentConsuming) {
+          newServersToConsumingSegmentMap.computeIfAbsent(instanceName, k -> 
new HashSet<>()).add(segmentName);
+        }
       }
     }
     RebalanceSummaryResult.RebalanceChangeInfo replicationFactor
@@ -780,6 +819,15 @@ public class TableRebalancer {
       }
     }
 
+    if (existingServersToConsumingSegmentMap != null && 
newServersToConsumingSegmentMap != null) {
+      // turn the map into {server: added consuming segments}
+      for (Map.Entry<String, Set<String>> entry : 
newServersToConsumingSegmentMap.entrySet()) {
+        String server = entry.getKey();
+        
entry.getValue().removeAll(existingServersToConsumingSegmentMap.getOrDefault(server,
 Collections.emptySet()));
+      }
+      newServersToConsumingSegmentMap.entrySet().removeIf(entry -> 
entry.getValue().isEmpty());
+    }
+
     RebalanceSummaryResult.RebalanceChangeInfo numSegmentsInSingleReplica
         = new 
RebalanceSummaryResult.RebalanceChangeInfo(currentAssignment.size(), 
targetAssignment.size());
 
@@ -802,9 +850,11 @@ public class TableRebalancer {
         serversGettingNewSegments, serverSegmentChangeInfoMap);
     // TODO: Add a metric to estimate the total time it will take to 
rebalance. Need some good heuristics on how
     //       rebalance time can vary with number of segments added
+    RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary 
consumingSegmentToBeMovedSummary =
+        isOfflineTable ? null : getConsumingSegmentSummary(tableConfig, 
newServersToConsumingSegmentMap);
     RebalanceSummaryResult.SegmentInfo segmentInfo = new 
RebalanceSummaryResult.SegmentInfo(totalSegmentsToBeMoved,
         maxSegmentsAddedToServer, averageSegmentSizeInBytes, 
totalEstimatedDataToBeMovedInBytes,
-        replicationFactor, numSegmentsInSingleReplica, 
numSegmentsAcrossAllReplicas);
+        replicationFactor, numSegmentsInSingleReplica, 
numSegmentsAcrossAllReplicas, consumingSegmentToBeMovedSummary);
 
     LOGGER.info("Calculated rebalance summary for table: {} with 
rebalanceJobId: {}", tableNameWithType,
         rebalanceJobId);
@@ -817,6 +867,172 @@ public class TableRebalancer {
     return instanceConfig.getTags();
   }
 
  /**
   * Builds the consuming-segment portion of the rebalance summary for a realtime table.
   *
   * @param tableConfig table config of the table being rebalanced
   * @param newServersToConsumingSegmentMap map from server to the set of consuming segments this rebalance ADDS to
   *                                        that server
   * @return summary of consuming segment movement; the statistic maps inside are null when the underlying info
   *         (stream offsets or ZK creation times) could not be determined for at least one consuming segment
   */
  private RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary getConsumingSegmentSummary(TableConfig tableConfig,
      Map<String, Set<String>> newServersToConsumingSegmentMap) {
    String tableNameWithType = tableConfig.getTableName();
    if (newServersToConsumingSegmentMap.isEmpty()) {
      // Nothing moves: return an all-empty (not null) summary so the result serializes uniformly
      return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(0, 0, new HashMap<>(), new HashMap<>(),
          new HashMap<>());
    }
    // Counts each (server, segment) assignment, so a segment moved to two servers counts twice
    int numConsumingSegmentsToBeMoved =
        newServersToConsumingSegmentMap.values().stream().reduce(0, (a, b) -> a + b.size(), Integer::sum);
    Set<String> uniqueConsumingSegments =
        newServersToConsumingSegmentMap.values().stream().flatMap(Set::stream).collect(Collectors.toSet());
    // Fetch ZK metadata once per unique segment; values may be null if the segment is missing in ZK
    Map<String, SegmentZKMetadata> consumingSegmentZKmetadata = new HashMap<>();
    uniqueConsumingSegments.forEach(segment -> consumingSegmentZKmetadata.put(segment,
        ZKMetadataProvider.getSegmentZKMetadata(_helixManager.getHelixPropertyStore(), tableNameWithType, segment)));
    // Both helpers return null when the statistic could not be determined for at least one segment
    Map<String, Integer> consumingSegmentsOffsetsToCatchUp =
        getConsumingSegmentsOffsetsToCatchUp(tableConfig, consumingSegmentZKmetadata);
    Map<String, Integer> consumingSegmentsAge = getConsumingSegmentsAge(tableNameWithType, consumingSegmentZKmetadata);

    Map<String, Integer> consumingSegmentsOffsetsToCatchUpTopN;
    Map<String, RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer>
        consumingSegmentSummaryPerServer = new HashMap<>();
    if (consumingSegmentsOffsetsToCatchUp != null) {
      consumingSegmentsOffsetsToCatchUpTopN =
          getTopNConsumingSegmentWithValue(consumingSegmentsOffsetsToCatchUp, TOP_N_IN_CONSUMING_SEGMENT_SUMMARY);
      newServersToConsumingSegmentMap.forEach((server, segments) -> {
        int totalOffsetsToCatchUp =
            segments.stream().mapToInt(consumingSegmentsOffsetsToCatchUp::get).sum();
        consumingSegmentSummaryPerServer.put(server,
            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
                segments.size(), totalOffsetsToCatchUp));
      });
    } else {
      // Offsets unknown: surface null for the top-N map and -1 for per-server totals rather than dropping fields
      consumingSegmentsOffsetsToCatchUpTopN = null;
      newServersToConsumingSegmentMap.forEach((server, segments) -> {
        consumingSegmentSummaryPerServer.put(server,
            new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary.ConsumingSegmentSummaryPerServer(
                segments.size(), -1));
      });
    }

    Map<String, Integer> consumingSegmentsOldestTopN =
        consumingSegmentsAge == null ? null
            : getTopNConsumingSegmentWithValue(consumingSegmentsAge, TOP_N_IN_CONSUMING_SEGMENT_SUMMARY);

    return new RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary(numConsumingSegmentsToBeMoved,
        newServersToConsumingSegmentMap.size(), consumingSegmentsOffsetsToCatchUpTopN, consumingSegmentsOldestTopN,
        consumingSegmentSummaryPerServer);
  }
+
+  private static Map<String, Integer> getTopNConsumingSegmentWithValue(
+      Map<String, Integer> consumingSegmentsWithValue, @Nullable Integer topN) 
{
+    Map<String, Integer> topNConsumingSegments = new LinkedHashMap<>();
+    consumingSegmentsWithValue.entrySet()
+        .stream()
+        .sorted(Collections.reverseOrder(Map.Entry.comparingByValue()))
+        .limit(topN == null ? consumingSegmentsWithValue.size() : topN)
+        .forEach(entry -> topNConsumingSegments.put(entry.getKey(), 
entry.getValue()));
+    return topNConsumingSegments;
+  }
+
+  /**
+   * Fetches the age of each consuming segment in minutes.
+   * The age of a consuming segment is the time since the segment was created 
in ZK, it could be different to when
+   * the stream should start to be consumed for the segment.
+   * consumingSegmentZKMetadata is a map from consuming segments to be moved 
to their ZK metadata. Returns a map from
+   * segment name to the age of that consuming segment. Return null if failed 
to obtain info for any consuming segment.
+   */
+  @Nullable
+  private Map<String, Integer> getConsumingSegmentsAge(String 
tableNameWithType,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    Map<String, Integer> consumingSegmentsAge = new HashMap<>();
+    long now = System.currentTimeMillis();
+    try {
+      consumingSegmentZKMetadata.forEach(((s, segmentZKMetadata) -> {
+        if (segmentZKMetadata == null) {
+          LOGGER.warn("SegmentZKMetadata is null for segment: {} in table: 
{}", s, tableNameWithType);
+          throw new RuntimeException("SegmentZKMetadata is null");
+        }
+        long creationTime = segmentZKMetadata.getCreationTime();
+        if (creationTime < 0) {
+          LOGGER.warn("Creation time is not found for segment: {} in table: 
{}", s, tableNameWithType);
+          throw new RuntimeException("Creation time is not found");
+        }
+        consumingSegmentsAge.put(s, (int) (now - creationTime) / 60_000);
+      }));
+    } catch (Exception e) {
+      return null;
+    }
+    return consumingSegmentsAge;
+  }
+
+  /**
+   * Fetches the consuming segment info for the table and calculates the 
number of offsets to catch up for each
+   * consuming segment. consumingSegmentZKMetadata is a map from consuming 
segments to be moved to their ZK metadata.
+   * Returns a map from segment name to the number of offsets to catch up for 
that consuming
+   * segment. Return null if failed to obtain info for any consuming segment.
+   */
+  @Nullable
+  private Map<String, Integer> 
getConsumingSegmentsOffsetsToCatchUp(TableConfig tableConfig,
+      Map<String, SegmentZKMetadata> consumingSegmentZKMetadata) {
+    String tableNameWithType = tableConfig.getTableName();
+    Map<String, Integer> segmentToOffsetsToCatchUp = new HashMap<>();
+    try {
+      for (Map.Entry<String, SegmentZKMetadata> entry : 
consumingSegmentZKMetadata.entrySet()) {
+        String segmentName = entry.getKey();
+        SegmentZKMetadata segmentZKMetadata = entry.getValue();
+        if (segmentZKMetadata == null) {
+          LOGGER.warn("Cannot find SegmentZKMetadata for segment: {} in table: 
{}", segmentName, tableNameWithType);
+          return null;
+        }
+        String startOffset = segmentZKMetadata.getStartOffset();
+        if (startOffset == null) {
+          LOGGER.warn("Start offset is null for segment: {} in table: {}", 
segmentName, tableNameWithType);
+          return null;
+        }
+        Integer partitionId = 
SegmentUtils.getPartitionIdFromRealtimeSegmentName(segmentName);
+        // for simplicity here we disable consuming segment info if they do 
not have partitionId in segmentName
+        if (partitionId == null) {
+          LOGGER.warn("Cannot determine partition id for realtime segment: {} 
in table: {}", segmentName,
+              tableNameWithType);
+          return null;
+        }
+        Integer latestOffset = getLatestOffsetOfStream(tableConfig, 
partitionId);
+        if (latestOffset == null) {
+          return null;
+        }
+        int offsetsToCatchUp = latestOffset - Integer.parseInt(startOffset);
+        segmentToOffsetsToCatchUp.put(segmentName, offsetsToCatchUp);
+      }
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while trying to fetch consuming segment 
info for table: {}", tableNameWithType, e);
+      return null;
+    }
+    LOGGER.info("Successfully fetched consuming segments info for table: {}", 
tableNameWithType);
+    return segmentToOffsetsToCatchUp;
+  }
+
+  @VisibleForTesting
+  StreamPartitionMsgOffset fetchStreamPartitionOffset(TableConfig tableConfig, 
int partitionId)
+      throws Exception {
+    StreamConsumerFactory streamConsumerFactory =
+        StreamConsumerFactoryProvider.create(new 
StreamConfig(tableConfig.getTableName(),
+            IngestionConfigUtils.getStreamConfigMap(tableConfig, 
partitionId)));
+    try (StreamMetadataProvider streamMetadataProvider = 
streamConsumerFactory.createPartitionMetadataProvider(
+        TableRebalancer.class.getCanonicalName(), partitionId)) {
+      return 
streamMetadataProvider.fetchStreamPartitionOffset(OffsetCriteria.LARGEST_OFFSET_CRITERIA,
+          STREAM_PARTITION_OFFSET_READ_TIMEOUT_MS);
+    }
+  }
+
+  @Nullable
+  private Integer getLatestOffsetOfStream(TableConfig tableConfig, int 
partitionId) {
+    try {
+      StreamPartitionMsgOffset partitionMsgOffset = 
fetchStreamPartitionOffset(tableConfig, partitionId);
+      if (!(partitionMsgOffset instanceof LongMsgOffset)) {
+        LOGGER.warn("Unsupported stream partition message offset type: {}", 
partitionMsgOffset);
+        return null;
+      }
+      return (int) ((LongMsgOffset) partitionMsgOffset).getOffset();
+    } catch (Exception e) {
+      LOGGER.warn("Caught exception while trying to fetch stream partition of 
partitionId: {}",
+          partitionId, e);
+      return null;
+    }
+  }
+
   private void onReturnFailure(String errorMsg, Exception e) {
     if (e != null) {
       LOGGER.warn(errorMsg, e);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
index 5d23bac3e4..1d1ea56384 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/rebalance/TableRebalancerClusterStatelessTest.java
@@ -24,10 +24,12 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 import org.apache.pinot.common.assignment.InstancePartitions;
 import org.apache.pinot.common.assignment.InstancePartitionsUtils;
@@ -37,6 +39,7 @@ import org.apache.pinot.common.utils.config.TagNameUtils;
 import org.apache.pinot.controller.ControllerConf;
 import org.apache.pinot.controller.helix.ControllerTest;
 import 
org.apache.pinot.controller.helix.core.assignment.segment.SegmentAssignmentUtils;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
 import org.apache.pinot.controller.utils.SegmentMetadataMockUtils;
 import org.apache.pinot.controller.validation.ResourceUtilizationInfo;
 import org.apache.pinot.core.realtime.impl.fakestream.FakeStreamConfigUtils;
@@ -50,8 +53,11 @@ import 
org.apache.pinot.spi.config.table.assignment.InstanceReplicaGroupPartitio
 import org.apache.pinot.spi.config.table.assignment.InstanceTagPoolConfig;
 import org.apache.pinot.spi.config.tenant.Tenant;
 import org.apache.pinot.spi.config.tenant.TenantRole;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.mockito.Mockito;
 import org.testng.annotations.AfterClass;
 import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
@@ -113,8 +119,8 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     ExecutorService executorService = Executors.newFixedThreadPool(10);
     DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
     preChecker.init(_helixResourceManager, executorService, 1);
-    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, 
null, preChecker,
-        _helixResourceManager.getTableSizeReader());
+    TableRebalancer tableRebalancer =
+        new TableRebalancer(_helixManager, null, null, preChecker, 
_helixResourceManager.getTableSizeReader());
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
 
@@ -153,6 +159,7 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     assertNotNull(rebalanceSummaryResult.getServerInfo());
     assertNotNull(rebalanceSummaryResult.getSegmentInfo());
     
assertEquals(rebalanceSummaryResult.getSegmentInfo().getTotalSegmentsToBeMoved(),
 0);
+    
assertNull(rebalanceSummaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary());
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getValueBeforeRebalance(),
 3);
     
assertEquals(rebalanceSummaryResult.getServerInfo().getNumServers().getExpectedValueAfterRebalance(),
 3);
     assertNotNull(rebalanceSummaryResult.getTagsInfo());
@@ -728,8 +735,8 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     ExecutorService executorService = Executors.newFixedThreadPool(10);
     DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
     preChecker.init(_helixResourceManager, executorService, 0.5);
-    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, 
null, preChecker,
-        _helixResourceManager.getTableSizeReader());
+    TableRebalancer tableRebalancer =
+        new TableRebalancer(_helixManager, null, null, preChecker, 
_helixResourceManager.getTableSizeReader());
     TableConfig tableConfig =
         new 
TableConfigBuilder(TableType.OFFLINE).setTableName(RAW_TABLE_NAME).setNumReplicas(NUM_REPLICAS).build();
 
@@ -828,8 +835,8 @@ public class TableRebalancerClusterStatelessTest extends 
ControllerTest {
     ExecutorService executorService = Executors.newFixedThreadPool(10);
     DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
     preChecker.init(_helixResourceManager, executorService, 0.5);
-    TableRebalancer tableRebalancer = new TableRebalancer(_helixManager, null, 
null, preChecker,
-        _helixResourceManager.getTableSizeReader());
+    TableRebalancer tableRebalancer =
+        new TableRebalancer(_helixManager, null, null, preChecker, 
_helixResourceManager.getTableSizeReader());
     TableConfig tableConfig =
         new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
             .setNumReplicas(2)
@@ -1656,6 +1663,246 @@ public class TableRebalancerClusterStatelessTest 
extends ControllerTest {
     }
   }
 
+  @Test
+  public void testRebalanceConsumingSegmentSummary()
+      throws Exception {
+    int numServers = 3;
+    int numReplica = 3;
+
+    for (int i = 0; i < numServers; i++) {
+      String instanceId = "consumingSegmentSummary_" + 
SERVER_INSTANCE_ID_PREFIX + i;
+      addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+    }
+
+    ConsumingSegmentInfoReader mockConsumingSegmentInfoReader = 
Mockito.mock(ConsumingSegmentInfoReader.class);
+    TableRebalancer tableRebalancerOriginal =
+        new TableRebalancer(_helixManager, null, null, null, 
_helixResourceManager.getTableSizeReader());
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+            .setNumReplicas(numReplica)
+            
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+            .build();
+
+    // Create the table
+    addDummySchema(RAW_TABLE_NAME);
+    _helixResourceManager.addTable(tableConfig);
+
+    // Generate mock ConsumingSegmentsInfoMap for the consuming segments
+    int mockOffsetSmall = 1000;
+    int mockOffsetBig = 2000;
+
+    TableRebalancer tableRebalancer = Mockito.spy(tableRebalancerOriginal);
+    Mockito.doReturn(new LongMsgOffset(mockOffsetBig))
+        .when(tableRebalancer)
+        .fetchStreamPartitionOffset(Mockito.any(), Mockito.eq(0));
+    Mockito.doReturn(new LongMsgOffset(mockOffsetSmall))
+        .when(tableRebalancer)
+        .fetchStreamPartitionOffset(Mockito.any(), Mockito.intThat(x -> x != 
0));
+
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+
+    // dry-run with default rebalance config
+    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+    RebalanceSummaryResult summaryResult = 
rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(summaryResult.getSegmentInfo());
+    RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary 
consumingSegmentToBeMovedSummary =
+        summaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary();
+    assertNotNull(consumingSegmentToBeMovedSummary);
+    
assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(),
 0);
+    
assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(),
 0);
+    
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().size(),
 0);
+    
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().size(),
 0);
+    assertEquals(consumingSegmentToBeMovedSummary
+        .getServerConsumingSegmentSummary()
+        .size(), 0);
+    assertTrue(consumingSegmentToBeMovedSummary
+        .getServerConsumingSegmentSummary()
+        .values()
+        .stream()
+        .allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments() 
== 0));
+
+    rebalanceConfig.setIncludeConsuming(true);
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    summaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(summaryResult.getSegmentInfo());
+    consumingSegmentToBeMovedSummary = 
summaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary();
+    assertNotNull(consumingSegmentToBeMovedSummary);
+    
assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(),
 0);
+    
assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(),
 0);
+    
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().size(),
 0);
+    
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().size(),
 0);
+    assertEquals(consumingSegmentToBeMovedSummary
+        .getServerConsumingSegmentSummary()
+        .size(), 0);
+    assertTrue(consumingSegmentToBeMovedSummary
+        .getServerConsumingSegmentSummary()
+        .values()
+        .stream()
+        .allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments() 
== 0));
+
+    // Create new servers to replace the old servers
+    for (int i = numServers; i < numServers * 2; i++) {
+      String instanceId = "consumingSegmentSummary_" + 
SERVER_INSTANCE_ID_PREFIX + i;
+      addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+    }
+    for (int i = 0; i < numServers; i++) {
+      _helixAdmin.removeInstanceTag(getHelixClusterName(), 
"consumingSegmentSummary_" + SERVER_INSTANCE_ID_PREFIX + i,
+          TagNameUtils.getRealtimeTagForTenant(null));
+    }
+
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    summaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(summaryResult.getSegmentInfo());
+    consumingSegmentToBeMovedSummary = 
summaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary();
+    assertNotNull(consumingSegmentToBeMovedSummary);
+    
assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(),
+        FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS * numReplica);
+    
assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(),
 numServers);
+    Iterator<Integer> offsetToCatchUpIterator =
+        
consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp().values().iterator();
+    assertEquals(offsetToCatchUpIterator.next(), mockOffsetBig);
+    if (FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS > 1) {
+      assertEquals(offsetToCatchUpIterator.next(), mockOffsetSmall);
+    }
+    
assertEquals(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes().size(),
+        FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS);
+    assertEquals(consumingSegmentToBeMovedSummary
+        .getServerConsumingSegmentSummary()
+        .size(), numServers);
+    assertTrue(consumingSegmentToBeMovedSummary
+        .getServerConsumingSegmentSummary()
+        .values()
+        .stream()
+        .allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments()
+            == mockOffsetSmall * (FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS 
- 1) + mockOffsetBig));
+
+    _helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME);
+
+    for (int i = 0; i < numServers * 2; i++) {
+      stopAndDropFakeInstance("consumingSegmentSummary_" + 
SERVER_INSTANCE_ID_PREFIX + i);
+    }
+  }
+
+  @Test
+  public void testRebalanceConsumingSegmentSummaryFailure()
+      throws Exception {
+    int numServers = 3;
+    int numReplica = 3;
+
+    for (int i = 0; i < numServers; i++) {
+      String instanceId = "consumingSegmentSummaryFailure_" + 
SERVER_INSTANCE_ID_PREFIX + i;
+      addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+    }
+
+    ConsumingSegmentInfoReader mockConsumingSegmentInfoReader = 
Mockito.mock(ConsumingSegmentInfoReader.class);
+    TableRebalancer tableRebalancerOriginal =
+        new TableRebalancer(_helixManager, null, null, null, 
_helixResourceManager.getTableSizeReader());
+    TableConfig tableConfig =
+        new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME)
+            .setNumReplicas(numReplica)
+            
.setStreamConfigs(FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap())
+            .build();
+
+    // Create the table
+    addDummySchema(RAW_TABLE_NAME);
+    _helixResourceManager.addTable(tableConfig);
+
+    // Generate mock ConsumingSegmentsInfoMap for the consuming segments
+    int mockOffsetSmall = 1000;
+    int mockOffsetBig = 2000;
+
+    TableRebalancer tableRebalancer = Mockito.spy(tableRebalancerOriginal);
+    Mockito.doReturn(new LongMsgOffset(mockOffsetBig))
+        .when(tableRebalancer)
+        .fetchStreamPartitionOffset(Mockito.any(), Mockito.eq(0));
+    Mockito.doReturn(new LongMsgOffset(mockOffsetSmall))
+        .when(tableRebalancer)
+        .fetchStreamPartitionOffset(Mockito.any(), Mockito.intThat(x -> x != 
0));
+
+    RebalanceConfig rebalanceConfig = new RebalanceConfig();
+    rebalanceConfig.setDryRun(true);
+    rebalanceConfig.setIncludeConsuming(true);
+
+    // Create new servers to replace the old servers
+    for (int i = numServers; i < numServers * 2; i++) {
+      String instanceId = "consumingSegmentSummaryFailure_" + 
SERVER_INSTANCE_ID_PREFIX + i;
+      addFakeServerInstanceToAutoJoinHelixCluster(instanceId, true);
+    }
+    for (int i = 0; i < numServers; i++) {
+      _helixAdmin.removeInstanceTag(getHelixClusterName(),
+          "consumingSegmentSummaryFailure_" + SERVER_INSTANCE_ID_PREFIX + i,
+          TagNameUtils.getRealtimeTagForTenant(null));
+    }
+
+    RebalanceResult rebalanceResult = tableRebalancer.rebalance(tableConfig, 
rebalanceConfig, null);
+    RebalanceSummaryResult summaryResult = 
rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(summaryResult.getSegmentInfo());
+    RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary 
consumingSegmentToBeMovedSummary =
+        summaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary();
+    assertNotNull(consumingSegmentToBeMovedSummary);
+    
assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(),
 numServers);
+    
assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(),
+        FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS * numReplica);
+    
assertNotNull(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes());
+    
assertNotNull(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp());
+    
assertNotNull(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary());
+
+    // Simulate not supported stream partition message type (e.g. Kinesis)
+    Mockito.doReturn((StreamPartitionMsgOffset) o -> 0)
+        .when(tableRebalancer)
+        .fetchStreamPartitionOffset(Mockito.any(), Mockito.eq(0));
+    Mockito.doReturn(new LongMsgOffset(mockOffsetSmall))
+        .when(tableRebalancer)
+        .fetchStreamPartitionOffset(Mockito.any(), Mockito.intThat(x -> x != 
0));
+
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    summaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(summaryResult.getSegmentInfo());
+    consumingSegmentToBeMovedSummary = 
summaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary();
+    assertNotNull(consumingSegmentToBeMovedSummary);
+    
assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(),
 numServers);
+    
assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(),
+        FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS * numReplica);
+    
assertNotNull(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes());
+    
assertNull(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp());
+    
assertNotNull(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary());
+    
assertTrue(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary()
+        .values()
+        .stream()
+        .allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments() 
== -1));
+
+    // Simulate stream partition offset fetch failure
+    Mockito.doThrow(new TimeoutException("timeout"))
+        .when(tableRebalancer)
+        .fetchStreamPartitionOffset(Mockito.any(), Mockito.eq(0));
+    Mockito.doReturn(new LongMsgOffset(mockOffsetSmall))
+        .when(tableRebalancer)
+        .fetchStreamPartitionOffset(Mockito.any(), Mockito.intThat(x -> x != 
0));
+
+    rebalanceResult = tableRebalancer.rebalance(tableConfig, rebalanceConfig, 
null);
+    summaryResult = rebalanceResult.getRebalanceSummaryResult();
+    assertNotNull(summaryResult.getSegmentInfo());
+    consumingSegmentToBeMovedSummary = 
summaryResult.getSegmentInfo().getConsumingSegmentToBeMovedSummary();
+    assertNotNull(consumingSegmentToBeMovedSummary);
+    
assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(),
 numServers);
+    
assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(),
+        FakeStreamConfigUtils.DEFAULT_NUM_PARTITIONS * numReplica);
+    
assertNotNull(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithOldestAgeInMinutes());
+    
assertNull(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp());
+    
assertNotNull(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary());
+    
assertTrue(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary()
+        .values()
+        .stream()
+        .allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments() 
== -1));
+
+    _helixResourceManager.deleteRealtimeTable(RAW_TABLE_NAME);
+
+    for (int i = 0; i < numServers * 2; i++) {
+      stopAndDropFakeInstance("consumingSegmentSummaryFailure_" + 
SERVER_INSTANCE_ID_PREFIX + i);
+    }
+  }
+
   @AfterClass
   public void tearDown() {
     stopFakeInstances();
diff --git 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
index 7028844169..d4b8cbfa48 100644
--- 
a/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
+++ 
b/pinot-core/src/test/java/org/apache/pinot/core/realtime/impl/fakestream/FakeStreamConfigUtils.java
@@ -56,7 +56,7 @@ public class FakeStreamConfigUtils {
   private static final LongMsgOffset SMALLEST_OFFSET = new LongMsgOffset(0);
   private static final LongMsgOffset LARGEST_OFFSET = new 
LongMsgOffset(Integer.MAX_VALUE);
   private static final String NUM_PARTITIONS_KEY = "num.partitions";
-  private static final int DEFAULT_NUM_PARTITIONS = 2;
+  public static final int DEFAULT_NUM_PARTITIONS = 2;
 
   private static final String STREAM_TYPE = "fakeStream";
   private static final String TOPIC_NAME = "fakeTopic";
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConsumingSegmentToBeMovedSummaryIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConsumingSegmentToBeMovedSummaryIntegrationTest.java
new file mode 100644
index 0000000000..16999c887c
--- /dev/null
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/KafkaConsumingSegmentToBeMovedSummaryIntegrationTest.java
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.integration.tests;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.model.HelixConfigScope;
+import org.apache.helix.model.builder.HelixConfigScopeBuilder;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
+import org.apache.pinot.controller.helix.core.rebalance.RebalanceSummaryResult;
+import org.apache.pinot.spi.config.table.IndexingConfig;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.data.Schema;
+import org.apache.pinot.spi.stream.StreamConfigProperties;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.JsonUtils;
+import org.apache.pinot.util.TestUtils;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
/**
 * Integration test for the consuming-segment section of the rebalance summary against a real
 * Kafka stream: verifies the summary is empty when no servers change, populated with per-server
 * offsets to catch up after a server is added, suppressed when includeConsuming is false, and
 * degrades gracefully (null offsets-to-catch-up map) once Kafka is stopped.
 */
public class KafkaConsumingSegmentToBeMovedSummaryIntegrationTest extends BaseRealtimeClusterIntegrationTest {

  /**
   * Brings up a full single-server Pinot cluster plus Kafka, creates a realtime table that
   * flushes by segment size (so a consuming segment stays open), pushes the Avro test data,
   * and waits for all documents to be loaded.
   */
  @Override
  @BeforeClass
  public void setUp()
      throws Exception {
    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir);

    // Start the Pinot cluster
    startZk();
    startController();

    HelixConfigScope scope =
        new HelixConfigScopeBuilder(HelixConfigScope.ConfigScopeProperty.CLUSTER).forCluster(getHelixClusterName())
            .build();
    // Set max segment preprocess parallelism to 8
    _helixManager.getConfigAccessor()
        .set(scope, CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM, Integer.toString(8));
    // Set max segment startree preprocess parallelism to 6
    _helixManager.getConfigAccessor()
        .set(scope, CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM, Integer.toString(6));

    startBroker();
    startServer();

    // Start Kafka
    startKafka();

    // Unpack the Avro files
    List<File> avroFiles = unpackAvroData(_tempDir);

    // Create and upload the schema and table config
    Schema schema = createSchema();
    addSchema(schema);
    TableConfig tableConfig = createRealtimeTableConfig(avroFiles.get(0));
    IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
    Map<String, String> streamConfig = getStreamConfigs();
    // Flush by segment size instead of row count so the consuming segment does not seal early
    streamConfig.put(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_SEGMENT_SIZE, "1000000");
    streamConfig.remove(StreamConfigProperties.SEGMENT_FLUSH_THRESHOLD_ROWS);
    indexingConfig.setStreamConfigs(streamConfig);
    tableConfig.setIndexingConfig(indexingConfig);
    addTableConfig(tableConfig);

    // Push data into Kafka
    pushAvroIntoKafka(avroFiles);

    // Create segments and upload them to the controller
    createSegmentsAndUpload(avroFiles, schema, tableConfig);

    // Set up the H2 connection
    setUpH2Connection(avroFiles);

    // Initialize the query generator
    setUpQueryGenerator(avroFiles);

    runValidationJob(600_000);

    // Wait for all documents loaded
    waitForAllDocsLoaded(600_000L);
  }

  /**
   * Exercises dry-run rebalance summaries through the controller REST API across four states:
   * stable cluster, new server added, includeConsuming disabled, and Kafka stopped.
   */
  @Test
  public void testConsumingSegmentSummary()
      throws Exception {
    // Stable cluster: dry-run rebalance moves nothing, summary must be empty
    String response = sendPostRequest(
        getControllerRequestURLBuilder().forTableRebalance(getTableName(), "REALTIME", true, false, true, false, -1));
    RebalanceResult result = JsonUtils.stringToObject(response, RebalanceResult.class);
    Assert.assertNotNull(result);
    Assert.assertNotNull(result.getRebalanceSummaryResult());
    Assert.assertNotNull(result.getRebalanceSummaryResult().getSegmentInfo());
    RebalanceSummaryResult.SegmentInfo segmentInfo = result.getRebalanceSummaryResult().getSegmentInfo();
    RebalanceSummaryResult.ConsumingSegmentToBeMovedSummary consumingSegmentToBeMovedSummary =
        segmentInfo.getConsumingSegmentToBeMovedSummary();
    Assert.assertNotNull(consumingSegmentToBeMovedSummary);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(), 0);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(), 0);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary().size(),
        0);

    // Add a second server: the single consuming segment should be reported as moving to it
    startServer();
    response = sendPostRequest(
        getControllerRequestURLBuilder().forTableRebalance(getTableName(), "REALTIME", true, false, true, false, -1));
    result = JsonUtils.stringToObject(response, RebalanceResult.class);
    Assert.assertNotNull(result);
    Assert.assertNotNull(result.getRebalanceSummaryResult());
    Assert.assertNotNull(result.getRebalanceSummaryResult().getSegmentInfo());
    segmentInfo = result.getRebalanceSummaryResult().getSegmentInfo();
    consumingSegmentToBeMovedSummary = segmentInfo.getConsumingSegmentToBeMovedSummary();
    Assert.assertNotNull(consumingSegmentToBeMovedSummary);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(), 1);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(), 1);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary().size(),
        1);
    // 57801 appears to be the consuming segment's offset lag for this fixed test dataset —
    // NOTE(review): dataset-derived constant, verify against the Avro test data if it changes
    Assert.assertTrue(consumingSegmentToBeMovedSummary
        .getServerConsumingSegmentSummary()
        .values()
        .stream()
        .allMatch(x -> x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments() == 57801
            || x.getTotalOffsetsToCatchUpAcrossAllConsumingSegments() == 0));
    Assert.assertEquals(consumingSegmentToBeMovedSummary
        .getServerConsumingSegmentSummary()
        .values()
        .stream()
        .reduce(0, (a, b) -> a + b.getTotalOffsetsToCatchUpAcrossAllConsumingSegments(), Integer::sum), 57801);

    // set includeConsuming to false: consuming segments are excluded, so nothing moves
    response = sendPostRequest(
        getControllerRequestURLBuilder().forTableRebalance(getTableName(), "REALTIME", true, false, false, false, -1));
    result = JsonUtils.stringToObject(response, RebalanceResult.class);
    Assert.assertNotNull(result);
    Assert.assertNotNull(result.getRebalanceSummaryResult());
    Assert.assertNotNull(result.getRebalanceSummaryResult().getSegmentInfo());
    segmentInfo = result.getRebalanceSummaryResult().getSegmentInfo();
    consumingSegmentToBeMovedSummary = segmentInfo.getConsumingSegmentToBeMovedSummary();
    Assert.assertNotNull(consumingSegmentToBeMovedSummary);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(), 0);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(), 0);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary().size(),
        0);

    // Kafka down: offsets cannot be fetched, so the offsets-to-catch-up map must be null while
    // the movement counts remain populated
    stopKafka();
    response = sendPostRequest(
        getControllerRequestURLBuilder().forTableRebalance(getTableName(), "REALTIME", true, false, true, false, -1));
    RebalanceResult resultNoInfo = JsonUtils.stringToObject(response, RebalanceResult.class);
    Assert.assertNotNull(resultNoInfo);
    Assert.assertNotNull(resultNoInfo.getRebalanceSummaryResult());
    Assert.assertNotNull(resultNoInfo.getRebalanceSummaryResult().getSegmentInfo());
    segmentInfo = resultNoInfo.getRebalanceSummaryResult().getSegmentInfo();
    consumingSegmentToBeMovedSummary = segmentInfo.getConsumingSegmentToBeMovedSummary();
    Assert.assertNotNull(consumingSegmentToBeMovedSummary);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getNumConsumingSegmentsToBeMoved(), 1);
    Assert.assertEquals(consumingSegmentToBeMovedSummary.getNumServersGettingConsumingSegmentsAdded(), 1);
    Assert.assertNotNull(consumingSegmentToBeMovedSummary.getServerConsumingSegmentSummary());
    Assert.assertNull(consumingSegmentToBeMovedSummary.getConsumingSegmentsToBeMovedWithMostOffsetsToCatchUp());
  }
}
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index 7332b9cce4..b4e73989a7 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -75,6 +75,7 @@ import 
org.apache.pinot.controller.helix.core.rebalance.RebalancePreCheckerResul
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceResult;
 import org.apache.pinot.controller.helix.core.rebalance.RebalanceSummaryResult;
 import org.apache.pinot.controller.helix.core.rebalance.TableRebalancer;
+import org.apache.pinot.controller.util.ConsumingSegmentInfoReader;
 import org.apache.pinot.core.operator.query.NonScanBasedAggregationOperator;
 import org.apache.pinot.segment.spi.index.ForwardIndexConfig;
 import org.apache.pinot.segment.spi.index.StandardIndexes;
@@ -298,6 +299,8 @@ public class OfflineClusterIntegrationTest extends 
BaseClusterIntegrationTestSet
     DefaultRebalancePreChecker preChecker = new DefaultRebalancePreChecker();
     _executorService = Executors.newFixedThreadPool(10);
     preChecker.init(_helixResourceManager, _executorService, 
_controllerConfig.getDiskUtilizationThreshold());
+    ConsumingSegmentInfoReader consumingSegmentInfoReader =
+        new ConsumingSegmentInfoReader(_executorService, null, 
_helixResourceManager);
     _tableRebalancer = new 
TableRebalancer(_resourceManager.getHelixZkManager(), null, null, preChecker,
         _resourceManager.getTableSizeReader());
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to