This is an automated email from the ASF dual-hosted git repository.

xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 7eec9a78717 [Refactor] Introduce StreamMetadata to group partition 
metadata by stream (#17811)
7eec9a78717 is described below

commit 7eec9a78717315235d12c1082f758574e7979e2e
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Mar 6 18:37:39 2026 -0800

    [Refactor] Introduce StreamMetadata to group partition metadata by stream 
(#17811)
    
    * Introduce StreamMetadata to group partition metadata by stream
    
    Replace the flat List<PartitionGroupMetadata> pattern with
    List<StreamMetadata>, where each StreamMetadata groups partition
    metadata for a single stream along with its StreamConfig and
    partition count. This makes stream membership explicit and
    eliminates the need for callers to decode partition IDs to
    determine stream ownership.
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    * [Refactor] Introduce StreamMetadata to group partition metadata by stream
    
    Introduce StreamMetadata to replace the flat List<PartitionGroupMetadata>
    pattern where partitions from all streams were mixed together and required
    partition ID padding (streamIndex * 10000 + streamPartitionId) to identify
    stream membership.
    
    Key changes:
    - New StreamMetadata class grouping PartitionGroupMetadata per stream with
      StreamConfig and numPartitions (total partition count from broker)
    - PartitionGroupMetadataFetcher now produces List<StreamMetadata> with a
      deprecated backward-compat getPartitionGroupMetadataList() method
    - setUpNewTable and addTable APIs take List<StreamMetadata> directly
    - PartitionGroupMetadata gains a sequenceNumber field (default -1 = unset)
      to replace the Pair<PartitionGroupMetadata, Integer> pattern
    - fetchPartitionCount() is called after computePartitionGroupMetadata() to
avoid duplicate metadata RPCs for Kafka-like implementations
    - Copy table flow constructs StreamMetadata from watermarks with real
      partition counts via getPartitionCountMap()
    - Callers using the sequence number validate that it is >= 0 before use
    
    Co-Authored-By: Claude Opus 4.6 <[email protected]>
    
    ---------
    
    Co-authored-by: Claude Opus 4.6 <[email protected]>
---
 .../api/resources/PinotTableRestletResource.java   |  49 +++-
 .../helix/core/PinotHelixResourceManager.java      |  21 +-
 .../helix/core/PinotTableIdealStateBuilder.java    |  13 +-
 .../realtime/MissingConsumingSegmentFinder.java    |   9 +-
 .../realtime/PinotLLCRealtimeSegmentManager.java   | 142 ++++++-----
 .../resources/PinotTableRestletResourceTest.java   |  54 ++++
 .../PinotHelixResourceManagerStatelessTest.java    |  24 +-
 .../PinotLLCRealtimeSegmentManagerTest.java        | 155 ++++++++---
 .../pinot/spi/stream/PartitionGroupMetadata.java   |  15 +-
 .../spi/stream/PartitionGroupMetadataFetcher.java  |  60 +++--
 .../apache/pinot/spi/stream/StreamMetadata.java    |  66 +++++
 .../stream/PartitionGroupMetadataFetcherTest.java  | 284 ++++++++++++++++++++-
 .../pinot/spi/stream/StreamMetadataTest.java       |  98 +++++++
 13 files changed, 818 insertions(+), 172 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 498f0349beb..578a295efcd 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -129,8 +129,11 @@ import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMetadata;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.Enablement;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
 import org.apache.pinot.spi.utils.JsonUtils;
 import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -351,17 +354,13 @@ public class PinotTableRestletResource {
         return new CopyTableResponse("success", "Dry run", schema, 
realtimeTableConfig, watermarkInductionResult);
       }
 
-      List<Pair<PartitionGroupMetadata, Integer>> partitionGroupInfos = 
watermarkInductionResult.getWatermarks()
-          .stream()
-          .map(watermark -> Pair.of(
-              new PartitionGroupMetadata(watermark.getPartitionGroupId(), new 
LongMsgOffset(watermark.getOffset())),
-              watermark.getSequenceNumber()))
-          .collect(Collectors.toList());
+      List<StreamConfig> streamConfigs = 
IngestionConfigUtils.getStreamConfigs(realtimeTableConfig);
+      List<StreamMetadata> streamMetadataList = 
getStreamMetadataList(streamConfigs, watermarkInductionResult);
 
       _pinotHelixResourceManager.addSchema(schema, true, false);
       LOGGER.info("[copyTable] Successfully added schema for table: {}", 
tableName);
       // Add the table with designated starting kafka offset and segment 
sequence number to create consuming segments
-      _pinotHelixResourceManager.addTable(realtimeTableConfig, 
partitionGroupInfos);
+      _pinotHelixResourceManager.addTable(realtimeTableConfig, 
streamMetadataList);
       LOGGER.info("[copyTable] Successfully added table config: {} with 
designated high watermark", tableName);
       CopyTableResponse response = new CopyTableResponse("success", "Table 
copied successfully", null, null, null);
       if (hasOffline) {
@@ -381,6 +380,42 @@ public class PinotTableRestletResource {
     }
   }
 
+  @VisibleForTesting
+  List<StreamMetadata> getStreamMetadataList(List<StreamConfig> streamConfigs,
+      WatermarkInductionResult watermarkInductionResult)
+      throws Exception {
+    Map<Integer, Integer> streamPartitionCountMap =
+        
_pinotHelixResourceManager.getRealtimeSegmentManager().getPartitionCountMap(streamConfigs);
+    Map<Integer, List<PartitionGroupMetadata>> 
partitionGroupMetadataByStreamConfigIndex = new HashMap<>();
+    for (WatermarkInductionResult.Watermark watermark : 
watermarkInductionResult.getWatermarks()) {
+      int streamConfigIndex =
+          
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(watermark.getPartitionGroupId());
+      Preconditions.checkArgument(streamConfigIndex >= 0 && streamConfigIndex 
< streamConfigs.size(),
+          "Invalid stream config index %s from watermark partition ID %s. 
Expected index in range [0, %s)",
+          streamConfigIndex, watermark.getPartitionGroupId(), 
streamConfigs.size());
+      
partitionGroupMetadataByStreamConfigIndex.computeIfAbsent(streamConfigIndex, 
ignored -> new ArrayList<>()).add(
+          new PartitionGroupMetadata(watermark.getPartitionGroupId(), new 
LongMsgOffset(watermark.getOffset()),
+              watermark.getSequenceNumber()));
+    }
+
+    // Iterate in order by streamConfigIndex to ensure deterministic ordering
+    List<StreamMetadata> streamMetadataList = new 
ArrayList<>(partitionGroupMetadataByStreamConfigIndex.size());
+    for (int streamConfigIndex = 0; streamConfigIndex < streamConfigs.size(); 
streamConfigIndex++) {
+      List<PartitionGroupMetadata> partitionGroupMetadataList =
+          partitionGroupMetadataByStreamConfigIndex.get(streamConfigIndex);
+      if (partitionGroupMetadataList == null) {
+        // No watermarks for this stream config index, skip it
+        continue;
+      }
+      Integer partitionCount = streamPartitionCountMap.get(streamConfigIndex);
+      Preconditions.checkState(partitionCount != null,
+          "Cannot find partition count for stream config index: %s", 
streamConfigIndex);
+      streamMetadataList.add(new 
StreamMetadata(streamConfigs.get(streamConfigIndex),
+          partitionCount, partitionGroupMetadataList));
+    }
+    return streamMetadataList;
+  }
+
   /**
    * Helper method to tweak the realtime table config. This method is used to 
set the broker and server tenants, and
    * optionally replace the pool tags in the instance assignment config.
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 8ae187ab3a6..3bf66d81e65 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -180,8 +180,8 @@ import org.apache.pinot.spi.data.LogicalTableConfig;
 import org.apache.pinot.spi.data.PhysicalTableConfig;
 import org.apache.pinot.spi.data.Schema;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMetadata;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import 
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
@@ -1808,17 +1808,17 @@ public class PinotHelixResourceManager {
    * designated offset and being assigned with a segment sequence number per 
partition. Otherwise, you should
    * directly call the {@link #addTable(TableConfig)} which will further call 
this api with an empty list.
    * @param tableConfig The config for the table to be created.
-   * @param consumeMeta A list of pairs, where each pair contains the 
partition group metadata and the initial sequence
-   *                    number for a consuming segment. This is used to start 
ingestion from a specific offset.
+   * @param streamMetadataList A list of {@link StreamMetadata}, each 
containing partition group metadata with
+   *                           sequence numbers. This is used to start 
ingestion from a specific offset.
    * @throws InvalidTableConfigException if validations fail
    * @throws TableAlreadyExistsException if the table already exists
    */
-  public void addTable(TableConfig tableConfig, 
List<Pair<PartitionGroupMetadata, Integer>> consumeMeta)
+  public void addTable(TableConfig tableConfig, List<StreamMetadata> 
streamMetadataList)
       throws IOException {
     String tableNameWithType = tableConfig.getTableName();
     LOGGER.info("Adding table {}: Start", tableNameWithType);
-    if (consumeMeta != null && !consumeMeta.isEmpty()) {
-      LOGGER.info("Adding table {} with {} partition group infos", 
tableNameWithType, consumeMeta.size());
+    if (streamMetadataList != null && !streamMetadataList.isEmpty()) {
+      LOGGER.info("Adding table {} with {} stream metadata entries", 
tableNameWithType, streamMetadataList.size());
     }
 
     if (getTableConfig(tableNameWithType) != null) {
@@ -1878,15 +1878,14 @@ public class PinotHelixResourceManager {
         // Add ideal state
         _helixAdmin.addResource(_helixClusterName, tableNameWithType, 
idealState);
         LOGGER.info("Adding table {}: Added ideal state for offline table", 
tableNameWithType);
-      } else if (consumeMeta == null || consumeMeta.isEmpty()) {
+      } else if (streamMetadataList == null || streamMetadataList.isEmpty()) {
         // Add ideal state with the first CONSUMING segment
         _pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState);
         LOGGER.info("Adding table {}: Added ideal state with first consuming 
segment", tableNameWithType);
       } else {
-        // Add ideal state with the first CONSUMING segment with designated 
partition consuming metadata
-        // Add ideal state with the first CONSUMING segment
-        _pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState, 
consumeMeta);
-        LOGGER.info("Adding table {}: Added consuming segments ideal state 
given the designated consuming metadata",
+        // Add ideal state with consuming segments from designated stream 
metadata
+        _pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState, 
streamMetadataList);
+        LOGGER.info("Adding table {}: Added consuming segments ideal state 
given the designated stream metadata",
                 tableNameWithType);
       }
     } catch (Exception e) {
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 6ec48830ca7..fafbabb7ad3 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -19,14 +19,15 @@
 package org.apache.pinot.controller.helix.core;
 
 import java.util.List;
+import java.util.stream.Collectors;
 import org.apache.helix.model.IdealState;
 import org.apache.helix.model.builder.CustomModeISBuilder;
 import org.apache.pinot.common.metrics.ControllerMeter;
 import org.apache.pinot.common.metrics.ControllerMetrics;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
 import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMetadata;
 import org.apache.pinot.spi.utils.retry.RetryPolicies;
 import org.apache.pinot.spi.utils.retry.RetryPolicy;
 import org.slf4j.Logger;
@@ -54,7 +55,7 @@ public class PinotTableIdealStateBuilder {
   }
 
   /**
-   * Fetches the list of {@link PartitionGroupMetadata} for the new partition 
groups for the stream,
+   * Fetches the list of {@link StreamMetadata} for all streams of the table,
    * with the help of the {@link PartitionGroupConsumptionStatus} of the 
current partitionGroups.
    * In particular, this method can also be used to fetch from multiple stream 
topics.
    *
@@ -90,19 +91,19 @@ public class PinotTableIdealStateBuilder {
    * @param pausedTopicIndices List of inactive topic indices. Index is the 
index of the topic in the streamConfigMaps.
    * @param forceGetOffsetFromStream - details in 
PinotLLCRealtimeSegmentManager.fetchPartitionGroupIdToSmallestOffset
    */
-  public static List<PartitionGroupMetadata> 
getPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
+  public static List<StreamMetadata> getStreamMetadataList(List<StreamConfig> 
streamConfigs,
       List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatusList, List<Integer> pausedTopicIndices,
       boolean forceGetOffsetFromStream) {
     PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = new 
PartitionGroupMetadataFetcher(
         streamConfigs, partitionGroupConsumptionStatusList, 
pausedTopicIndices, forceGetOffsetFromStream);
     try {
       
DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
-      return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
+      return partitionGroupMetadataFetcher.getStreamMetadataList();
     } catch (Exception e) {
       Exception fetcherException = 
partitionGroupMetadataFetcher.getException();
       String tableNameWithType = streamConfigs.get(0).getTableNameWithType();
-      LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of 
table: {}",
-          streamConfigs.stream().map(streamConfig -> 
streamConfig.getTopicName()).reduce((a, b) -> a + "," + b),
+      LOGGER.error("Could not get StreamMetadata for topic: {} of table: {}",
+          
streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.joining(",")),
           tableNameWithType, fetcherException);
       ControllerMetrics controllerMetrics = ControllerMetrics.get();
       controllerMetrics.addMeteredTableValue(tableNameWithType, 
ControllerMeter.PARTITION_GROUP_METADATA_FETCH_ERROR,
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
index 99bee6f8a7f..6be597caa4f 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -41,6 +41,7 @@ import org.apache.pinot.common.utils.LLCSegmentName;
 import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
 import org.apache.pinot.spi.config.table.PauseState;
 import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
 import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -84,10 +85,12 @@ public class MissingConsumingSegmentFinder {
     });
     try {
       PauseState pauseState = 
PinotLLCRealtimeSegmentManager.extractTablePauseState(idealState);
-      PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs, 
Collections.emptyList(),
+      PinotTableIdealStateBuilder.getStreamMetadataList(streamConfigs, 
Collections.emptyList(),
               pauseState == null ? new ArrayList<>() : 
pauseState.getIndexOfInactiveTopics(), false)
-          .forEach(metadata -> {
-            
_partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), 
metadata.getStartOffset());
+          .forEach(streamMetadata -> {
+            for (PartitionGroupMetadata metadata : 
streamMetadata.getPartitionGroupMetadataList()) {
+              
_partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(), 
metadata.getStartOffset());
+            }
           });
     } catch (Exception e) {
       LOGGER.warn("Problem encountered in fetching stream metadata for topics: 
{} of table: {}. "
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 120ab8841bd..6720161c620 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -57,7 +57,6 @@ import java.util.stream.Stream;
 import javax.annotation.Nullable;
 import org.apache.commons.collections4.CollectionUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.AccessOption;
 import org.apache.helix.ClusterMessagingService;
 import org.apache.helix.HelixAdmin;
@@ -128,6 +127,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadata;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
@@ -381,11 +381,9 @@ public class PinotLLCRealtimeSegmentManager {
    */
   public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
     List<StreamConfig> streamConfigs = 
IngestionConfigUtils.getStreamConfigs(tableConfig);
-    List<Pair<PartitionGroupMetadata, Integer>> newPartitionGroupMetadataList =
-        getNewPartitionGroupMetadataList(streamConfigs, 
Collections.emptyList(), idealState).stream().map(
-            x -> Pair.of(x, STARTING_SEQUENCE_NUMBER)
-        ).collect(Collectors.toList());
-    setUpNewTable(tableConfig, idealState, newPartitionGroupMetadataList);
+    List<StreamMetadata> streamMetadataList =
+        getNewStreamMetadataList(streamConfigs, Collections.emptyList(), 
idealState);
+    setUpNewTable(tableConfig, idealState, streamMetadataList);
   }
 
   /**
@@ -393,16 +391,18 @@ public class PinotLLCRealtimeSegmentManager {
    * <p>NOTE: the passed in IdealState may contain HLC segments if both HLC 
and LLC are configured.
    */
   public void setUpNewTable(TableConfig tableConfig, IdealState idealState,
-      List<Pair<PartitionGroupMetadata, Integer>> consumeMeta) {
+      List<StreamMetadata> streamMetadataList) {
     Preconditions.checkState(!_isStopping, "Segment manager is stopping");
 
     String realtimeTableName = tableConfig.getTableName();
     LOGGER.info("Setting up new LLC table: {}", realtimeTableName);
 
-    List<StreamConfig> streamConfigs = 
IngestionConfigUtils.getStreamConfigs(tableConfig);
-    
streamConfigs.forEach(_flushThresholdUpdateManager::clearFlushThresholdUpdater);
+    int numPartitionGroups = 0;
+    for (StreamMetadata streamMetadata : streamMetadataList) {
+      
_flushThresholdUpdateManager.clearFlushThresholdUpdater(streamMetadata.getStreamConfig());
+      numPartitionGroups += streamMetadata.getNumPartitions();
+    }
     InstancePartitions instancePartitions = 
getConsumingInstancePartitions(tableConfig);
-    int numPartitionGroups = consumeMeta.size();
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
 
     SegmentAssignment segmentAssignment =
@@ -412,16 +412,17 @@ public class PinotLLCRealtimeSegmentManager {
 
     long currentTimeMs = getCurrentTimeMs();
     Map<String, Map<String, String>> instanceStatesMap = 
idealState.getRecord().getMapFields();
-    for (Pair<PartitionGroupMetadata, Integer> pair : consumeMeta) {
-      PartitionGroupMetadata metadata = pair.getLeft();
-      int sequence = pair.getRight();
-      StreamConfig streamConfig = 
IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs,
-          metadata.getPartitionGroupId());
-      String segmentName =
-          setupNewPartitionGroup(tableConfig, streamConfig, metadata, 
sequence, currentTimeMs, instancePartitions,
-              numPartitionGroups, numReplicas);
-      updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, 
segmentName, segmentAssignment,
-          instancePartitionsMap);
+    for (StreamMetadata streamMetadata : streamMetadataList) {
+      StreamConfig streamConfig = streamMetadata.getStreamConfig();
+      for (PartitionGroupMetadata metadata : 
streamMetadata.getPartitionGroupMetadataList()) {
+        int sequenceNumber = metadata.getSequenceNumber() >= 0
+            ? metadata.getSequenceNumber() : STARTING_SEQUENCE_NUMBER;
+        String segmentName =
+            setupNewPartitionGroup(tableConfig, streamConfig, metadata, 
sequenceNumber, currentTimeMs,
+                instancePartitions, numPartitionGroups, numReplicas);
+        updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, 
segmentName, segmentAssignment,
+            instancePartitionsMap);
+      }
     }
 
     setIdealState(realtimeTableName, idealState);
@@ -1177,6 +1178,20 @@ public class PinotLLCRealtimeSegmentManager {
     }
   }
 
+  public Map<Integer, Integer> getPartitionCountMap(List<StreamConfig> 
streamConfigs)
+      throws Exception {
+    Map<Integer, Integer> streamPartitionCountMap = new HashMap<>();
+    for (int i = 0; i < streamConfigs.size(); i++) {
+      StreamConfig streamConfig = streamConfigs.get(i);
+      String clientId = getTableTopicUniqueClientId(streamConfig);
+      StreamConsumerFactory consumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
+      try (StreamMetadataProvider metadataProvider = 
consumerFactory.createStreamMetadataProvider(clientId)) {
+        streamPartitionCountMap.put(i, 
metadataProvider.fetchPartitionCount(STREAM_FETCH_TIMEOUT_MS));
+      }
+    }
+    return streamPartitionCountMap;
+  }
+
   @VisibleForTesting
   Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState 
idealState) {
     return getPartitionIdsWithIdealState(streamConfigs, () -> 
idealState)._partitionIds;
@@ -1242,39 +1257,40 @@ public class PinotLLCRealtimeSegmentManager {
       //       We don't need to read partition group metadata for other 
partition groups.
       List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList =
           getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
-      List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-          getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList, idealState);
-      partitionIds.addAll(newPartitionGroupMetadataList.stream()
-          .map(PartitionGroupMetadata::getPartitionGroupId)
-          .collect(Collectors.toSet()));
+      List<StreamMetadata> streamMetadataList =
+          getNewStreamMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList, idealState);
+      for (StreamMetadata streamMetadata : streamMetadataList) {
+        for (PartitionGroupMetadata partitionGroupMetadata : 
streamMetadata.getPartitionGroupMetadataList()) {
+          partitionIds.add(partitionGroupMetadata.getPartitionGroupId());
+        }
+      }
       return new PartitionIdsWithIdealState(partitionIds, idealState);
     }
     return new PartitionIdsWithIdealState(partitionIds, null);
   }
 
   /**
-   * Fetches the latest state of the PartitionGroups for the stream
+   * Fetches the latest state of the partition groups for all streams of the 
table.
    * If any partition has reached end of life, and all messages of that 
partition have been consumed by the segment,
    * it will be skipped from the result
    */
   @VisibleForTesting
-  List<PartitionGroupMetadata> 
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
+  List<StreamMetadata> getNewStreamMetadataList(List<StreamConfig> 
streamConfigs,
       List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList, IdealState idealState) {
-    return getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList, idealState,
-        false);
+    return getNewStreamMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList, idealState, false);
   }
 
   /**
-   * Fetches the latest state of the PartitionGroups for the stream
+   * Fetches the latest state of the partition groups for all streams of the 
table.
    * If any partition has reached end of life, and all messages of that 
partition have been consumed by the segment,
    * it will be skipped from the result
    */
   @VisibleForTesting
-  List<PartitionGroupMetadata> 
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
+  List<StreamMetadata> getNewStreamMetadataList(List<StreamConfig> 
streamConfigs,
       List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList, IdealState idealState,
       boolean forceGetOffsetFromStream) {
     PauseState pauseState = extractTablePauseState(idealState);
-    return 
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs,
+    return PinotTableIdealStateBuilder.getStreamMetadataList(streamConfigs,
         currentPartitionGroupConsumptionStatusList,
         pauseState == null ? new ArrayList<>() : 
pauseState.getIndexOfInactiveTopics(), forceGetOffsetFromStream);
   }
@@ -1448,10 +1464,10 @@ public class PinotLLCRealtimeSegmentManager {
           streamConfigs.stream()
               .forEach(streamConfig -> streamConfig.setOffsetCriteria(
                   offsetsHaveToChange ? offsetCriteria : 
OffsetCriteria.SMALLEST_OFFSET_CRITERIA));
-          List<PartitionGroupMetadata> newPartitionGroupMetadataList =
-              getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList, idealState);
+          List<StreamMetadata> streamMetadataList =
+              getNewStreamMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList, idealState);
           streamConfigs.stream().forEach(streamConfig -> 
streamConfig.setOffsetCriteria(originalOffsetCriteria));
-          return ensureAllPartitionsConsuming(tableConfig, streamConfigs, 
idealState, newPartitionGroupMetadataList,
+          return ensureAllPartitionsConsuming(tableConfig, streamConfigs, 
idealState, streamMetadataList,
               offsetCriteria);
         } else {
           LOGGER.info("Skipping LLC segments validation for table: {}, 
isTableEnabled: {}, isTablePaused: {}",
@@ -1488,9 +1504,9 @@ public class PinotLLCRealtimeSegmentManager {
         throw new HelixHelper.PermanentUpdaterException(
             "Exceeded max segment completion time for segment " + 
committingSegmentName);
       }
-    
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
 committingSegmentName,
-        isTablePaused(idealState) || isTopicPaused(idealState, 
committingSegmentName), newSegmentName,
-        segmentAssignment, instancePartitionsMap);
+      
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
 committingSegmentName,
+          isTablePaused(idealState) || isTopicPaused(idealState, 
committingSegmentName), newSegmentName,
+          segmentAssignment, instancePartitionsMap);
       return idealState;
     };
     if (_controllerConf.getSegmentCompletionGroupCommitEnabled()) {
@@ -1706,12 +1722,15 @@ public class PinotLLCRealtimeSegmentManager {
    */
   @VisibleForTesting
   IdealState ensureAllPartitionsConsuming(TableConfig tableConfig, 
List<StreamConfig> streamConfigs,
-      IdealState idealState, List<PartitionGroupMetadata> 
partitionGroupMetadataList, OffsetCriteria offsetCriteria) {
+      IdealState idealState, List<StreamMetadata> streamMetadataList, 
OffsetCriteria offsetCriteria) {
     String realtimeTableName = tableConfig.getTableName();
 
     InstancePartitions instancePartitions = 
getConsumingInstancePartitions(tableConfig);
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
-    int numPartitions = partitionGroupMetadataList.size();
+    int numPartitions = 0;
+    for (StreamMetadata streamMetadata : streamMetadataList) {
+      numPartitions += streamMetadata.getNumPartitions();
+    }
 
     SegmentAssignment segmentAssignment =
         SegmentAssignmentFactory.getSegmentAssignment(_helixManager, 
tableConfig, _controllerMetrics);
@@ -1728,8 +1747,10 @@ public class PinotLLCRealtimeSegmentManager {
     // Create a map from partition id to start offset
     // TODO: Directly return map from StreamMetadataProvider
     Map<Integer, StreamPartitionMsgOffset> partitionIdToStartOffset = 
Maps.newHashMapWithExpectedSize(numPartitions);
-    for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
-      partitionIdToStartOffset.put(metadata.getPartitionGroupId(), 
metadata.getStartOffset());
+    for (StreamMetadata streamMetadata : streamMetadataList) {
+      for (PartitionGroupMetadata metadata : 
streamMetadata.getPartitionGroupMetadataList()) {
+        partitionIdToStartOffset.put(metadata.getPartitionGroupId(), 
metadata.getStartOffset());
+      }
     }
     // Create a map from partition id to the smallest stream offset
     Map<Integer, StreamPartitionMsgOffset> partitionIdToSmallestOffset = null;
@@ -1867,7 +1888,7 @@ public class PinotLLCRealtimeSegmentManager {
                     tableConfig.getTableName(), offsetFactory,
                     latestSegmentZKMetadata.getStartOffset()); // segments are 
OFFLINE; start from beginning
             createNewConsumingSegment(tableConfig, 
streamConfigs.get(streamConfigIdx), latestSegmentZKMetadata,
-                currentTimeMs, partitionGroupMetadataList, instancePartitions, 
instanceStatesMap, segmentAssignment,
+                currentTimeMs, numPartitions, instancePartitions, 
instanceStatesMap, segmentAssignment,
                 instancePartitionsMap, startOffset);
           } else {
             LOGGER.info("Resuming consumption for partition: {} of table: {}", 
partitionId, realtimeTableName);
@@ -1875,7 +1896,7 @@ public class PinotLLCRealtimeSegmentManager {
                 selectStartOffset(offsetCriteria, partitionId, 
partitionIdToStartOffset, partitionIdToSmallestOffset,
                     tableConfig.getTableName(), offsetFactory, 
latestSegmentZKMetadata.getEndOffset());
             createNewConsumingSegment(tableConfig, 
streamConfigs.get(streamConfigIdx), latestSegmentZKMetadata,
-                currentTimeMs, partitionGroupMetadataList, instancePartitions, 
instanceStatesMap, segmentAssignment,
+                currentTimeMs, numPartitions, instancePartitions, 
instanceStatesMap, segmentAssignment,
                 instancePartitionsMap, startOffset);
           }
         }
@@ -1917,15 +1938,16 @@ public class PinotLLCRealtimeSegmentManager {
     }
 
     // Set up new partitions if they do not exist yet
-    for (PartitionGroupMetadata partitionGroupMetadata : 
partitionGroupMetadataList) {
-      int partitionId = partitionGroupMetadata.getPartitionGroupId();
-      int streamConfigIdx = 
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId);
-      if (!latestSegmentZKMetadataMap.containsKey(partitionId)) {
-        String newSegmentName =
-            setupNewPartitionGroup(tableConfig, 
streamConfigs.get(streamConfigIdx), partitionGroupMetadata,
-                currentTimeMs, instancePartitions, numPartitions, numReplicas);
-        updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, 
newSegmentName, segmentAssignment,
-            instancePartitionsMap);
+    for (StreamMetadata streamMetadata : streamMetadataList) {
+      for (PartitionGroupMetadata partitionGroupMetadata : 
streamMetadata.getPartitionGroupMetadataList()) {
+        int partitionId = partitionGroupMetadata.getPartitionGroupId();
+        if (!latestSegmentZKMetadataMap.containsKey(partitionId)) {
+          String newSegmentName =
+              setupNewPartitionGroup(tableConfig, 
streamMetadata.getStreamConfig(), partitionGroupMetadata,
+                  currentTimeMs, instancePartitions, numPartitions, 
numReplicas);
+          updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null, 
newSegmentName, segmentAssignment,
+              instancePartitionsMap);
+        }
       }
     }
 
@@ -1934,11 +1956,10 @@ public class PinotLLCRealtimeSegmentManager {
 
   private void createNewConsumingSegment(TableConfig tableConfig, StreamConfig 
streamConfig,
       SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs,
-      List<PartitionGroupMetadata> newPartitionGroupMetadataList, 
InstancePartitions instancePartitions,
+      int numPartitions, InstancePartitions instancePartitions,
       Map<String, Map<String, String>> instanceStatesMap, SegmentAssignment 
segmentAssignment,
       Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap, 
StreamPartitionMsgOffset startOffset) {
     int numReplicas = getNumReplicas(tableConfig, instancePartitions);
-    int numPartitions = newPartitionGroupMetadataList.size();
     LLCSegmentName latestLLCSegmentName = new 
LLCSegmentName(latestSegmentZKMetadata.getSegmentName());
     LLCSegmentName newLLCSegmentName = 
getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
     CommittingSegmentDescriptor committingSegmentDescriptor =
@@ -1969,11 +1990,13 @@ public class PinotLLCRealtimeSegmentManager {
       // Temporarily, we are passing a boolean flag to indicate if we want to 
use the current status.
       // The Kafka implementation of computePartitionGroupMetadata() will 
ignore the current status,
       // while the Kinesis implementation will use it.
-      List<PartitionGroupMetadata> partitionGroupMetadataList = 
getNewPartitionGroupMetadataList(
+      List<StreamMetadata> streamMetadataList = getNewStreamMetadataList(
           streamConfigs, currentPartitionGroupConsumptionStatusList, 
idealState, true);
       streamConfig.setOffsetCriteria(originalOffsetCriteria);
-      for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
-        partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), 
metadata.getStartOffset());
+      for (StreamMetadata streamMetadata : streamMetadataList) {
+        for (PartitionGroupMetadata metadata : 
streamMetadata.getPartitionGroupMetadataList()) {
+          partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(), 
metadata.getStartOffset());
+        }
       }
     }
     return partitionGroupIdToSmallestOffset;
@@ -2065,6 +2088,7 @@ public class PinotLLCRealtimeSegmentManager {
   private String setupNewPartitionGroup(TableConfig tableConfig, StreamConfig 
streamConfig,
       PartitionGroupMetadata partitionGroupMetadata, int sequence, long 
creationTimeMs,
       InstancePartitions instancePartitions, int numPartitions, int 
numReplicas) {
+    Preconditions.checkArgument(sequence >= 0, "Sequence number must be >= 0, 
got: %s", sequence);
     String realtimeTableName = tableConfig.getTableName();
     int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
     String startOffset = partitionGroupMetadata.getStartOffset().toString();
@@ -2077,7 +2101,7 @@ public class PinotLLCRealtimeSegmentManager {
     String newSegmentName = newLLCSegmentName.getSegmentName();
 
     CommittingSegmentDescriptor committingSegmentDescriptor = new 
CommittingSegmentDescriptor(null,
-            startOffset, 0);
+        startOffset, 0);
     createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, 
creationTimeMs,
         committingSegmentDescriptor, null, instancePartitions, numPartitions, 
numReplicas);
 
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
index abbfa71e3ac..dde748ef622 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
@@ -20,8 +20,16 @@ package org.apache.pinot.controller.api.resources;
 
 import com.fasterxml.jackson.databind.node.ObjectNode;
 import java.io.InputStream;
+import java.util.List;
 import java.util.Map;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.WatermarkInductionResult;
+import 
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMetadata;
 import org.apache.pinot.spi.utils.JsonUtils;
+import org.mockito.Mockito;
 import org.testng.annotations.Test;
 
 import static org.testng.Assert.assertEquals;
@@ -46,4 +54,50 @@ public class PinotTableRestletResourceTest {
           .asText(), serverTenant + "_REALTIME");
     }
   }
+
+  @Test
+  public void testGetStreamMetadataList()
+      throws Exception {
+    StreamConfig streamConfig0 = Mockito.mock(StreamConfig.class);
+    StreamConfig streamConfig1 = Mockito.mock(StreamConfig.class);
+
+    Map<Integer, Integer> streamPartitionCountMap = Map.of(0, 4, 1, 8);
+    PinotLLCRealtimeSegmentManager realtimeSegmentManager = 
Mockito.mock(PinotLLCRealtimeSegmentManager.class);
+    
Mockito.when(realtimeSegmentManager.getPartitionCountMap(Mockito.anyList())).thenReturn(streamPartitionCountMap);
+    PinotHelixResourceManager pinotHelixResourceManager = 
Mockito.mock(PinotHelixResourceManager.class);
+    
Mockito.when(pinotHelixResourceManager.getRealtimeSegmentManager()).thenReturn(realtimeSegmentManager);
+    PinotTableRestletResource resource = new PinotTableRestletResource();
+    resource._pinotHelixResourceManager = pinotHelixResourceManager;
+
+    List<StreamMetadata> streamMetadataList = 
resource.getStreamMetadataList(List.of(streamConfig0, streamConfig1),
+        new WatermarkInductionResult(List.of(
+            new WatermarkInductionResult.Watermark(1, 3, 101L),
+            new WatermarkInductionResult.Watermark(0, 2, 100L),
+            new WatermarkInductionResult.Watermark(10000, 5, 200L))));
+
+    assertEquals(streamMetadataList.size(), 2);
+
+    // List is ordered by streamConfigIndex (0, 1)
+    StreamMetadata streamMetadata0 = streamMetadataList.get(0);
+    assertEquals(streamMetadata0.getStreamConfig(), streamConfig0);
+    assertEquals(streamMetadata0.getNumPartitions(), 4);
+    assertEquals(streamMetadata0.getPartitionGroupMetadataList().size(), 2);
+    
assertEquals(streamMetadata0.getPartitionGroupMetadataList().get(0).getPartitionGroupId(),
 1);
+    assertEquals(((LongMsgOffset) 
streamMetadata0.getPartitionGroupMetadataList().get(0).getStartOffset()).getOffset(),
+        101L);
+    
assertEquals(streamMetadata0.getPartitionGroupMetadataList().get(0).getSequenceNumber(),
 3);
+    
assertEquals(streamMetadata0.getPartitionGroupMetadataList().get(1).getPartitionGroupId(),
 0);
+    assertEquals(((LongMsgOffset) 
streamMetadata0.getPartitionGroupMetadataList().get(1).getStartOffset()).getOffset(),
+        100L);
+    
assertEquals(streamMetadata0.getPartitionGroupMetadataList().get(1).getSequenceNumber(),
 2);
+
+    StreamMetadata streamMetadata1 = streamMetadataList.get(1);
+    assertEquals(streamMetadata1.getStreamConfig(), streamConfig1);
+    assertEquals(streamMetadata1.getNumPartitions(), 8);
+    assertEquals(streamMetadata1.getPartitionGroupMetadataList().size(), 1);
+    
assertEquals(streamMetadata1.getPartitionGroupMetadataList().get(0).getPartitionGroupId(),
 10000);
+    assertEquals(((LongMsgOffset) 
streamMetadata1.getPartitionGroupMetadataList().get(0).getStartOffset()).getOffset(),
+        200L);
+    
assertEquals(streamMetadata1.getPartitionGroupMetadataList().get(0).getSequenceNumber(),
 5);
+  }
 }
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index f4f876e4a55..e6d673b4273 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -32,7 +32,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.tuple.Pair;
 import org.apache.helix.HelixAdmin;
 import 
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
 import 
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
@@ -82,6 +81,8 @@ import org.apache.pinot.spi.data.FieldSpec;
 import org.apache.pinot.spi.stream.LongMsgOffset;
 import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
 import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMetadata;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
 import org.apache.pinot.spi.utils.CommonConstants.Segment;
@@ -1690,17 +1691,16 @@ public class PinotHelixResourceManagerStatelessTest 
extends ControllerTest {
     waitForEVToDisappear(tableConfig.getTableName());
     addDummySchema(rawTableName);
 
-    List<Pair<PartitionGroupMetadata, Integer>> consumingMetadata = new 
ArrayList<>();
-    PartitionGroupMetadata metadata0 = mock(PartitionGroupMetadata.class);
-    when(metadata0.getPartitionGroupId()).thenReturn(0);
-    
when(metadata0.getStartOffset()).thenReturn(mock(StreamPartitionMsgOffset.class));
-    consumingMetadata.add(Pair.of(metadata0, 5));
-    PartitionGroupMetadata metadata1 = mock(PartitionGroupMetadata.class);
-    when(metadata1.getPartitionGroupId()).thenReturn(1);
-    
when(metadata1.getStartOffset()).thenReturn(mock(StreamPartitionMsgOffset.class));
-    consumingMetadata.add(Pair.of(metadata1, 10));
-
-    _helixResourceManager.addTable(tableConfig, consumingMetadata);
+    StreamConfig streamConfig = new StreamConfig(rawTableName + "_REALTIME",
+        
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap());
+    PartitionGroupMetadata metadata0 =
+        new PartitionGroupMetadata(0, mock(StreamPartitionMsgOffset.class), 5);
+    PartitionGroupMetadata metadata1 =
+        new PartitionGroupMetadata(1, mock(StreamPartitionMsgOffset.class), 
10);
+    List<StreamMetadata> streamMetadataList = Collections.singletonList(
+        new StreamMetadata(streamConfig, 2, Arrays.asList(metadata0, 
metadata1)));
+
+    _helixResourceManager.addTable(tableConfig, streamMetadataList);
 
     IdealState idealState = 
_helixResourceManager.getTableIdealState(realtimeTableName);
     assertNotNull(idealState);
diff --git 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index e43fba113e2..eecad3d22c4 100644
--- 
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++ 
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -91,6 +91,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
 import org.apache.pinot.spi.stream.StreamConfigProperties;
 import org.apache.pinot.spi.stream.StreamConsumerFactory;
 import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadata;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.Helix;
@@ -213,6 +214,72 @@ public class PinotLLCRealtimeSegmentManagerTest {
     }
   }
 
+  @Test
+  public void testSetUpNewTableWithExplicitSequenceNumbers() {
+    FakePinotLLCRealtimeSegmentManager segmentManager = new 
FakePinotLLCRealtimeSegmentManager();
+    segmentManager._numReplicas = 2;
+    segmentManager.makeTableConfig();
+    segmentManager._numInstances = 3;
+    segmentManager.makeConsumingInstancePartitions();
+
+    // Create StreamMetadata with explicit sequence numbers (simulating copy 
table)
+    IdealState idealState = new IdealState(REALTIME_TABLE_NAME);
+    List<StreamMetadata> streamMetadataList = Collections.singletonList(
+        new StreamMetadata(segmentManager._streamConfigs.get(0), 3,
+            Arrays.asList(
+                new PartitionGroupMetadata(0, PARTITION_OFFSET, 5),
+                new PartitionGroupMetadata(1, PARTITION_OFFSET, 10),
+                new PartitionGroupMetadata(2, PARTITION_OFFSET, 0))));
+    segmentManager.setUpNewTable(segmentManager._tableConfig, idealState, 
streamMetadataList);
+
+    Map<String, Map<String, String>> instanceStatesMap = 
idealState.getRecord().getMapFields();
+    assertEquals(instanceStatesMap.size(), 3);
+
+    // Verify segments are created with the explicit sequence numbers
+    for (String segmentName : instanceStatesMap.keySet()) {
+      LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+      int partitionGroupId = llcSegmentName.getPartitionGroupId();
+      int sequence = llcSegmentName.getSequenceNumber();
+      if (partitionGroupId == 0) {
+        assertEquals(sequence, 5);
+      } else if (partitionGroupId == 1) {
+        assertEquals(sequence, 10);
+      } else if (partitionGroupId == 2) {
+        assertEquals(sequence, 0);
+      } else {
+        fail("Unexpected partition group id: " + partitionGroupId);
+      }
+    }
+  }
+
+  @Test
+  public void testSetUpNewTableDefaultSequenceNumberResolvesToZero() {
+    FakePinotLLCRealtimeSegmentManager segmentManager = new 
FakePinotLLCRealtimeSegmentManager();
+    segmentManager._numReplicas = 2;
+    segmentManager.makeTableConfig();
+    segmentManager._numInstances = 3;
+    segmentManager.makeConsumingInstancePartitions();
+
+    // Create StreamMetadata with default sequence numbers (-1, from 2-arg 
constructor)
+    IdealState idealState = new IdealState(REALTIME_TABLE_NAME);
+    List<StreamMetadata> streamMetadataList = Collections.singletonList(
+        new StreamMetadata(segmentManager._streamConfigs.get(0), 2,
+            Arrays.asList(
+                new PartitionGroupMetadata(0, PARTITION_OFFSET),
+                new PartitionGroupMetadata(1, PARTITION_OFFSET))));
+    segmentManager.setUpNewTable(segmentManager._tableConfig, idealState, 
streamMetadataList);
+
+    Map<String, Map<String, String>> instanceStatesMap = 
idealState.getRecord().getMapFields();
+    assertEquals(instanceStatesMap.size(), 2);
+
+    // Verify all segments are created with sequence 0 (default resolved from 
-1)
+    for (String segmentName : instanceStatesMap.keySet()) {
+      LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+      assertEquals(llcSegmentName.getSequenceNumber(), 0,
+          "Default sequence number -1 should resolve to 0 for partition " + 
llcSegmentName.getPartitionGroupId());
+    }
+  }
+
   private void setUpNewTable(FakePinotLLCRealtimeSegmentManager 
segmentManager, int numReplicas, int numInstances,
       int numPartitions) {
     segmentManager._numReplicas = numReplicas;
@@ -297,12 +364,17 @@ public class PinotLLCRealtimeSegmentManagerTest {
       // Expected
     }
 
-    // committing segment's partitionGroupId no longer in the 
newPartitionGroupMetadataList
-    List<PartitionGroupMetadata> partitionGroupMetadataListWithout0 =
-        
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, 
Collections.emptyList(),
+    // committing segment's partitionGroupId no longer in the 
newStreamMetadataList
+    List<StreamMetadata> streamMetadataListWithout0 =
+        segmentManager.getNewStreamMetadataList(segmentManager._streamConfigs, 
Collections.emptyList(),
             mock(IdealState.class));
-    partitionGroupMetadataListWithout0.remove(0);
-    segmentManager._partitionGroupMetadataList = 
partitionGroupMetadataListWithout0;
+    // Remove partition 0 from the first stream's metadata
+    StreamMetadata originalSm = streamMetadataListWithout0.get(0);
+    List<PartitionGroupMetadata> filteredList = new 
ArrayList<>(originalSm.getPartitionGroupMetadataList());
+    filteredList.remove(0);
+    segmentManager._streamMetadataList = Collections.singletonList(
+        new StreamMetadata(originalSm.getStreamConfig(),
+            originalSm.getNumPartitions(), filteredList));
 
     // Commit a segment for partition 0 - No new entries created for partition 
which reached end of life
     committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 2, 
CURRENT_TIME_MS).getSegmentName();
@@ -442,7 +514,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
         "pauseConsumption should include consuming segments from the updated 
ideal state");
   }
 
-
   @Test
   public void testCommitSegmentWithOffsetAutoResetOnOffset()
       throws Exception {
@@ -852,11 +923,16 @@ public class PinotLLCRealtimeSegmentManagerTest {
      * End of shard cases
      */
     // 1 reached end of shard.
-    List<PartitionGroupMetadata> partitionGroupMetadataListWithout1 =
-        
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, 
Collections.emptyList(),
+    List<StreamMetadata> streamMetadataListWithout1 =
+        segmentManager.getNewStreamMetadataList(segmentManager._streamConfigs, 
Collections.emptyList(),
             mock(IdealState.class));
-    partitionGroupMetadataListWithout1.remove(1);
-    segmentManager._partitionGroupMetadataList = 
partitionGroupMetadataListWithout1;
+    // Remove partition 1 from the first stream's metadata
+    StreamMetadata origSm = streamMetadataListWithout1.get(0);
+    List<PartitionGroupMetadata> filteredPgList = new 
ArrayList<>(origSm.getPartitionGroupMetadataList());
+    filteredPgList.remove(1);
+    segmentManager._streamMetadataList = Collections.singletonList(
+        new StreamMetadata(origSm.getStreamConfig(),
+            origSm.getNumPartitions(), filteredPgList));
     // noop
     testRepairs(segmentManager, Collections.emptyList());
 
@@ -871,7 +947,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
     testRepairs(segmentManager, Lists.newArrayList(1));
 
     // make the last ONLINE segment of the shard as CONSUMING (failed between 
step1 and 3)
-    segmentManager._partitionGroupMetadataList = 
partitionGroupMetadataListWithout1;
     consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 1, 1, 
CURRENT_TIME_MS).getSegmentName();
     turnNewConsumingSegmentConsuming(instanceStatesMap, consumingSegment);
 
@@ -1282,9 +1357,11 @@ public class PinotLLCRealtimeSegmentManagerTest {
     FakePinotLLCRealtimeSegmentManager segmentManager =
         spy(new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager));
     setUpNewTable(segmentManager, 2, 5, 4);
-    segmentManager._partitionGroupMetadataList = IntStream.range(0, 4)
-        .mapToObj(partition -> new PartitionGroupMetadata(partition, 
PARTITION_OFFSET))
-        .collect(Collectors.toList());
+    segmentManager._streamMetadataList = Collections.singletonList(
+        new StreamMetadata(segmentManager._streamConfigs.get(0), 4,
+            IntStream.range(0, 4)
+                .mapToObj(partition -> new PartitionGroupMetadata(partition, 
PARTITION_OFFSET))
+                .collect(Collectors.toList())));
 
     String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 
CURRENT_TIME_MS).getSegmentName();
     CommittingSegmentDescriptor committingSegmentDescriptor = new 
CommittingSegmentDescriptor(committingSegment,
@@ -1300,9 +1377,11 @@ public class PinotLLCRealtimeSegmentManagerTest {
   public void 
testCommitSegmentMetadataSkipsCreatingNewMetadataWhenTopicPausedIfPartitionIdsFallbackNeeded()
 {
     FakePinotLLCRealtimeSegmentManager segmentManager = spy(new 
FakePinotLLCRealtimeSegmentManager());
     setUpNewTable(segmentManager, 2, 5, 4);
-    segmentManager._partitionGroupMetadataList = IntStream.range(0, 4)
-        .mapToObj(partition -> new PartitionGroupMetadata(partition, 
PARTITION_OFFSET))
-        .collect(Collectors.toList());
+    segmentManager._streamMetadataList = Collections.singletonList(
+        new StreamMetadata(segmentManager._streamConfigs.get(0), 4,
+            IntStream.range(0, 4)
+                .mapToObj(partition -> new PartitionGroupMetadata(partition, 
PARTITION_OFFSET))
+                .collect(Collectors.toList())));
 
     PauseState pauseState =
         new PauseState(false, PauseState.ReasonCode.ADMINISTRATIVE, 
"pause-topic-for-test",
@@ -1321,7 +1400,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     
assertFalse(segmentManager._segmentZKMetadataMap.containsKey(expectedNewConsumingSegment));
     
assertFalse(segmentManager._idealState.getRecord().getMapFields().containsKey(expectedNewConsumingSegment));
     ZkHelixPropertyStore<ZNRecord> propertyStore =
-        (ZkHelixPropertyStore<ZNRecord>) 
segmentManager._mockResourceManager.getPropertyStore();
+        segmentManager._mockResourceManager.getPropertyStore();
     verify(propertyStore, never()).remove(anyString(), 
eq(AccessOption.PERSISTENT));
   }
 
@@ -1330,7 +1409,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     FakePinotLLCRealtimeSegmentManager segmentManager = new 
FakePinotLLCRealtimeSegmentManager();
     setUpNewTable(segmentManager, 2, 5, 4);
     ZkHelixPropertyStore<ZNRecord> propertyStore =
-        (ZkHelixPropertyStore<ZNRecord>) 
segmentManager._mockResourceManager.getPropertyStore();
+        segmentManager._mockResourceManager.getPropertyStore();
     when(propertyStore.remove(anyString(), 
eq(AccessOption.PERSISTENT))).thenReturn(true);
 
     PauseState pauseState = new PauseState(true, 
PauseState.ReasonCode.ADMINISTRATIVE, "pause-for-test",
@@ -1358,7 +1437,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     FakePinotLLCRealtimeSegmentManager segmentManager = new 
FakePinotLLCRealtimeSegmentManager();
     setUpNewTable(segmentManager, 2, 5, 4);
     ZkHelixPropertyStore<ZNRecord> propertyStore =
-        (ZkHelixPropertyStore<ZNRecord>) 
segmentManager._mockResourceManager.getPropertyStore();
+        segmentManager._mockResourceManager.getPropertyStore();
     when(propertyStore.remove(anyString(), 
eq(AccessOption.PERSISTENT))).thenReturn(true);
 
     PauseState pauseState =
@@ -1385,7 +1464,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     FakePinotLLCRealtimeSegmentManager segmentManager = new 
FakePinotLLCRealtimeSegmentManager();
     setUpNewTable(segmentManager, 2, 5, 4);
     ZkHelixPropertyStore<ZNRecord> propertyStore =
-        (ZkHelixPropertyStore<ZNRecord>) 
segmentManager._mockResourceManager.getPropertyStore();
+        segmentManager._mockResourceManager.getPropertyStore();
     when(propertyStore.remove(anyString(), 
eq(AccessOption.PERSISTENT))).thenReturn(true);
 
     String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0, 
CURRENT_TIME_MS).getSegmentName();
@@ -1855,11 +1934,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
             new PartitionGroupConsumptionStatus(1, 12, new LongMsgOffset(123), 
new LongMsgOffset(345), "ONLINE"));
     doReturn(partitionGroupConsumptionStatusList).when(segmentManagerSpy)
         .getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
-    List<PartitionGroupMetadata> partitionGroupMetadataList =
-        List.of(new PartitionGroupMetadata(0, new LongMsgOffset(234)),
-            new PartitionGroupMetadata(1, new LongMsgOffset(345)));
-    doReturn(partitionGroupMetadataList).when(segmentManagerSpy)
-        .getNewPartitionGroupMetadataList(streamConfigs, 
partitionGroupConsumptionStatusList, idealState);
+    List<StreamMetadata> streamMetadataList =
+        List.of(new StreamMetadata(streamConfigs.get(0), 2,
+            List.of(new PartitionGroupMetadata(0, new LongMsgOffset(234)),
+                new PartitionGroupMetadata(1, new LongMsgOffset(345)))));
+    doReturn(streamMetadataList).when(segmentManagerSpy)
+        .getNewStreamMetadataList(streamConfigs, 
partitionGroupConsumptionStatusList, idealState);
     partitionIds = segmentManagerSpy.getPartitionIds(streamConfigs, 
idealState);
     Assert.assertEquals(partitionIds.size(), 2);
   }
@@ -2250,7 +2330,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
     Map<String, Integer> _segmentZKMetadataVersionMap = new HashMap<>();
     IdealState _idealState;
     int _numPartitions;
-    List<PartitionGroupMetadata> _partitionGroupMetadataList = null;
+    List<StreamMetadata> _streamMetadataList = null;
     boolean _exceededMaxSegmentCompletionTime = false;
     FileUploadDownloadClient _mockedFileUploadDownloadClient;
     PinotHelixResourceManager _mockResourceManager;
@@ -2330,7 +2410,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     public void ensureAllPartitionsConsuming() {
       ensureAllPartitionsConsuming(_tableConfig, _streamConfigs, _idealState,
-          getNewPartitionGroupMetadataList(_streamConfigs, 
Collections.emptyList(), mock(IdealState.class)), null);
+          getNewStreamMetadataList(_streamConfigs, Collections.emptyList(), 
mock(IdealState.class)), null);
     }
 
     @Override
@@ -2407,28 +2487,31 @@ public class PinotLLCRealtimeSegmentManagerTest {
 
     @Override
     Set<Integer> getPartitionIds(StreamConfig streamConfig) {
-      if (_partitionGroupMetadataList != null) {
+      if (_streamMetadataList != null) {
         throw new UnsupportedOperationException();
       }
       return IntStream.range(0, 
_numPartitions).boxed().collect(Collectors.toSet());
     }
 
     @Override
-    List<PartitionGroupMetadata> 
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
+    List<StreamMetadata> getNewStreamMetadataList(List<StreamConfig> 
streamConfigs,
         List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList, IdealState idealState) {
-      if (_partitionGroupMetadataList != null) {
-        return _partitionGroupMetadataList;
+      if (_streamMetadataList != null) {
+        return _streamMetadataList;
       } else {
-        return IntStream.range(0, _numPartitions).mapToObj(i -> new 
PartitionGroupMetadata(i, PARTITION_OFFSET))
-            .collect(Collectors.toList());
+        List<PartitionGroupMetadata> partitionGroupMetadataList =
+            IntStream.range(0, _numPartitions).mapToObj(i -> new 
PartitionGroupMetadata(i, PARTITION_OFFSET))
+                .collect(Collectors.toList());
+        return Collections.singletonList(
+            new StreamMetadata(streamConfigs.get(0), _numPartitions, 
partitionGroupMetadataList));
       }
     }
 
     @Override
-    List<PartitionGroupMetadata> 
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
+    List<StreamMetadata> getNewStreamMetadataList(List<StreamConfig> 
streamConfigs,
         List<PartitionGroupConsumptionStatus> 
currentPartitionGroupConsumptionStatusList, IdealState idealState,
         boolean forceGetOffsetFromStream) {
-      return getNewPartitionGroupMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList, idealState);
+      return getNewStreamMetadataList(streamConfigs, 
currentPartitionGroupConsumptionStatusList, idealState);
     }
 
     @Override
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
index d5127b6d3be..3b7103c8284 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
@@ -22,18 +22,27 @@ package org.apache.pinot.spi.stream;
  * A PartitionGroup is a group of partitions/shards that the same consumer 
should consume from.
  * This class is a container for the metadata regarding a partition group, 
that is needed by a consumer to start
  * consumption.
- * It consists of
+ * It consists of:
  * 1. A unique partition group id for this partition group
  * 2. The start offset to begin consumption for this partition group
+ * 3. The sequence number for the consuming segment (used when creating 
segments with designated offsets/sequences)
  */
 public class PartitionGroupMetadata {
 
+  private static final int DEFAULT_SEQUENCE_NUMBER = -1;
+
   private final int _partitionGroupId;
   private final StreamPartitionMsgOffset _startOffset;
+  private final int _sequenceNumber;
 
   public PartitionGroupMetadata(int partitionGroupId, StreamPartitionMsgOffset 
startOffset) {
+    this(partitionGroupId, startOffset, DEFAULT_SEQUENCE_NUMBER);
+  }
+
+  public PartitionGroupMetadata(int partitionGroupId, StreamPartitionMsgOffset 
startOffset, int sequenceNumber) {
     _partitionGroupId = partitionGroupId;
     _startOffset = startOffset;
+    _sequenceNumber = sequenceNumber;
   }
 
   public int getPartitionGroupId() {
@@ -43,4 +52,8 @@ public class PartitionGroupMetadata {
   public StreamPartitionMsgOffset getStartOffset() {
     return _startOffset;
   }
+
+  public int getSequenceNumber() {
+    return _sequenceNumber;
+  }
 }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 698ad472e1a..2e0443228d6 100644
--- 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++ 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -19,6 +19,7 @@
 package org.apache.pinot.spi.stream;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.stream.Collectors;
@@ -28,16 +29,17 @@ import org.slf4j.LoggerFactory;
 
 
 /**
- * Fetches the list of {@link PartitionGroupMetadata} for all partition groups 
of the streams,
+ * Fetches the {@link StreamMetadata} for all streams of a table,
  * using the {@link StreamMetadataProvider}
  */
 public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class);
+  private static final int METADATA_FETCH_TIMEOUT_MS = 15000;
 
   private final List<StreamConfig> _streamConfigs;
   private final List<PartitionGroupConsumptionStatus> 
_partitionGroupConsumptionStatusList;
   private final boolean _forceGetOffsetFromStream;
-  private final List<PartitionGroupMetadata> _newPartitionGroupMetadataList = 
new ArrayList<>();
+  private final List<StreamMetadata> _streamMetadataList = new ArrayList<>();
   private final List<Integer> _pausedTopicIndices;
 
   private Exception _exception;
@@ -51,8 +53,18 @@ public class PartitionGroupMetadataFetcher implements 
Callable<Boolean> {
     _pausedTopicIndices = pausedTopicIndices;
   }
 
+  public List<StreamMetadata> getStreamMetadataList() {
+    return Collections.unmodifiableList(_streamMetadataList);
+  }
+
+  /**
+   * @deprecated after 1.5.0 release. Use {@link #getStreamMetadataList()} 
instead.
+   */
+  @Deprecated
   public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
-    return _newPartitionGroupMetadataList;
+    return _streamMetadataList.stream()
+        .flatMap(sm -> sm.getPartitionGroupMetadataList().stream())
+        .collect(Collectors.toList());
   }
 
   public Exception getException() {
@@ -60,14 +72,15 @@ public class PartitionGroupMetadataFetcher implements 
Callable<Boolean> {
   }
 
   /**
-   * Callable to fetch the {@link PartitionGroupMetadata} list, from the 
stream.
+   * Callable to fetch the {@link StreamMetadata} list from the streams.
    * The stream requires the list of {@link PartitionGroupConsumptionStatus} 
to compute the new
    * {@link PartitionGroupMetadata}
    */
   @Override
   public Boolean call()
       throws Exception {
-    _newPartitionGroupMetadataList.clear();
+    _streamMetadataList.clear();
+    _exception = null;
     return _streamConfigs.size() == 1 ? fetchSingleStream() : 
fetchMultipleStreams();
   }
 
@@ -81,18 +94,18 @@ public class PartitionGroupMetadataFetcher implements 
Callable<Boolean> {
     StreamConsumerFactory streamConsumerFactory = 
StreamConsumerFactoryProvider.create(streamConfig);
     try (StreamMetadataProvider streamMetadataProvider = 
streamConsumerFactory.createStreamMetadataProvider(
         StreamConsumerFactory.getUniqueClientId(clientId))) {
-      
_newPartitionGroupMetadataList.addAll(streamMetadataProvider.computePartitionGroupMetadata(clientId,
 streamConfig,
-          _partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000, 
_forceGetOffsetFromStream));
-      if (_exception != null) {
-        // We had at least one failure, but succeeded now. Log an info
-        LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic 
{}", topicName);
-      }
+      List<PartitionGroupMetadata> partitionGroupMetadataList =
+          streamMetadataProvider.computePartitionGroupMetadata(clientId, 
streamConfig,
+              _partitionGroupConsumptionStatusList, 
/*maxWaitTimeMs=*/METADATA_FETCH_TIMEOUT_MS,
+              _forceGetOffsetFromStream);
+      _streamMetadataList.add(
+          new StreamMetadata(streamConfig, partitionGroupMetadataList.size(), 
partitionGroupMetadataList));
     } catch (TransientConsumerException e) {
-      LOGGER.warn("Transient Exception: Could not get partition count for 
topic {}", topicName, e);
+      LOGGER.warn("Transient Exception: Could not get StreamMetadata for topic 
{}", topicName, e);
       _exception = e;
       return Boolean.FALSE;
     } catch (Exception e) {
-      LOGGER.warn("Could not get partition count for topic {}", topicName, e);
+      LOGGER.warn("Could not get StreamMetadata for topic {}", topicName, e);
       _exception = e;
       throw e;
     }
@@ -104,7 +117,7 @@ public class PartitionGroupMetadataFetcher implements 
Callable<Boolean> {
     int numStreams = _streamConfigs.size();
     for (int i = 0; i < numStreams; i++) {
       if (_pausedTopicIndices.contains(i)) {
-        LOGGER.info("Skipping fetching PartitionGroupMetadata for paused 
topic: {}",
+        LOGGER.info("Skipping fetching StreamMetadata for paused topic: {}",
             _streamConfigs.get(i).getTopicName());
         continue;
       }
@@ -122,25 +135,24 @@ public class PartitionGroupMetadataFetcher implements 
Callable<Boolean> {
               .collect(Collectors.toList());
       try (StreamMetadataProvider streamMetadataProvider = 
streamConsumerFactory.createStreamMetadataProvider(
           StreamConsumerFactory.getUniqueClientId(clientId))) {
-        _newPartitionGroupMetadataList.addAll(
+        List<PartitionGroupMetadata> partitionGroupMetadataList =
             streamMetadataProvider.computePartitionGroupMetadata(clientId,
-                    streamConfig, topicPartitionGroupConsumptionStatusList, 
/*maxWaitTimeMs=*/15000,
+                    streamConfig, topicPartitionGroupConsumptionStatusList,
+                    /*maxWaitTimeMs=*/METADATA_FETCH_TIMEOUT_MS,
                     _forceGetOffsetFromStream)
                 .stream()
                 .map(metadata -> new PartitionGroupMetadata(
                     
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(metadata.getPartitionGroupId(),
-                        index), metadata.getStartOffset()))
-                .collect(Collectors.toList()));
-        if (_exception != null) {
-          // We had at least one failure, but succeeded now. Log an info
-          LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic 
{}", topicName);
-        }
+                        index), metadata.getStartOffset(), 
metadata.getSequenceNumber()))
+                .collect(Collectors.toList());
+        _streamMetadataList.add(
+            new StreamMetadata(streamConfig, 
partitionGroupMetadataList.size(), partitionGroupMetadataList));
       } catch (TransientConsumerException e) {
-        LOGGER.warn("Transient Exception: Could not get partition count for 
topic {}", topicName, e);
+        LOGGER.warn("Transient Exception: Could not get StreamMetadata for 
topic {}", topicName, e);
         _exception = e;
         return Boolean.FALSE;
       } catch (Exception e) {
-        LOGGER.warn("Could not get partition count for topic {}", topicName, 
e);
+        LOGGER.warn("Could not get StreamMetadata for topic {}", topicName, e);
         _exception = e;
         throw e;
       }
diff --git 
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadata.java 
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadata.java
new file mode 100644
index 00000000000..a59e056b02b
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadata.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.util.List;
+
+
+/**
+ * Groups partition metadata for a single stream/topic.
+ *
+ * <p>This replaces the flat {@code List<PartitionGroupMetadata>} pattern 
where partitions from all streams were mixed
+ * together and required partition ID padding to identify stream membership.
+ *
+ * <p>For multi-stream tables, the {@link PartitionGroupMetadata} items use Pinot-encoded partition
+ * IDs (i.e., {@code streamIndex * 10000 + streamPartitionId}) for backward compatibility with
+ * segment names stored in ZooKeeper; single-stream tables keep the raw stream partition IDs.
+ */
+public class StreamMetadata {
+
+  private final StreamConfig _streamConfig;
+  private final int _numPartitions;
+  private final List<PartitionGroupMetadata> _partitionGroupMetadataList;
+
+  public StreamMetadata(StreamConfig streamConfig, int numPartitions,
+      List<PartitionGroupMetadata> partitionGroupMetadataList) {
+    _streamConfig = streamConfig;
+    _numPartitions = numPartitions;
+    _partitionGroupMetadataList = List.copyOf(partitionGroupMetadataList);
+  }
+
+  public StreamConfig getStreamConfig() {
+    return _streamConfig;
+  }
+
+  public String getTopicName() {
+    return _streamConfig.getTopicName();
+  }
+
+  public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
+    return _partitionGroupMetadataList;
+  }
+
+  /**
+   * Returns the total number of partitions for this stream. This may be 
greater than the size of
+   * {@link #getPartitionGroupMetadataList()} when only a subset of partitions 
is assigned.
+   */
+  public int getNumPartitions() {
+    return _numPartitions;
+  }
+}
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
 
b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
index 9fa65254b63..1c229b602da 100644
--- 
a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
@@ -27,10 +27,7 @@ import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.*;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
@@ -70,7 +67,10 @@ public class PartitionGroupMetadataFetcherTest {
 
       // Verify
       Assert.assertTrue(result);
-      Assert.assertEquals(fetcher.getPartitionGroupMetadataList().size(), 1);
+      List<StreamMetadata> streamMetadataList = 
fetcher.getStreamMetadataList();
+      Assert.assertEquals(streamMetadataList.size(), 1);
+      Assert.assertEquals(streamMetadataList.get(0).getNumPartitions(), 1);
+      
Assert.assertEquals(streamMetadataList.get(0).getPartitionGroupMetadataList().size(),
 1);
       Assert.assertNull(fetcher.getException());
     }
   }
@@ -85,6 +85,7 @@ public class PartitionGroupMetadataFetcherTest {
     List<PartitionGroupConsumptionStatus> statusList = Collections.emptyList();
 
     StreamMetadataProvider metadataProvider = 
mock(StreamMetadataProvider.class);
+    when(metadataProvider.fetchPartitionCount(anyLong())).thenReturn(1);
     when(metadataProvider.computePartitionGroupMetadata(anyString(), 
any(StreamConfig.class),
         any(List.class), anyInt(), anyBoolean()))
         .thenThrow(new TransientConsumerException(new 
RuntimeException("Transient error")));
@@ -143,12 +144,18 @@ public class PartitionGroupMetadataFetcherTest {
 
       // Verify
       Assert.assertTrue(result);
-      Assert.assertEquals(fetcher.getPartitionGroupMetadataList().size(), 4);
+      List<StreamMetadata> streamMetadataList = 
fetcher.getStreamMetadataList();
+      Assert.assertEquals(streamMetadataList.size(), 2);
       Assert.assertNull(fetcher.getException());
 
+      Assert.assertEquals(streamMetadataList.get(0).getNumPartitions(), 2);
+      
Assert.assertEquals(streamMetadataList.get(0).getPartitionGroupMetadataList().size(),
 2);
+      Assert.assertEquals(streamMetadataList.get(1).getNumPartitions(), 2);
+      
Assert.assertEquals(streamMetadataList.get(1).getPartitionGroupMetadataList().size(),
 2);
+
       // Verify the correct partition group IDs: 0, 1, 10000, 10001
-      List<PartitionGroupMetadata> resultMetadata = 
fetcher.getPartitionGroupMetadataList();
-      List<Integer> partitionIds = resultMetadata.stream()
+      List<Integer> partitionIds = streamMetadataList.stream()
+          .flatMap(sm -> sm.getPartitionGroupMetadataList().stream())
           .map(PartitionGroupMetadata::getPartitionGroupId)
           .sorted()
           .collect(Collectors.toList());
@@ -174,6 +181,7 @@ public class PartitionGroupMetadataFetcherTest {
     PartitionGroupMetadata mockedMetadata2 = new PartitionGroupMetadata(1, 
mock(StreamPartitionMsgOffset.class));
 
     StreamMetadataProvider metadataProvider = 
mock(StreamMetadataProvider.class);
+    when(metadataProvider.fetchPartitionCount(anyLong())).thenReturn(3);
     when(metadataProvider.computePartitionGroupMetadata(anyString(), 
any(StreamConfig.class),
         any(List.class), anyInt(), anyBoolean()))
         .thenReturn(Arrays.asList(mockedMetadata1, mockedMetadata2));
@@ -186,19 +194,20 @@ public class PartitionGroupMetadataFetcherTest {
       mockedProvider.when(() -> 
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
 
       PartitionGroupMetadataFetcher fetcher = new 
PartitionGroupMetadataFetcher(
-          streamConfigs, statusList, Arrays.asList(1), false);
+          streamConfigs, statusList, List.of(1), false);
 
       // Execute
       Boolean result = fetcher.call();
 
-      // Verify
+      // Verify - 2 streams active (topic1 at index 0, topic3 at index 2; 
topic2 at index 1 is paused)
       Assert.assertTrue(result);
-      Assert.assertEquals(fetcher.getPartitionGroupMetadataList().size(), 4);
+      List<StreamMetadata> streamMetadataList = 
fetcher.getStreamMetadataList();
+      Assert.assertEquals(streamMetadataList.size(), 2);
       Assert.assertNull(fetcher.getException());
 
       // Verify the correct partition group IDs
-      List<PartitionGroupMetadata> resultMetadata = 
fetcher.getPartitionGroupMetadataList();
-      List<Integer> partitionIds = resultMetadata.stream()
+      List<Integer> partitionIds = streamMetadataList.stream()
+          .flatMap(sm -> sm.getPartitionGroupMetadataList().stream())
           .map(PartitionGroupMetadata::getPartitionGroupId)
           .sorted()
           .collect(Collectors.toList());
@@ -207,10 +216,259 @@ public class PartitionGroupMetadataFetcherTest {
     }
   }
 
+  @Test
+  public void testDeprecatedGetPartitionGroupMetadataListFlatMaps()
+      throws Exception {
+    StreamConfig streamConfig1 = createMockStreamConfig("topic1", 
"test-table", false);
+    StreamConfig streamConfig2 = createMockStreamConfig("topic2", 
"test-table", false);
+    List<StreamConfig> streamConfigs = Arrays.asList(streamConfig1, 
streamConfig2);
+
+    PartitionGroupConsumptionStatus status1 = new 
PartitionGroupConsumptionStatus(0, 0, null, null, "IN_PROGRESS");
+    List<PartitionGroupConsumptionStatus> statusList = 
Collections.singletonList(status1);
+
+    StreamPartitionMsgOffset offset = mock(StreamPartitionMsgOffset.class);
+    PartitionGroupMetadata m1 = new PartitionGroupMetadata(0, offset);
+    PartitionGroupMetadata m2 = new PartitionGroupMetadata(1, offset);
+
+    StreamMetadataProvider metadataProvider = 
mock(StreamMetadataProvider.class);
+    when(metadataProvider.fetchPartitionCount(anyLong())).thenReturn(2);
+    when(metadataProvider.computePartitionGroupMetadata(anyString(), 
any(StreamConfig.class),
+        any(List.class), anyInt(), anyBoolean())).thenReturn(Arrays.asList(m1, 
m2));
+
+    StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+    
when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider);
+
+    try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider = 
Mockito.mockStatic(
+        StreamConsumerFactoryProvider.class)) {
+      mockedProvider.when(() -> 
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+      PartitionGroupMetadataFetcher fetcher = new 
PartitionGroupMetadataFetcher(
+          streamConfigs, statusList, Collections.emptyList(), false);
+      fetcher.call();
+
+      // Deprecated method should flat-map across all streams
+      List<PartitionGroupMetadata> flatList = 
fetcher.getPartitionGroupMetadataList();
+      Assert.assertEquals(flatList.size(), 4); // 2 per stream * 2 streams
+    }
+  }
+
+  @Test
+  public void testExceptionResetOnRetry()
+      throws Exception {
+    StreamConfig streamConfig = createMockStreamConfig("test-topic", 
"test-table", false);
+    List<StreamConfig> streamConfigs = Collections.singletonList(streamConfig);
+
+    StreamPartitionMsgOffset offset = mock(StreamPartitionMsgOffset.class);
+    PartitionGroupMetadata metadata = new PartitionGroupMetadata(0, offset);
+
+    StreamMetadataProvider metadataProvider = 
mock(StreamMetadataProvider.class);
+    when(metadataProvider.fetchPartitionCount(anyLong())).thenReturn(1);
+    // First call: transient failure; second call: success
+    when(metadataProvider.computePartitionGroupMetadata(anyString(), 
any(StreamConfig.class),
+        any(List.class), anyInt(), anyBoolean()))
+        .thenThrow(new TransientConsumerException(new 
RuntimeException("Transient")))
+        .thenReturn(Collections.singletonList(metadata));
+
+    StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+    
when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider);
+
+    try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider = 
Mockito.mockStatic(
+        StreamConsumerFactoryProvider.class)) {
+      mockedProvider.when(() -> 
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+      PartitionGroupMetadataFetcher fetcher = new 
PartitionGroupMetadataFetcher(
+          streamConfigs, Collections.emptyList(), Collections.emptyList(), 
false);
+
+      // First call fails
+      Boolean result1 = fetcher.call();
+      Assert.assertFalse(result1);
+      Assert.assertNotNull(fetcher.getException());
+
+      // Second call succeeds - exception should be reset
+      Boolean result2 = fetcher.call();
+      Assert.assertTrue(result2);
+      Assert.assertNull(fetcher.getException());
+      Assert.assertEquals(fetcher.getStreamMetadataList().size(), 1);
+    }
+  }
+
+  @Test
+  public void testSequenceNumberPreservedInMultiStreamRemap()
+      throws Exception {
+    StreamConfig streamConfig1 = createMockStreamConfig("topic1", 
"test-table", false);
+    StreamConfig streamConfig2 = createMockStreamConfig("topic2", 
"test-table", false);
+    List<StreamConfig> streamConfigs = Arrays.asList(streamConfig1, 
streamConfig2);
+
+    List<PartitionGroupConsumptionStatus> statusList = Collections.emptyList();
+
+    StreamPartitionMsgOffset offset = mock(StreamPartitionMsgOffset.class);
+    PartitionGroupMetadata m1 = new PartitionGroupMetadata(0, offset, 7);
+    PartitionGroupMetadata m2 = new PartitionGroupMetadata(1, offset, 3);
+
+    StreamMetadataProvider metadataProvider = 
mock(StreamMetadataProvider.class);
+    when(metadataProvider.fetchPartitionCount(anyLong())).thenReturn(2);
+    when(metadataProvider.computePartitionGroupMetadata(anyString(), 
any(StreamConfig.class),
+        any(List.class), anyInt(), anyBoolean())).thenReturn(Arrays.asList(m1, 
m2));
+
+    StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+    
when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider);
+
+    try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider = 
Mockito.mockStatic(
+        StreamConsumerFactoryProvider.class)) {
+      mockedProvider.when(() -> 
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+      PartitionGroupMetadataFetcher fetcher = new 
PartitionGroupMetadataFetcher(
+          streamConfigs, statusList, Collections.emptyList(), false);
+      fetcher.call();
+
+      List<StreamMetadata> streamMetadataList = 
fetcher.getStreamMetadataList();
+      Assert.assertEquals(streamMetadataList.size(), 2);
+
+      // Second stream's partitions should have remapped IDs but preserved 
sequence numbers
+      List<PartitionGroupMetadata> stream1Partitions = 
streamMetadataList.get(1).getPartitionGroupMetadataList();
+      Assert.assertEquals(stream1Partitions.get(0).getPartitionGroupId(), 
10000);
+      Assert.assertEquals(stream1Partitions.get(0).getSequenceNumber(), 7);
+      Assert.assertEquals(stream1Partitions.get(1).getPartitionGroupId(), 
10001);
+      Assert.assertEquals(stream1Partitions.get(1).getSequenceNumber(), 3);
+    }
+  }
+
+  @Test
+  public void testGetStreamMetadataListReturnsUnmodifiable()
+      throws Exception {
+    StreamConfig streamConfig = createMockStreamConfig("test-topic", 
"test-table", false);
+    List<StreamConfig> streamConfigs = Collections.singletonList(streamConfig);
+
+    PartitionGroupMetadata metadata = new PartitionGroupMetadata(0, 
mock(StreamPartitionMsgOffset.class));
+    StreamMetadataProvider metadataProvider = 
mock(StreamMetadataProvider.class);
+    when(metadataProvider.fetchPartitionCount(anyLong())).thenReturn(1);
+    when(metadataProvider.computePartitionGroupMetadata(anyString(), 
any(StreamConfig.class),
+        any(List.class), anyInt(), 
anyBoolean())).thenReturn(Collections.singletonList(metadata));
+
+    StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+    
when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider);
+
+    try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider = 
Mockito.mockStatic(
+        StreamConsumerFactoryProvider.class)) {
+      mockedProvider.when(() -> 
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+      PartitionGroupMetadataFetcher fetcher = new 
PartitionGroupMetadataFetcher(
+          streamConfigs, Collections.emptyList(), Collections.emptyList(), 
false);
+      fetcher.call();
+
+      try {
+        fetcher.getStreamMetadataList().add(
+            new StreamMetadata(streamConfig, 1, Collections.emptyList()));
+        Assert.fail("Expected UnsupportedOperationException");
+      } catch (UnsupportedOperationException e) {
+        // expected
+      }
+    }
+  }
+
   private StreamConfig createMockStreamConfig(String topicName, String 
tableName, boolean isEphemeral) {
     StreamConfig streamConfig = mock(StreamConfig.class);
     when(streamConfig.getTopicName()).thenReturn(topicName);
     when(streamConfig.getTableNameWithType()).thenReturn(tableName);
     return streamConfig;
   }
+
+  private static final class DefaultComputeOnlyMetadataProvider implements 
StreamMetadataProvider {
+    private int _fetchPartitionCountCalls;
+
+    @Override
+    public int fetchPartitionCount(long timeoutMillis) {
+      _fetchPartitionCountCalls++;
+      return 1;
+    }
+
+    @Override
+    public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria 
offsetCriteria, long timeoutMillis) {
+      throw new UnsupportedOperationException("Should not be called");
+    }
+
+    @Override
+    public boolean supportsOffsetLag() {
+      return false;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    int getFetchPartitionCountCalls() {
+      return _fetchPartitionCountCalls;
+    }
+  }
+
+  private static final class OverriddenComputeMetadataProvider implements 
StreamMetadataProvider {
+    private int _fetchPartitionCountCalls;
+    private final StreamPartitionMsgOffset _offset = 
mock(StreamPartitionMsgOffset.class);
+
+    @Override
+    public int fetchPartitionCount(long timeoutMillis) {
+      _fetchPartitionCountCalls++;
+      return 3;
+    }
+
+    @Override
+    public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria 
offsetCriteria, long timeoutMillis) {
+      throw new UnsupportedOperationException("Should not be called");
+    }
+
+    @Override
+    public List<PartitionGroupMetadata> computePartitionGroupMetadata(String 
clientId, StreamConfig streamConfig,
+        List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatuses, int timeoutMillis,
+        boolean forceGetOffsetFromStream) {
+      return Collections.singletonList(new PartitionGroupMetadata(0, _offset));
+    }
+
+    @Override
+    public boolean supportsOffsetLag() {
+      return false;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    int getFetchPartitionCountCalls() {
+      return _fetchPartitionCountCalls;
+    }
+  }
+
+  private static final class OverriddenFourArgComputeMetadataProvider 
implements StreamMetadataProvider {
+    private int _fetchPartitionCountCalls;
+    private final StreamPartitionMsgOffset _offset = 
mock(StreamPartitionMsgOffset.class);
+
+    @Override
+    public int fetchPartitionCount(long timeoutMillis) {
+      _fetchPartitionCountCalls++;
+      return 3;
+    }
+
+    @Override
+    public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria 
offsetCriteria, long timeoutMillis) {
+      throw new UnsupportedOperationException("Should not be called");
+    }
+
+    @Override
+    public List<PartitionGroupMetadata> computePartitionGroupMetadata(String 
clientId, StreamConfig streamConfig,
+        List<PartitionGroupConsumptionStatus> 
partitionGroupConsumptionStatuses, int timeoutMillis) {
+      return Collections.singletonList(new PartitionGroupMetadata(0, _offset));
+    }
+
+    @Override
+    public boolean supportsOffsetLag() {
+      return false;
+    }
+
+    @Override
+    public void close() {
+    }
+
+    int getFetchPartitionCountCalls() {
+      return _fetchPartitionCountCalls;
+    }
+  }
 }
diff --git 
a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMetadataTest.java 
b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMetadataTest.java
new file mode 100644
index 00000000000..4afb14aae25
--- /dev/null
+++ 
b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMetadataTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
+
+
+public class StreamMetadataTest {
+
+  @Test
+  public void testGetters() {
+    StreamConfig streamConfig = mock(StreamConfig.class);
+    when(streamConfig.getTopicName()).thenReturn("test-topic");
+
+    PartitionGroupMetadata pg0 = new PartitionGroupMetadata(0, 
mock(StreamPartitionMsgOffset.class));
+    PartitionGroupMetadata pg1 = new PartitionGroupMetadata(1, 
mock(StreamPartitionMsgOffset.class), 5);
+    List<PartitionGroupMetadata> pgList = Arrays.asList(pg0, pg1);
+
+    StreamMetadata sm = new StreamMetadata(streamConfig, 10, pgList);
+
+    assertSame(sm.getStreamConfig(), streamConfig);
+    assertEquals(sm.getTopicName(), "test-topic");
+    assertEquals(sm.getNumPartitions(), 10);
+    assertEquals(sm.getPartitionGroupMetadataList().size(), 2);
+    assertSame(sm.getPartitionGroupMetadataList().get(0), pg0);
+    assertSame(sm.getPartitionGroupMetadataList().get(1), pg1);
+  }
+
+  @Test
+  public void testNumPartitionsCanExceedListSize() {
+    StreamConfig streamConfig = mock(StreamConfig.class);
+    PartitionGroupMetadata pg = new PartitionGroupMetadata(0, 
mock(StreamPartitionMsgOffset.class));
+
+    StreamMetadata sm = new StreamMetadata(streamConfig, 100, 
Collections.singletonList(pg));
+
+    assertEquals(sm.getNumPartitions(), 100);
+    assertEquals(sm.getPartitionGroupMetadataList().size(), 1);
+  }
+
+  @Test
+  public void testDefensiveCopy() {
+    StreamConfig streamConfig = mock(StreamConfig.class);
+    PartitionGroupMetadata pg0 = new PartitionGroupMetadata(0, 
mock(StreamPartitionMsgOffset.class));
+    List<PartitionGroupMetadata> mutableList = new ArrayList<>();
+    mutableList.add(pg0);
+
+    StreamMetadata sm = new StreamMetadata(streamConfig, 1, mutableList);
+    assertEquals(sm.getPartitionGroupMetadataList().size(), 1);
+
+    // Mutating the original list should not affect StreamMetadata
+    mutableList.add(new PartitionGroupMetadata(1, 
mock(StreamPartitionMsgOffset.class)));
+    assertEquals(sm.getPartitionGroupMetadataList().size(), 1);
+  }
+
+  @Test(expectedExceptions = UnsupportedOperationException.class)
+  public void testPartitionGroupMetadataListIsUnmodifiable() {
+    StreamConfig streamConfig = mock(StreamConfig.class);
+    PartitionGroupMetadata pg = new PartitionGroupMetadata(0, 
mock(StreamPartitionMsgOffset.class));
+
+    StreamMetadata sm = new StreamMetadata(streamConfig, 1, 
Collections.singletonList(pg));
+    sm.getPartitionGroupMetadataList().add(new PartitionGroupMetadata(1, 
mock(StreamPartitionMsgOffset.class)));
+  }
+
+  @Test
+  public void testEmptyPartitionGroupMetadataList() {
+    StreamConfig streamConfig = mock(StreamConfig.class);
+
+    StreamMetadata sm = new StreamMetadata(streamConfig, 5, 
Collections.emptyList());
+
+    assertEquals(sm.getNumPartitions(), 5);
+    assertEquals(sm.getPartitionGroupMetadataList().size(), 0);
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to