This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 7eec9a78717 [Refactor] Introduce StreamMetadata to group partition
metadata by stream (#17811)
7eec9a78717 is described below
commit 7eec9a78717315235d12c1082f758574e7979e2e
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Mar 6 18:37:39 2026 -0800
[Refactor] Introduce StreamMetadata to group partition metadata by stream
(#17811)
* Introduce StreamMetadata to group partition metadata by stream
Replace the flat List<PartitionGroupMetadata> pattern with
List<StreamMetadata>, where each StreamMetadata groups partition
metadata for a single stream along with its StreamConfig and index.
This makes stream membership explicit and eliminates the need for
callers to decode partition IDs to determine stream ownership.
Co-Authored-By: Claude Opus 4.6 <[email protected]>
* [Refactor] Introduce StreamMetadata to group partition metadata by stream
Introduce StreamMetadata to replace the flat List<PartitionGroupMetadata>
pattern where partitions from all streams were mixed together and required
partition ID padding (streamIndex * 10000 + streamPartitionId) to identify
stream membership.
Key changes:
- New StreamMetadata class grouping PartitionGroupMetadata per stream with
StreamConfig and numPartitions (total partition count from broker)
- PartitionGroupMetadataFetcher now produces List<StreamMetadata> with a
deprecated backward-compat getPartitionGroupMetadataList() method
- setUpNewTable and addTable APIs take List<StreamMetadata> directly
- PartitionGroupMetadata gains a sequenceNumber field (default -1 = unset)
to replace the Pair<PartitionGroupMetadata, Integer> pattern
- fetchPartitionCount() is now called after computePartitionGroupMetadata() to
avoid duplicate metadata RPCs for Kafka-like implementations
- Copy table flow constructs StreamMetadata from watermarks with real
partition counts via getPartitionCountMap()
- Callers that use the sequence number validate it is >= 0 before use
Co-Authored-By: Claude Opus 4.6 <[email protected]>
---------
Co-authored-by: Claude Opus 4.6 <[email protected]>
---
.../api/resources/PinotTableRestletResource.java | 49 +++-
.../helix/core/PinotHelixResourceManager.java | 21 +-
.../helix/core/PinotTableIdealStateBuilder.java | 13 +-
.../realtime/MissingConsumingSegmentFinder.java | 9 +-
.../realtime/PinotLLCRealtimeSegmentManager.java | 142 ++++++-----
.../resources/PinotTableRestletResourceTest.java | 54 ++++
.../PinotHelixResourceManagerStatelessTest.java | 24 +-
.../PinotLLCRealtimeSegmentManagerTest.java | 155 ++++++++---
.../pinot/spi/stream/PartitionGroupMetadata.java | 15 +-
.../spi/stream/PartitionGroupMetadataFetcher.java | 60 +++--
.../apache/pinot/spi/stream/StreamMetadata.java | 66 +++++
.../stream/PartitionGroupMetadataFetcherTest.java | 284 ++++++++++++++++++++-
.../pinot/spi/stream/StreamMetadataTest.java | 98 +++++++
13 files changed, 818 insertions(+), 172 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
index 498f0349beb..578a295efcd 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotTableRestletResource.java
@@ -129,8 +129,11 @@ import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMetadata;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.Enablement;
+import org.apache.pinot.spi.utils.IngestionConfigUtils;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.ControllerRequestURLBuilder;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
@@ -351,17 +354,13 @@ public class PinotTableRestletResource {
return new CopyTableResponse("success", "Dry run", schema,
realtimeTableConfig, watermarkInductionResult);
}
- List<Pair<PartitionGroupMetadata, Integer>> partitionGroupInfos =
watermarkInductionResult.getWatermarks()
- .stream()
- .map(watermark -> Pair.of(
- new PartitionGroupMetadata(watermark.getPartitionGroupId(), new
LongMsgOffset(watermark.getOffset())),
- watermark.getSequenceNumber()))
- .collect(Collectors.toList());
+ List<StreamConfig> streamConfigs =
IngestionConfigUtils.getStreamConfigs(realtimeTableConfig);
+ List<StreamMetadata> streamMetadataList =
getStreamMetadataList(streamConfigs, watermarkInductionResult);
_pinotHelixResourceManager.addSchema(schema, true, false);
LOGGER.info("[copyTable] Successfully added schema for table: {}",
tableName);
// Add the table with designated starting kafka offset and segment
sequence number to create consuming segments
- _pinotHelixResourceManager.addTable(realtimeTableConfig,
partitionGroupInfos);
+ _pinotHelixResourceManager.addTable(realtimeTableConfig,
streamMetadataList);
LOGGER.info("[copyTable] Successfully added table config: {} with
designated high watermark", tableName);
CopyTableResponse response = new CopyTableResponse("success", "Table
copied successfully", null, null, null);
if (hasOffline) {
@@ -381,6 +380,42 @@ public class PinotTableRestletResource {
}
}
+ @VisibleForTesting
+ List<StreamMetadata> getStreamMetadataList(List<StreamConfig> streamConfigs,
+ WatermarkInductionResult watermarkInductionResult)
+ throws Exception {
+ Map<Integer, Integer> streamPartitionCountMap =
+
_pinotHelixResourceManager.getRealtimeSegmentManager().getPartitionCountMap(streamConfigs);
+ Map<Integer, List<PartitionGroupMetadata>>
partitionGroupMetadataByStreamConfigIndex = new HashMap<>();
+ for (WatermarkInductionResult.Watermark watermark :
watermarkInductionResult.getWatermarks()) {
+ int streamConfigIndex =
+
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(watermark.getPartitionGroupId());
+ Preconditions.checkArgument(streamConfigIndex >= 0 && streamConfigIndex
< streamConfigs.size(),
+ "Invalid stream config index %s from watermark partition ID %s.
Expected index in range [0, %s)",
+ streamConfigIndex, watermark.getPartitionGroupId(),
streamConfigs.size());
+
partitionGroupMetadataByStreamConfigIndex.computeIfAbsent(streamConfigIndex,
ignored -> new ArrayList<>()).add(
+ new PartitionGroupMetadata(watermark.getPartitionGroupId(), new
LongMsgOffset(watermark.getOffset()),
+ watermark.getSequenceNumber()));
+ }
+
+ // Iterate in order by streamConfigIndex to ensure deterministic ordering
+ List<StreamMetadata> streamMetadataList = new
ArrayList<>(partitionGroupMetadataByStreamConfigIndex.size());
+ for (int streamConfigIndex = 0; streamConfigIndex < streamConfigs.size();
streamConfigIndex++) {
+ List<PartitionGroupMetadata> partitionGroupMetadataList =
+ partitionGroupMetadataByStreamConfigIndex.get(streamConfigIndex);
+ if (partitionGroupMetadataList == null) {
+ // No watermarks for this stream config index, skip it
+ continue;
+ }
+ Integer partitionCount = streamPartitionCountMap.get(streamConfigIndex);
+ Preconditions.checkState(partitionCount != null,
+ "Cannot find partition count for stream config index: %s",
streamConfigIndex);
+ streamMetadataList.add(new
StreamMetadata(streamConfigs.get(streamConfigIndex),
+ partitionCount, partitionGroupMetadataList));
+ }
+ return streamMetadataList;
+ }
+
/**
* Helper method to tweak the realtime table config. This method is used to
set the broker and server tenants, and
* optionally replace the pool tags in the instance assignment config.
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
index 8ae187ab3a6..3bf66d81e65 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java
@@ -180,8 +180,8 @@ import org.apache.pinot.spi.data.LogicalTableConfig;
import org.apache.pinot.spi.data.PhysicalTableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMetadata;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import
org.apache.pinot.spi.utils.CommonConstants.Helix.StateModel.BrokerResourceStateModel;
@@ -1808,17 +1808,17 @@ public class PinotHelixResourceManager {
* designated offset and being assigned with a segment sequence number per
partition. Otherwise, you should
* directly call the {@link #addTable(TableConfig)} which will further call
this api with an empty list.
* @param tableConfig The config for the table to be created.
- * @param consumeMeta A list of pairs, where each pair contains the
partition group metadata and the initial sequence
- * number for a consuming segment. This is used to start
ingestion from a specific offset.
+ * @param streamMetadataList A list of {@link StreamMetadata}, each
containing partition group metadata with
+ * sequence numbers. This is used to start
ingestion from a specific offset.
* @throws InvalidTableConfigException if validations fail
* @throws TableAlreadyExistsException if the table already exists
*/
- public void addTable(TableConfig tableConfig,
List<Pair<PartitionGroupMetadata, Integer>> consumeMeta)
+ public void addTable(TableConfig tableConfig, List<StreamMetadata>
streamMetadataList)
throws IOException {
String tableNameWithType = tableConfig.getTableName();
LOGGER.info("Adding table {}: Start", tableNameWithType);
- if (consumeMeta != null && !consumeMeta.isEmpty()) {
- LOGGER.info("Adding table {} with {} partition group infos",
tableNameWithType, consumeMeta.size());
+ if (streamMetadataList != null && !streamMetadataList.isEmpty()) {
+ LOGGER.info("Adding table {} with {} stream metadata entries",
tableNameWithType, streamMetadataList.size());
}
if (getTableConfig(tableNameWithType) != null) {
@@ -1878,15 +1878,14 @@ public class PinotHelixResourceManager {
// Add ideal state
_helixAdmin.addResource(_helixClusterName, tableNameWithType,
idealState);
LOGGER.info("Adding table {}: Added ideal state for offline table",
tableNameWithType);
- } else if (consumeMeta == null || consumeMeta.isEmpty()) {
+ } else if (streamMetadataList == null || streamMetadataList.isEmpty()) {
// Add ideal state with the first CONSUMING segment
_pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState);
LOGGER.info("Adding table {}: Added ideal state with first consuming
segment", tableNameWithType);
} else {
- // Add ideal state with the first CONSUMING segment with designated
partition consuming metadata
- // Add ideal state with the first CONSUMING segment
- _pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState,
consumeMeta);
- LOGGER.info("Adding table {}: Added consuming segments ideal state
given the designated consuming metadata",
+ // Add ideal state with consuming segments from designated stream
metadata
+ _pinotLLCRealtimeSegmentManager.setUpNewTable(tableConfig, idealState,
streamMetadataList);
+ LOGGER.info("Adding table {}: Added consuming segments ideal state
given the designated stream metadata",
tableNameWithType);
}
} catch (Exception e) {
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
index 6ec48830ca7..fafbabb7ad3 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotTableIdealStateBuilder.java
@@ -19,14 +19,15 @@
package org.apache.pinot.controller.helix.core;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.builder.CustomModeISBuilder;
import org.apache.pinot.common.metrics.ControllerMeter;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
-import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.PartitionGroupMetadataFetcher;
import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMetadata;
import org.apache.pinot.spi.utils.retry.RetryPolicies;
import org.apache.pinot.spi.utils.retry.RetryPolicy;
import org.slf4j.Logger;
@@ -54,7 +55,7 @@ public class PinotTableIdealStateBuilder {
}
/**
- * Fetches the list of {@link PartitionGroupMetadata} for the new partition
groups for the stream,
+ * Fetches the list of {@link StreamMetadata} for all streams of the table,
* with the help of the {@link PartitionGroupConsumptionStatus} of the
current partitionGroups.
* In particular, this method can also be used to fetch from multiple stream
topics.
*
@@ -90,19 +91,19 @@ public class PinotTableIdealStateBuilder {
* @param pausedTopicIndices List of inactive topic indices. Index is the
index of the topic in the streamConfigMaps.
* @param forceGetOffsetFromStream - details in
PinotLLCRealtimeSegmentManager.fetchPartitionGroupIdToSmallestOffset
*/
- public static List<PartitionGroupMetadata>
getPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
+ public static List<StreamMetadata> getStreamMetadataList(List<StreamConfig>
streamConfigs,
List<PartitionGroupConsumptionStatus>
partitionGroupConsumptionStatusList, List<Integer> pausedTopicIndices,
boolean forceGetOffsetFromStream) {
PartitionGroupMetadataFetcher partitionGroupMetadataFetcher = new
PartitionGroupMetadataFetcher(
streamConfigs, partitionGroupConsumptionStatusList,
pausedTopicIndices, forceGetOffsetFromStream);
try {
DEFAULT_IDEALSTATE_UPDATE_RETRY_POLICY.attempt(partitionGroupMetadataFetcher);
- return partitionGroupMetadataFetcher.getPartitionGroupMetadataList();
+ return partitionGroupMetadataFetcher.getStreamMetadataList();
} catch (Exception e) {
Exception fetcherException =
partitionGroupMetadataFetcher.getException();
String tableNameWithType = streamConfigs.get(0).getTableNameWithType();
- LOGGER.error("Could not get PartitionGroupMetadata for topic: {} of
table: {}",
- streamConfigs.stream().map(streamConfig ->
streamConfig.getTopicName()).reduce((a, b) -> a + "," + b),
+ LOGGER.error("Could not get StreamMetadata for topic: {} of table: {}",
+
streamConfigs.stream().map(StreamConfig::getTopicName).collect(Collectors.joining(",")),
tableNameWithType, fetcherException);
ControllerMetrics controllerMetrics = ControllerMetrics.get();
controllerMetrics.addMeteredTableValue(tableNameWithType,
ControllerMeter.PARTITION_GROUP_METADATA_FETCH_ERROR,
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
index 99bee6f8a7f..6be597caa4f 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/MissingConsumingSegmentFinder.java
@@ -41,6 +41,7 @@ import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder;
import org.apache.pinot.spi.config.table.PauseState;
import org.apache.pinot.spi.stream.OffsetCriteria;
+import org.apache.pinot.spi.stream.PartitionGroupMetadata;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
@@ -84,10 +85,12 @@ public class MissingConsumingSegmentFinder {
});
try {
PauseState pauseState =
PinotLLCRealtimeSegmentManager.extractTablePauseState(idealState);
- PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs,
Collections.emptyList(),
+ PinotTableIdealStateBuilder.getStreamMetadataList(streamConfigs,
Collections.emptyList(),
pauseState == null ? new ArrayList<>() :
pauseState.getIndexOfInactiveTopics(), false)
- .forEach(metadata -> {
-
_partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(),
metadata.getStartOffset());
+ .forEach(streamMetadata -> {
+ for (PartitionGroupMetadata metadata :
streamMetadata.getPartitionGroupMetadataList()) {
+
_partitionGroupIdToLargestStreamOffsetMap.put(metadata.getPartitionGroupId(),
metadata.getStartOffset());
+ }
});
} catch (Exception e) {
LOGGER.warn("Problem encountered in fetching stream metadata for topics:
{} of table: {}. "
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
index 120ab8841bd..6720161c620 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
@@ -57,7 +57,6 @@ import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.AccessOption;
import org.apache.helix.ClusterMessagingService;
import org.apache.helix.HelixAdmin;
@@ -128,6 +127,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadata;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
@@ -381,11 +381,9 @@ public class PinotLLCRealtimeSegmentManager {
*/
public void setUpNewTable(TableConfig tableConfig, IdealState idealState) {
List<StreamConfig> streamConfigs =
IngestionConfigUtils.getStreamConfigs(tableConfig);
- List<Pair<PartitionGroupMetadata, Integer>> newPartitionGroupMetadataList =
- getNewPartitionGroupMetadataList(streamConfigs,
Collections.emptyList(), idealState).stream().map(
- x -> Pair.of(x, STARTING_SEQUENCE_NUMBER)
- ).collect(Collectors.toList());
- setUpNewTable(tableConfig, idealState, newPartitionGroupMetadataList);
+ List<StreamMetadata> streamMetadataList =
+ getNewStreamMetadataList(streamConfigs, Collections.emptyList(),
idealState);
+ setUpNewTable(tableConfig, idealState, streamMetadataList);
}
/**
@@ -393,16 +391,18 @@ public class PinotLLCRealtimeSegmentManager {
* <p>NOTE: the passed in IdealState may contain HLC segments if both HLC
and LLC are configured.
*/
public void setUpNewTable(TableConfig tableConfig, IdealState idealState,
- List<Pair<PartitionGroupMetadata, Integer>> consumeMeta) {
+ List<StreamMetadata> streamMetadataList) {
Preconditions.checkState(!_isStopping, "Segment manager is stopping");
String realtimeTableName = tableConfig.getTableName();
LOGGER.info("Setting up new LLC table: {}", realtimeTableName);
- List<StreamConfig> streamConfigs =
IngestionConfigUtils.getStreamConfigs(tableConfig);
-
streamConfigs.forEach(_flushThresholdUpdateManager::clearFlushThresholdUpdater);
+ int numPartitionGroups = 0;
+ for (StreamMetadata streamMetadata : streamMetadataList) {
+
_flushThresholdUpdateManager.clearFlushThresholdUpdater(streamMetadata.getStreamConfig());
+ numPartitionGroups += streamMetadata.getNumPartitions();
+ }
InstancePartitions instancePartitions =
getConsumingInstancePartitions(tableConfig);
- int numPartitionGroups = consumeMeta.size();
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
SegmentAssignment segmentAssignment =
@@ -412,16 +412,17 @@ public class PinotLLCRealtimeSegmentManager {
long currentTimeMs = getCurrentTimeMs();
Map<String, Map<String, String>> instanceStatesMap =
idealState.getRecord().getMapFields();
- for (Pair<PartitionGroupMetadata, Integer> pair : consumeMeta) {
- PartitionGroupMetadata metadata = pair.getLeft();
- int sequence = pair.getRight();
- StreamConfig streamConfig =
IngestionConfigUtils.getStreamConfigFromPinotPartitionId(streamConfigs,
- metadata.getPartitionGroupId());
- String segmentName =
- setupNewPartitionGroup(tableConfig, streamConfig, metadata,
sequence, currentTimeMs, instancePartitions,
- numPartitionGroups, numReplicas);
- updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null,
segmentName, segmentAssignment,
- instancePartitionsMap);
+ for (StreamMetadata streamMetadata : streamMetadataList) {
+ StreamConfig streamConfig = streamMetadata.getStreamConfig();
+ for (PartitionGroupMetadata metadata :
streamMetadata.getPartitionGroupMetadataList()) {
+ int sequenceNumber = metadata.getSequenceNumber() >= 0
+ ? metadata.getSequenceNumber() : STARTING_SEQUENCE_NUMBER;
+ String segmentName =
+ setupNewPartitionGroup(tableConfig, streamConfig, metadata,
sequenceNumber, currentTimeMs,
+ instancePartitions, numPartitionGroups, numReplicas);
+ updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null,
segmentName, segmentAssignment,
+ instancePartitionsMap);
+ }
}
setIdealState(realtimeTableName, idealState);
@@ -1177,6 +1178,20 @@ public class PinotLLCRealtimeSegmentManager {
}
}
+ public Map<Integer, Integer> getPartitionCountMap(List<StreamConfig>
streamConfigs)
+ throws Exception {
+ Map<Integer, Integer> streamPartitionCountMap = new HashMap<>();
+ for (int i = 0; i < streamConfigs.size(); i++) {
+ StreamConfig streamConfig = streamConfigs.get(i);
+ String clientId = getTableTopicUniqueClientId(streamConfig);
+ StreamConsumerFactory consumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
+ try (StreamMetadataProvider metadataProvider =
consumerFactory.createStreamMetadataProvider(clientId)) {
+ streamPartitionCountMap.put(i,
metadataProvider.fetchPartitionCount(STREAM_FETCH_TIMEOUT_MS));
+ }
+ }
+ return streamPartitionCountMap;
+ }
+
@VisibleForTesting
Set<Integer> getPartitionIds(List<StreamConfig> streamConfigs, IdealState
idealState) {
return getPartitionIdsWithIdealState(streamConfigs, () ->
idealState)._partitionIds;
@@ -1242,39 +1257,40 @@ public class PinotLLCRealtimeSegmentManager {
// We don't need to read partition group metadata for other
partition groups.
List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList =
getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
- List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- getNewPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList, idealState);
- partitionIds.addAll(newPartitionGroupMetadataList.stream()
- .map(PartitionGroupMetadata::getPartitionGroupId)
- .collect(Collectors.toSet()));
+ List<StreamMetadata> streamMetadataList =
+ getNewStreamMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList, idealState);
+ for (StreamMetadata streamMetadata : streamMetadataList) {
+ for (PartitionGroupMetadata partitionGroupMetadata :
streamMetadata.getPartitionGroupMetadataList()) {
+ partitionIds.add(partitionGroupMetadata.getPartitionGroupId());
+ }
+ }
return new PartitionIdsWithIdealState(partitionIds, idealState);
}
return new PartitionIdsWithIdealState(partitionIds, null);
}
/**
- * Fetches the latest state of the PartitionGroups for the stream
+ * Fetches the latest state of the partition groups for all streams of the
table.
* If any partition has reached end of life, and all messages of that
partition have been consumed by the segment,
* it will be skipped from the result
*/
@VisibleForTesting
- List<PartitionGroupMetadata>
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
+ List<StreamMetadata> getNewStreamMetadataList(List<StreamConfig>
streamConfigs,
List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList, IdealState idealState) {
- return getNewPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList, idealState,
- false);
+ return getNewStreamMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList, idealState, false);
}
/**
- * Fetches the latest state of the PartitionGroups for the stream
+ * Fetches the latest state of the partition groups for all streams of the
table.
* If any partition has reached end of life, and all messages of that
partition have been consumed by the segment,
* it will be skipped from the result
*/
@VisibleForTesting
- List<PartitionGroupMetadata>
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
+ List<StreamMetadata> getNewStreamMetadataList(List<StreamConfig>
streamConfigs,
List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList, IdealState idealState,
boolean forceGetOffsetFromStream) {
PauseState pauseState = extractTablePauseState(idealState);
- return
PinotTableIdealStateBuilder.getPartitionGroupMetadataList(streamConfigs,
+ return PinotTableIdealStateBuilder.getStreamMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList,
pauseState == null ? new ArrayList<>() :
pauseState.getIndexOfInactiveTopics(), forceGetOffsetFromStream);
}
@@ -1448,10 +1464,10 @@ public class PinotLLCRealtimeSegmentManager {
streamConfigs.stream()
.forEach(streamConfig -> streamConfig.setOffsetCriteria(
offsetsHaveToChange ? offsetCriteria :
OffsetCriteria.SMALLEST_OFFSET_CRITERIA));
- List<PartitionGroupMetadata> newPartitionGroupMetadataList =
- getNewPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList, idealState);
+ List<StreamMetadata> streamMetadataList =
+ getNewStreamMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList, idealState);
streamConfigs.stream().forEach(streamConfig ->
streamConfig.setOffsetCriteria(originalOffsetCriteria));
- return ensureAllPartitionsConsuming(tableConfig, streamConfigs,
idealState, newPartitionGroupMetadataList,
+ return ensureAllPartitionsConsuming(tableConfig, streamConfigs,
idealState, streamMetadataList,
offsetCriteria);
} else {
LOGGER.info("Skipping LLC segments validation for table: {},
isTableEnabled: {}, isTablePaused: {}",
@@ -1488,9 +1504,9 @@ public class PinotLLCRealtimeSegmentManager {
throw new HelixHelper.PermanentUpdaterException(
"Exceeded max segment completion time for segment " +
committingSegmentName);
}
-
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
committingSegmentName,
- isTablePaused(idealState) || isTopicPaused(idealState,
committingSegmentName), newSegmentName,
- segmentAssignment, instancePartitionsMap);
+
updateInstanceStatesForNewConsumingSegment(idealState.getRecord().getMapFields(),
committingSegmentName,
+ isTablePaused(idealState) || isTopicPaused(idealState,
committingSegmentName), newSegmentName,
+ segmentAssignment, instancePartitionsMap);
return idealState;
};
if (_controllerConf.getSegmentCompletionGroupCommitEnabled()) {
@@ -1706,12 +1722,15 @@ public class PinotLLCRealtimeSegmentManager {
*/
@VisibleForTesting
IdealState ensureAllPartitionsConsuming(TableConfig tableConfig,
List<StreamConfig> streamConfigs,
- IdealState idealState, List<PartitionGroupMetadata>
partitionGroupMetadataList, OffsetCriteria offsetCriteria) {
+ IdealState idealState, List<StreamMetadata> streamMetadataList,
OffsetCriteria offsetCriteria) {
String realtimeTableName = tableConfig.getTableName();
InstancePartitions instancePartitions =
getConsumingInstancePartitions(tableConfig);
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
- int numPartitions = partitionGroupMetadataList.size();
+ int numPartitions = 0;
+ for (StreamMetadata streamMetadata : streamMetadataList) {
+ numPartitions += streamMetadata.getNumPartitions();
+ }
SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixManager,
tableConfig, _controllerMetrics);
@@ -1728,8 +1747,10 @@ public class PinotLLCRealtimeSegmentManager {
// Create a map from partition id to start offset
// TODO: Directly return map from StreamMetadataProvider
Map<Integer, StreamPartitionMsgOffset> partitionIdToStartOffset =
Maps.newHashMapWithExpectedSize(numPartitions);
- for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
- partitionIdToStartOffset.put(metadata.getPartitionGroupId(),
metadata.getStartOffset());
+ for (StreamMetadata streamMetadata : streamMetadataList) {
+ for (PartitionGroupMetadata metadata :
streamMetadata.getPartitionGroupMetadataList()) {
+ partitionIdToStartOffset.put(metadata.getPartitionGroupId(),
metadata.getStartOffset());
+ }
}
// Create a map from partition id to the smallest stream offset
Map<Integer, StreamPartitionMsgOffset> partitionIdToSmallestOffset = null;
@@ -1867,7 +1888,7 @@ public class PinotLLCRealtimeSegmentManager {
tableConfig.getTableName(), offsetFactory,
latestSegmentZKMetadata.getStartOffset()); // segments are
OFFLINE; start from beginning
createNewConsumingSegment(tableConfig,
streamConfigs.get(streamConfigIdx), latestSegmentZKMetadata,
- currentTimeMs, partitionGroupMetadataList, instancePartitions,
instanceStatesMap, segmentAssignment,
+ currentTimeMs, numPartitions, instancePartitions,
instanceStatesMap, segmentAssignment,
instancePartitionsMap, startOffset);
} else {
LOGGER.info("Resuming consumption for partition: {} of table: {}",
partitionId, realtimeTableName);
@@ -1875,7 +1896,7 @@ public class PinotLLCRealtimeSegmentManager {
selectStartOffset(offsetCriteria, partitionId,
partitionIdToStartOffset, partitionIdToSmallestOffset,
tableConfig.getTableName(), offsetFactory,
latestSegmentZKMetadata.getEndOffset());
createNewConsumingSegment(tableConfig,
streamConfigs.get(streamConfigIdx), latestSegmentZKMetadata,
- currentTimeMs, partitionGroupMetadataList, instancePartitions,
instanceStatesMap, segmentAssignment,
+ currentTimeMs, numPartitions, instancePartitions,
instanceStatesMap, segmentAssignment,
instancePartitionsMap, startOffset);
}
}
@@ -1917,15 +1938,16 @@ public class PinotLLCRealtimeSegmentManager {
}
// Set up new partitions if not exist
- for (PartitionGroupMetadata partitionGroupMetadata :
partitionGroupMetadataList) {
- int partitionId = partitionGroupMetadata.getPartitionGroupId();
- int streamConfigIdx =
IngestionConfigUtils.getStreamConfigIndexFromPinotPartitionId(partitionId);
- if (!latestSegmentZKMetadataMap.containsKey(partitionId)) {
- String newSegmentName =
- setupNewPartitionGroup(tableConfig,
streamConfigs.get(streamConfigIdx), partitionGroupMetadata,
- currentTimeMs, instancePartitions, numPartitions, numReplicas);
- updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null,
newSegmentName, segmentAssignment,
- instancePartitionsMap);
+ for (StreamMetadata streamMetadata : streamMetadataList) {
+ for (PartitionGroupMetadata partitionGroupMetadata :
streamMetadata.getPartitionGroupMetadataList()) {
+ int partitionId = partitionGroupMetadata.getPartitionGroupId();
+ if (!latestSegmentZKMetadataMap.containsKey(partitionId)) {
+ String newSegmentName =
+ setupNewPartitionGroup(tableConfig,
streamMetadata.getStreamConfig(), partitionGroupMetadata,
+ currentTimeMs, instancePartitions, numPartitions,
numReplicas);
+ updateInstanceStatesForNewConsumingSegment(instanceStatesMap, null,
newSegmentName, segmentAssignment,
+ instancePartitionsMap);
+ }
}
}
@@ -1934,11 +1956,10 @@ public class PinotLLCRealtimeSegmentManager {
private void createNewConsumingSegment(TableConfig tableConfig, StreamConfig
streamConfig,
SegmentZKMetadata latestSegmentZKMetadata, long currentTimeMs,
- List<PartitionGroupMetadata> newPartitionGroupMetadataList,
InstancePartitions instancePartitions,
+ int numPartitions, InstancePartitions instancePartitions,
Map<String, Map<String, String>> instanceStatesMap, SegmentAssignment
segmentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap,
StreamPartitionMsgOffset startOffset) {
int numReplicas = getNumReplicas(tableConfig, instancePartitions);
- int numPartitions = newPartitionGroupMetadataList.size();
LLCSegmentName latestLLCSegmentName = new
LLCSegmentName(latestSegmentZKMetadata.getSegmentName());
LLCSegmentName newLLCSegmentName =
getNextLLCSegmentName(latestLLCSegmentName, currentTimeMs);
CommittingSegmentDescriptor committingSegmentDescriptor =
@@ -1969,11 +1990,13 @@ public class PinotLLCRealtimeSegmentManager {
// Temporarily, we are passing a boolean flag to indicate if we want to
use the current status
// The kafka implementation of computePartitionGroupMetadata() will
ignore the current status
// while the kinesis implementation will use it.
- List<PartitionGroupMetadata> partitionGroupMetadataList =
getNewPartitionGroupMetadataList(
+ List<StreamMetadata> streamMetadataList = getNewStreamMetadataList(
streamConfigs, currentPartitionGroupConsumptionStatusList,
idealState, true);
streamConfig.setOffsetCriteria(originalOffsetCriteria);
- for (PartitionGroupMetadata metadata : partitionGroupMetadataList) {
- partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(),
metadata.getStartOffset());
+ for (StreamMetadata streamMetadata : streamMetadataList) {
+ for (PartitionGroupMetadata metadata :
streamMetadata.getPartitionGroupMetadataList()) {
+ partitionGroupIdToSmallestOffset.put(metadata.getPartitionGroupId(),
metadata.getStartOffset());
+ }
}
}
return partitionGroupIdToSmallestOffset;
@@ -2065,6 +2088,7 @@ public class PinotLLCRealtimeSegmentManager {
private String setupNewPartitionGroup(TableConfig tableConfig, StreamConfig
streamConfig,
PartitionGroupMetadata partitionGroupMetadata, int sequence, long
creationTimeMs,
InstancePartitions instancePartitions, int numPartitions, int
numReplicas) {
+ Preconditions.checkArgument(sequence >= 0, "Sequence number must be >= 0,
got: %s", sequence);
String realtimeTableName = tableConfig.getTableName();
int partitionGroupId = partitionGroupMetadata.getPartitionGroupId();
String startOffset = partitionGroupMetadata.getStartOffset().toString();
@@ -2077,7 +2101,7 @@ public class PinotLLCRealtimeSegmentManager {
String newSegmentName = newLLCSegmentName.getSegmentName();
CommittingSegmentDescriptor committingSegmentDescriptor = new
CommittingSegmentDescriptor(null,
- startOffset, 0);
+ startOffset, 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName,
creationTimeMs,
committingSegmentDescriptor, null, instancePartitions, numPartitions,
numReplicas);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
index abbfa71e3ac..dde748ef622 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotTableRestletResourceTest.java
@@ -20,8 +20,16 @@ package org.apache.pinot.controller.api.resources;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.InputStream;
+import java.util.List;
import java.util.Map;
+import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
+import org.apache.pinot.controller.helix.core.WatermarkInductionResult;
+import
org.apache.pinot.controller.helix.core.realtime.PinotLLCRealtimeSegmentManager;
+import org.apache.pinot.spi.stream.LongMsgOffset;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMetadata;
import org.apache.pinot.spi.utils.JsonUtils;
+import org.mockito.Mockito;
import org.testng.annotations.Test;
import static org.testng.Assert.assertEquals;
@@ -46,4 +54,50 @@ public class PinotTableRestletResourceTest {
.asText(), serverTenant + "_REALTIME");
}
}
+
+ @Test
+ public void testGetStreamMetadataList()
+ throws Exception {
+ StreamConfig streamConfig0 = Mockito.mock(StreamConfig.class);
+ StreamConfig streamConfig1 = Mockito.mock(StreamConfig.class);
+
+ Map<Integer, Integer> streamPartitionCountMap = Map.of(0, 4, 1, 8);
+ PinotLLCRealtimeSegmentManager realtimeSegmentManager =
Mockito.mock(PinotLLCRealtimeSegmentManager.class);
+
Mockito.when(realtimeSegmentManager.getPartitionCountMap(Mockito.anyList())).thenReturn(streamPartitionCountMap);
+ PinotHelixResourceManager pinotHelixResourceManager =
Mockito.mock(PinotHelixResourceManager.class);
+
Mockito.when(pinotHelixResourceManager.getRealtimeSegmentManager()).thenReturn(realtimeSegmentManager);
+ PinotTableRestletResource resource = new PinotTableRestletResource();
+ resource._pinotHelixResourceManager = pinotHelixResourceManager;
+
+ List<StreamMetadata> streamMetadataList =
resource.getStreamMetadataList(List.of(streamConfig0, streamConfig1),
+ new WatermarkInductionResult(List.of(
+ new WatermarkInductionResult.Watermark(1, 3, 101L),
+ new WatermarkInductionResult.Watermark(0, 2, 100L),
+ new WatermarkInductionResult.Watermark(10000, 5, 200L))));
+
+ assertEquals(streamMetadataList.size(), 2);
+
+ // List is ordered by streamConfigIndex (0, 1)
+ StreamMetadata streamMetadata0 = streamMetadataList.get(0);
+ assertEquals(streamMetadata0.getStreamConfig(), streamConfig0);
+ assertEquals(streamMetadata0.getNumPartitions(), 4);
+ assertEquals(streamMetadata0.getPartitionGroupMetadataList().size(), 2);
+
assertEquals(streamMetadata0.getPartitionGroupMetadataList().get(0).getPartitionGroupId(),
1);
+ assertEquals(((LongMsgOffset)
streamMetadata0.getPartitionGroupMetadataList().get(0).getStartOffset()).getOffset(),
+ 101L);
+
assertEquals(streamMetadata0.getPartitionGroupMetadataList().get(0).getSequenceNumber(),
3);
+
assertEquals(streamMetadata0.getPartitionGroupMetadataList().get(1).getPartitionGroupId(),
0);
+ assertEquals(((LongMsgOffset)
streamMetadata0.getPartitionGroupMetadataList().get(1).getStartOffset()).getOffset(),
+ 100L);
+
assertEquals(streamMetadata0.getPartitionGroupMetadataList().get(1).getSequenceNumber(),
2);
+
+ StreamMetadata streamMetadata1 = streamMetadataList.get(1);
+ assertEquals(streamMetadata1.getStreamConfig(), streamConfig1);
+ assertEquals(streamMetadata1.getNumPartitions(), 8);
+ assertEquals(streamMetadata1.getPartitionGroupMetadataList().size(), 1);
+
assertEquals(streamMetadata1.getPartitionGroupMetadataList().get(0).getPartitionGroupId(),
10000);
+ assertEquals(((LongMsgOffset)
streamMetadata1.getPartitionGroupMetadataList().get(0).getStartOffset()).getOffset(),
+ 200L);
+
assertEquals(streamMetadata1.getPartitionGroupMetadataList().get(0).getSequenceNumber(),
5);
+ }
}
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
index f4f876e4a55..e6d673b4273 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerStatelessTest.java
@@ -32,7 +32,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixAdmin;
import
org.apache.helix.controller.dataproviders.ResourceControllerDataProvider;
import
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
@@ -82,6 +81,8 @@ import org.apache.pinot.spi.data.FieldSpec;
import org.apache.pinot.spi.stream.LongMsgOffset;
import org.apache.pinot.spi.stream.PartitionGroupConsumptionStatus;
import org.apache.pinot.spi.stream.PartitionGroupMetadata;
+import org.apache.pinot.spi.stream.StreamConfig;
+import org.apache.pinot.spi.stream.StreamMetadata;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
import org.apache.pinot.spi.utils.CommonConstants.Segment;
@@ -1690,17 +1691,16 @@ public class PinotHelixResourceManagerStatelessTest
extends ControllerTest {
waitForEVToDisappear(tableConfig.getTableName());
addDummySchema(rawTableName);
- List<Pair<PartitionGroupMetadata, Integer>> consumingMetadata = new
ArrayList<>();
- PartitionGroupMetadata metadata0 = mock(PartitionGroupMetadata.class);
- when(metadata0.getPartitionGroupId()).thenReturn(0);
-
when(metadata0.getStartOffset()).thenReturn(mock(StreamPartitionMsgOffset.class));
- consumingMetadata.add(Pair.of(metadata0, 5));
- PartitionGroupMetadata metadata1 = mock(PartitionGroupMetadata.class);
- when(metadata1.getPartitionGroupId()).thenReturn(1);
-
when(metadata1.getStartOffset()).thenReturn(mock(StreamPartitionMsgOffset.class));
- consumingMetadata.add(Pair.of(metadata1, 10));
-
- _helixResourceManager.addTable(tableConfig, consumingMetadata);
+ StreamConfig streamConfig = new StreamConfig(rawTableName + "_REALTIME",
+
FakeStreamConfigUtils.getDefaultLowLevelStreamConfigs().getStreamConfigsMap());
+ PartitionGroupMetadata metadata0 =
+ new PartitionGroupMetadata(0, mock(StreamPartitionMsgOffset.class), 5);
+ PartitionGroupMetadata metadata1 =
+ new PartitionGroupMetadata(1, mock(StreamPartitionMsgOffset.class),
10);
+ List<StreamMetadata> streamMetadataList = Collections.singletonList(
+ new StreamMetadata(streamConfig, 2, Arrays.asList(metadata0,
metadata1)));
+
+ _helixResourceManager.addTable(tableConfig, streamMetadataList);
IdealState idealState =
_helixResourceManager.getTableIdealState(realtimeTableName);
assertNotNull(idealState);
diff --git
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
index e43fba113e2..eecad3d22c4 100644
---
a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
+++
b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java
@@ -91,6 +91,7 @@ import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.stream.StreamConfigProperties;
import org.apache.pinot.spi.stream.StreamConsumerFactory;
import org.apache.pinot.spi.stream.StreamConsumerFactoryProvider;
+import org.apache.pinot.spi.stream.StreamMetadata;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.Helix;
@@ -213,6 +214,72 @@ public class PinotLLCRealtimeSegmentManagerTest {
}
}
+ @Test
+ public void testSetUpNewTableWithExplicitSequenceNumbers() {
+ FakePinotLLCRealtimeSegmentManager segmentManager = new
FakePinotLLCRealtimeSegmentManager();
+ segmentManager._numReplicas = 2;
+ segmentManager.makeTableConfig();
+ segmentManager._numInstances = 3;
+ segmentManager.makeConsumingInstancePartitions();
+
+ // Create StreamMetadata with explicit sequence numbers (simulating copy
table)
+ IdealState idealState = new IdealState(REALTIME_TABLE_NAME);
+ List<StreamMetadata> streamMetadataList = Collections.singletonList(
+ new StreamMetadata(segmentManager._streamConfigs.get(0), 3,
+ Arrays.asList(
+ new PartitionGroupMetadata(0, PARTITION_OFFSET, 5),
+ new PartitionGroupMetadata(1, PARTITION_OFFSET, 10),
+ new PartitionGroupMetadata(2, PARTITION_OFFSET, 0))));
+ segmentManager.setUpNewTable(segmentManager._tableConfig, idealState,
streamMetadataList);
+
+ Map<String, Map<String, String>> instanceStatesMap =
idealState.getRecord().getMapFields();
+ assertEquals(instanceStatesMap.size(), 3);
+
+ // Verify segments are created with the explicit sequence numbers
+ for (String segmentName : instanceStatesMap.keySet()) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ int partitionGroupId = llcSegmentName.getPartitionGroupId();
+ int sequence = llcSegmentName.getSequenceNumber();
+ if (partitionGroupId == 0) {
+ assertEquals(sequence, 5);
+ } else if (partitionGroupId == 1) {
+ assertEquals(sequence, 10);
+ } else if (partitionGroupId == 2) {
+ assertEquals(sequence, 0);
+ } else {
+ fail("Unexpected partition group id: " + partitionGroupId);
+ }
+ }
+ }
+
+ @Test
+ public void testSetUpNewTableDefaultSequenceNumberResolvesToZero() {
+ FakePinotLLCRealtimeSegmentManager segmentManager = new
FakePinotLLCRealtimeSegmentManager();
+ segmentManager._numReplicas = 2;
+ segmentManager.makeTableConfig();
+ segmentManager._numInstances = 3;
+ segmentManager.makeConsumingInstancePartitions();
+
+ // Create StreamMetadata with default sequence numbers (-1, from 2-arg
constructor)
+ IdealState idealState = new IdealState(REALTIME_TABLE_NAME);
+ List<StreamMetadata> streamMetadataList = Collections.singletonList(
+ new StreamMetadata(segmentManager._streamConfigs.get(0), 2,
+ Arrays.asList(
+ new PartitionGroupMetadata(0, PARTITION_OFFSET),
+ new PartitionGroupMetadata(1, PARTITION_OFFSET))));
+ segmentManager.setUpNewTable(segmentManager._tableConfig, idealState,
streamMetadataList);
+
+ Map<String, Map<String, String>> instanceStatesMap =
idealState.getRecord().getMapFields();
+ assertEquals(instanceStatesMap.size(), 2);
+
+ // Verify all segments are created with sequence 0 (default resolved from
-1)
+ for (String segmentName : instanceStatesMap.keySet()) {
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
+ assertEquals(llcSegmentName.getSequenceNumber(), 0,
+ "Default sequence number -1 should resolve to 0 for partition " +
llcSegmentName.getPartitionGroupId());
+ }
+ }
+
private void setUpNewTable(FakePinotLLCRealtimeSegmentManager
segmentManager, int numReplicas, int numInstances,
int numPartitions) {
segmentManager._numReplicas = numReplicas;
@@ -297,12 +364,17 @@ public class PinotLLCRealtimeSegmentManagerTest {
// Expected
}
- // committing segment's partitionGroupId no longer in the
newPartitionGroupMetadataList
- List<PartitionGroupMetadata> partitionGroupMetadataListWithout0 =
-
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs,
Collections.emptyList(),
+ // committing segment's partitionGroupId no longer in the
newStreamMetadataList
+ List<StreamMetadata> streamMetadataListWithout0 =
+ segmentManager.getNewStreamMetadataList(segmentManager._streamConfigs,
Collections.emptyList(),
mock(IdealState.class));
- partitionGroupMetadataListWithout0.remove(0);
- segmentManager._partitionGroupMetadataList =
partitionGroupMetadataListWithout0;
+ // Remove partition 0 from the first stream's metadata
+ StreamMetadata originalSm = streamMetadataListWithout0.get(0);
+ List<PartitionGroupMetadata> filteredList = new
ArrayList<>(originalSm.getPartitionGroupMetadataList());
+ filteredList.remove(0);
+ segmentManager._streamMetadataList = Collections.singletonList(
+ new StreamMetadata(originalSm.getStreamConfig(),
+ originalSm.getNumPartitions(), filteredList));
// Commit a segment for partition 0 - No new entries created for partition
which reached end of life
committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 2,
CURRENT_TIME_MS).getSegmentName();
@@ -442,7 +514,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
"pauseConsumption should include consuming segments from the updated
ideal state");
}
-
@Test
public void testCommitSegmentWithOffsetAutoResetOnOffset()
throws Exception {
@@ -852,11 +923,16 @@ public class PinotLLCRealtimeSegmentManagerTest {
* End of shard cases
*/
// 1 reached end of shard.
- List<PartitionGroupMetadata> partitionGroupMetadataListWithout1 =
-
segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs,
Collections.emptyList(),
+ List<StreamMetadata> streamMetadataListWithout1 =
+ segmentManager.getNewStreamMetadataList(segmentManager._streamConfigs,
Collections.emptyList(),
mock(IdealState.class));
- partitionGroupMetadataListWithout1.remove(1);
- segmentManager._partitionGroupMetadataList =
partitionGroupMetadataListWithout1;
+ // Remove partition 1 from the first stream's metadata
+ StreamMetadata origSm = streamMetadataListWithout1.get(0);
+ List<PartitionGroupMetadata> filteredPgList = new
ArrayList<>(origSm.getPartitionGroupMetadataList());
+ filteredPgList.remove(1);
+ segmentManager._streamMetadataList = Collections.singletonList(
+ new StreamMetadata(origSm.getStreamConfig(),
+ origSm.getNumPartitions(), filteredPgList));
// noop
testRepairs(segmentManager, Collections.emptyList());
@@ -871,7 +947,6 @@ public class PinotLLCRealtimeSegmentManagerTest {
testRepairs(segmentManager, Lists.newArrayList(1));
// make the last ONLINE segment of the shard as CONSUMING (failed between
step1 and 3)
- segmentManager._partitionGroupMetadataList =
partitionGroupMetadataListWithout1;
consumingSegment = new LLCSegmentName(RAW_TABLE_NAME, 1, 1,
CURRENT_TIME_MS).getSegmentName();
turnNewConsumingSegmentConsuming(instanceStatesMap, consumingSegment);
@@ -1282,9 +1357,11 @@ public class PinotLLCRealtimeSegmentManagerTest {
FakePinotLLCRealtimeSegmentManager segmentManager =
spy(new FakePinotLLCRealtimeSegmentManager(mockHelixResourceManager));
setUpNewTable(segmentManager, 2, 5, 4);
- segmentManager._partitionGroupMetadataList = IntStream.range(0, 4)
- .mapToObj(partition -> new PartitionGroupMetadata(partition,
PARTITION_OFFSET))
- .collect(Collectors.toList());
+ segmentManager._streamMetadataList = Collections.singletonList(
+ new StreamMetadata(segmentManager._streamConfigs.get(0), 4,
+ IntStream.range(0, 4)
+ .mapToObj(partition -> new PartitionGroupMetadata(partition,
PARTITION_OFFSET))
+ .collect(Collectors.toList())));
String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
CommittingSegmentDescriptor committingSegmentDescriptor = new
CommittingSegmentDescriptor(committingSegment,
@@ -1300,9 +1377,11 @@ public class PinotLLCRealtimeSegmentManagerTest {
public void
testCommitSegmentMetadataSkipsCreatingNewMetadataWhenTopicPausedIfPartitionIdsFallbackNeeded()
{
FakePinotLLCRealtimeSegmentManager segmentManager = spy(new
FakePinotLLCRealtimeSegmentManager());
setUpNewTable(segmentManager, 2, 5, 4);
- segmentManager._partitionGroupMetadataList = IntStream.range(0, 4)
- .mapToObj(partition -> new PartitionGroupMetadata(partition,
PARTITION_OFFSET))
- .collect(Collectors.toList());
+ segmentManager._streamMetadataList = Collections.singletonList(
+ new StreamMetadata(segmentManager._streamConfigs.get(0), 4,
+ IntStream.range(0, 4)
+ .mapToObj(partition -> new PartitionGroupMetadata(partition,
PARTITION_OFFSET))
+ .collect(Collectors.toList())));
PauseState pauseState =
new PauseState(false, PauseState.ReasonCode.ADMINISTRATIVE,
"pause-topic-for-test",
@@ -1321,7 +1400,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
assertFalse(segmentManager._segmentZKMetadataMap.containsKey(expectedNewConsumingSegment));
assertFalse(segmentManager._idealState.getRecord().getMapFields().containsKey(expectedNewConsumingSegment));
ZkHelixPropertyStore<ZNRecord> propertyStore =
- (ZkHelixPropertyStore<ZNRecord>)
segmentManager._mockResourceManager.getPropertyStore();
+ segmentManager._mockResourceManager.getPropertyStore();
verify(propertyStore, never()).remove(anyString(),
eq(AccessOption.PERSISTENT));
}
@@ -1330,7 +1409,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
FakePinotLLCRealtimeSegmentManager segmentManager = new
FakePinotLLCRealtimeSegmentManager();
setUpNewTable(segmentManager, 2, 5, 4);
ZkHelixPropertyStore<ZNRecord> propertyStore =
- (ZkHelixPropertyStore<ZNRecord>)
segmentManager._mockResourceManager.getPropertyStore();
+ segmentManager._mockResourceManager.getPropertyStore();
when(propertyStore.remove(anyString(),
eq(AccessOption.PERSISTENT))).thenReturn(true);
PauseState pauseState = new PauseState(true,
PauseState.ReasonCode.ADMINISTRATIVE, "pause-for-test",
@@ -1358,7 +1437,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
FakePinotLLCRealtimeSegmentManager segmentManager = new
FakePinotLLCRealtimeSegmentManager();
setUpNewTable(segmentManager, 2, 5, 4);
ZkHelixPropertyStore<ZNRecord> propertyStore =
- (ZkHelixPropertyStore<ZNRecord>)
segmentManager._mockResourceManager.getPropertyStore();
+ segmentManager._mockResourceManager.getPropertyStore();
when(propertyStore.remove(anyString(),
eq(AccessOption.PERSISTENT))).thenReturn(true);
PauseState pauseState =
@@ -1385,7 +1464,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
FakePinotLLCRealtimeSegmentManager segmentManager = new
FakePinotLLCRealtimeSegmentManager();
setUpNewTable(segmentManager, 2, 5, 4);
ZkHelixPropertyStore<ZNRecord> propertyStore =
- (ZkHelixPropertyStore<ZNRecord>)
segmentManager._mockResourceManager.getPropertyStore();
+ segmentManager._mockResourceManager.getPropertyStore();
when(propertyStore.remove(anyString(),
eq(AccessOption.PERSISTENT))).thenReturn(true);
String committingSegment = new LLCSegmentName(RAW_TABLE_NAME, 0, 0,
CURRENT_TIME_MS).getSegmentName();
@@ -1855,11 +1934,12 @@ public class PinotLLCRealtimeSegmentManagerTest {
new PartitionGroupConsumptionStatus(1, 12, new LongMsgOffset(123),
new LongMsgOffset(345), "ONLINE"));
doReturn(partitionGroupConsumptionStatusList).when(segmentManagerSpy)
.getPartitionGroupConsumptionStatusList(idealState, streamConfigs);
- List<PartitionGroupMetadata> partitionGroupMetadataList =
- List.of(new PartitionGroupMetadata(0, new LongMsgOffset(234)),
- new PartitionGroupMetadata(1, new LongMsgOffset(345)));
- doReturn(partitionGroupMetadataList).when(segmentManagerSpy)
- .getNewPartitionGroupMetadataList(streamConfigs,
partitionGroupConsumptionStatusList, idealState);
+ List<StreamMetadata> streamMetadataList =
+ List.of(new StreamMetadata(streamConfigs.get(0), 2,
+ List.of(new PartitionGroupMetadata(0, new LongMsgOffset(234)),
+ new PartitionGroupMetadata(1, new LongMsgOffset(345)))));
+ doReturn(streamMetadataList).when(segmentManagerSpy)
+ .getNewStreamMetadataList(streamConfigs,
partitionGroupConsumptionStatusList, idealState);
partitionIds = segmentManagerSpy.getPartitionIds(streamConfigs,
idealState);
Assert.assertEquals(partitionIds.size(), 2);
}
@@ -2250,7 +2330,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
Map<String, Integer> _segmentZKMetadataVersionMap = new HashMap<>();
IdealState _idealState;
int _numPartitions;
- List<PartitionGroupMetadata> _partitionGroupMetadataList = null;
+ List<StreamMetadata> _streamMetadataList = null;
boolean _exceededMaxSegmentCompletionTime = false;
FileUploadDownloadClient _mockedFileUploadDownloadClient;
PinotHelixResourceManager _mockResourceManager;
@@ -2330,7 +2410,7 @@ public class PinotLLCRealtimeSegmentManagerTest {
public void ensureAllPartitionsConsuming() {
ensureAllPartitionsConsuming(_tableConfig, _streamConfigs, _idealState,
- getNewPartitionGroupMetadataList(_streamConfigs,
Collections.emptyList(), mock(IdealState.class)), null);
+ getNewStreamMetadataList(_streamConfigs, Collections.emptyList(),
mock(IdealState.class)), null);
}
@Override
@@ -2407,28 +2487,31 @@ public class PinotLLCRealtimeSegmentManagerTest {
@Override
Set<Integer> getPartitionIds(StreamConfig streamConfig) {
- if (_partitionGroupMetadataList != null) {
+ if (_streamMetadataList != null) {
throw new UnsupportedOperationException();
}
return IntStream.range(0,
_numPartitions).boxed().collect(Collectors.toSet());
}
@Override
- List<PartitionGroupMetadata>
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
+ List<StreamMetadata> getNewStreamMetadataList(List<StreamConfig>
streamConfigs,
List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList, IdealState idealState) {
- if (_partitionGroupMetadataList != null) {
- return _partitionGroupMetadataList;
+ if (_streamMetadataList != null) {
+ return _streamMetadataList;
} else {
- return IntStream.range(0, _numPartitions).mapToObj(i -> new
PartitionGroupMetadata(i, PARTITION_OFFSET))
- .collect(Collectors.toList());
+ List<PartitionGroupMetadata> partitionGroupMetadataList =
+ IntStream.range(0, _numPartitions).mapToObj(i -> new
PartitionGroupMetadata(i, PARTITION_OFFSET))
+ .collect(Collectors.toList());
+ return Collections.singletonList(
+ new StreamMetadata(streamConfigs.get(0), _numPartitions,
partitionGroupMetadataList));
}
}
@Override
- List<PartitionGroupMetadata>
getNewPartitionGroupMetadataList(List<StreamConfig> streamConfigs,
+ List<StreamMetadata> getNewStreamMetadataList(List<StreamConfig>
streamConfigs,
List<PartitionGroupConsumptionStatus>
currentPartitionGroupConsumptionStatusList, IdealState idealState,
boolean forceGetOffsetFromStream) {
- return getNewPartitionGroupMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList, idealState);
+ return getNewStreamMetadataList(streamConfigs,
currentPartitionGroupConsumptionStatusList, idealState);
}
@Override
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
index d5127b6d3be..3b7103c8284 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java
@@ -22,18 +22,27 @@ package org.apache.pinot.spi.stream;
* A PartitionGroup is a group of partitions/shards that the same consumer
should consume from.
* This class is a container for the metadata regarding a partition group,
that is needed by a consumer to start
* consumption.
- * It consists of
+ * It consists of:
* 1. A unique partition group id for this partition group
* 2. The start offset to begin consumption for this partition group
+ * 3. The sequence number for the consuming segment (used when creating
segments with designated offsets/sequences)
*/
public class PartitionGroupMetadata {
+ private static final int DEFAULT_SEQUENCE_NUMBER = -1;
+
private final int _partitionGroupId;
private final StreamPartitionMsgOffset _startOffset;
+ private final int _sequenceNumber;
public PartitionGroupMetadata(int partitionGroupId, StreamPartitionMsgOffset
startOffset) {
+ this(partitionGroupId, startOffset, DEFAULT_SEQUENCE_NUMBER);
+ }
+
+ public PartitionGroupMetadata(int partitionGroupId, StreamPartitionMsgOffset
startOffset, int sequenceNumber) {
_partitionGroupId = partitionGroupId;
_startOffset = startOffset;
+ _sequenceNumber = sequenceNumber;
}
public int getPartitionGroupId() {
@@ -43,4 +52,8 @@ public class PartitionGroupMetadata {
public StreamPartitionMsgOffset getStartOffset() {
return _startOffset;
}
+
+ public int getSequenceNumber() {
+ return _sequenceNumber;
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
index 698ad472e1a..2e0443228d6 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java
@@ -19,6 +19,7 @@
package org.apache.pinot.spi.stream;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.stream.Collectors;
@@ -28,16 +29,17 @@ import org.slf4j.LoggerFactory;
/**
- * Fetches the list of {@link PartitionGroupMetadata} for all partition groups
of the streams,
+ * Fetches the {@link StreamMetadata} for all streams of a table,
* using the {@link StreamMetadataProvider}
*/
public class PartitionGroupMetadataFetcher implements Callable<Boolean> {
private static final Logger LOGGER =
LoggerFactory.getLogger(PartitionGroupMetadataFetcher.class);
+ private static final int METADATA_FETCH_TIMEOUT_MS = 15000;
private final List<StreamConfig> _streamConfigs;
private final List<PartitionGroupConsumptionStatus>
_partitionGroupConsumptionStatusList;
private final boolean _forceGetOffsetFromStream;
- private final List<PartitionGroupMetadata> _newPartitionGroupMetadataList =
new ArrayList<>();
+ private final List<StreamMetadata> _streamMetadataList = new ArrayList<>();
private final List<Integer> _pausedTopicIndices;
private Exception _exception;
@@ -51,8 +53,18 @@ public class PartitionGroupMetadataFetcher implements
Callable<Boolean> {
_pausedTopicIndices = pausedTopicIndices;
}
+ public List<StreamMetadata> getStreamMetadataList() {
+ return Collections.unmodifiableList(_streamMetadataList);
+ }
+
+ /**
+ * @deprecated after 1.5.0 release. Use {@link #getStreamMetadataList()}
instead.
+ */
+ @Deprecated
public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
- return _newPartitionGroupMetadataList;
+ return _streamMetadataList.stream()
+ .flatMap(sm -> sm.getPartitionGroupMetadataList().stream())
+ .collect(Collectors.toList());
}
public Exception getException() {
@@ -60,14 +72,15 @@ public class PartitionGroupMetadataFetcher implements
Callable<Boolean> {
}
/**
- * Callable to fetch the {@link PartitionGroupMetadata} list, from the
stream.
+ * Callable to fetch the {@link StreamMetadata} list from the streams.
* The stream requires the list of {@link PartitionGroupConsumptionStatus}
to compute the new
* {@link PartitionGroupMetadata}
*/
@Override
public Boolean call()
throws Exception {
- _newPartitionGroupMetadataList.clear();
+ _streamMetadataList.clear();
+ _exception = null;
return _streamConfigs.size() == 1 ? fetchSingleStream() :
fetchMultipleStreams();
}
@@ -81,18 +94,18 @@ public class PartitionGroupMetadataFetcher implements
Callable<Boolean> {
StreamConsumerFactory streamConsumerFactory =
StreamConsumerFactoryProvider.create(streamConfig);
try (StreamMetadataProvider streamMetadataProvider =
streamConsumerFactory.createStreamMetadataProvider(
StreamConsumerFactory.getUniqueClientId(clientId))) {
-
_newPartitionGroupMetadataList.addAll(streamMetadataProvider.computePartitionGroupMetadata(clientId,
streamConfig,
- _partitionGroupConsumptionStatusList, /*maxWaitTimeMs=*/15000,
_forceGetOffsetFromStream));
- if (_exception != null) {
- // We had at least one failure, but succeeded now. Log an info
- LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic
{}", topicName);
- }
+ List<PartitionGroupMetadata> partitionGroupMetadataList =
+ streamMetadataProvider.computePartitionGroupMetadata(clientId,
streamConfig,
+ _partitionGroupConsumptionStatusList,
/*maxWaitTimeMs=*/METADATA_FETCH_TIMEOUT_MS,
+ _forceGetOffsetFromStream);
+ _streamMetadataList.add(
+ new StreamMetadata(streamConfig, partitionGroupMetadataList.size(),
partitionGroupMetadataList));
} catch (TransientConsumerException e) {
- LOGGER.warn("Transient Exception: Could not get partition count for
topic {}", topicName, e);
+ LOGGER.warn("Transient Exception: Could not get StreamMetadata for topic
{}", topicName, e);
_exception = e;
return Boolean.FALSE;
} catch (Exception e) {
- LOGGER.warn("Could not get partition count for topic {}", topicName, e);
+ LOGGER.warn("Could not get StreamMetadata for topic {}", topicName, e);
_exception = e;
throw e;
}
@@ -104,7 +117,7 @@ public class PartitionGroupMetadataFetcher implements
Callable<Boolean> {
int numStreams = _streamConfigs.size();
for (int i = 0; i < numStreams; i++) {
if (_pausedTopicIndices.contains(i)) {
- LOGGER.info("Skipping fetching PartitionGroupMetadata for paused
topic: {}",
+ LOGGER.info("Skipping fetching StreamMetadata for paused topic: {}",
_streamConfigs.get(i).getTopicName());
continue;
}
@@ -122,25 +135,24 @@ public class PartitionGroupMetadataFetcher implements
Callable<Boolean> {
.collect(Collectors.toList());
try (StreamMetadataProvider streamMetadataProvider =
streamConsumerFactory.createStreamMetadataProvider(
StreamConsumerFactory.getUniqueClientId(clientId))) {
- _newPartitionGroupMetadataList.addAll(
+ List<PartitionGroupMetadata> partitionGroupMetadataList =
streamMetadataProvider.computePartitionGroupMetadata(clientId,
- streamConfig, topicPartitionGroupConsumptionStatusList,
/*maxWaitTimeMs=*/15000,
+ streamConfig, topicPartitionGroupConsumptionStatusList,
+ /*maxWaitTimeMs=*/METADATA_FETCH_TIMEOUT_MS,
_forceGetOffsetFromStream)
.stream()
.map(metadata -> new PartitionGroupMetadata(
IngestionConfigUtils.getPinotPartitionIdFromStreamPartitionId(metadata.getPartitionGroupId(),
- index), metadata.getStartOffset()))
- .collect(Collectors.toList()));
- if (_exception != null) {
- // We had at least one failure, but succeeded now. Log an info
- LOGGER.info("Successfully retrieved PartitionGroupMetadata for topic
{}", topicName);
- }
+ index), metadata.getStartOffset(),
metadata.getSequenceNumber()))
+ .collect(Collectors.toList());
+ _streamMetadataList.add(
+ new StreamMetadata(streamConfig,
partitionGroupMetadataList.size(), partitionGroupMetadataList));
} catch (TransientConsumerException e) {
- LOGGER.warn("Transient Exception: Could not get partition count for
topic {}", topicName, e);
+ LOGGER.warn("Transient Exception: Could not get StreamMetadata for
topic {}", topicName, e);
_exception = e;
return Boolean.FALSE;
} catch (Exception e) {
- LOGGER.warn("Could not get partition count for topic {}", topicName,
e);
+ LOGGER.warn("Could not get StreamMetadata for topic {}", topicName, e);
_exception = e;
throw e;
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadata.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadata.java
new file mode 100644
index 00000000000..a59e056b02b
--- /dev/null
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadata.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.util.List;
+
+
+/**
+ * Groups partition metadata for a single stream/topic.
+ *
+ * <p>This replaces the flat {@code List<PartitionGroupMetadata>} pattern
where partitions from all streams were mixed
+ * together and required partition ID padding to identify stream membership.
+ *
+ * <p>The {@link PartitionGroupMetadata} items within this container use Pinot-encoded partition IDs
+ * (i.e., {@code streamIndex * 10000 + streamPartitionId}; for the first or only stream the encoded ID equals the
+ * raw stream partition ID) to maintain backward compatibility with segment names stored in ZooKeeper.
+ */
+public class StreamMetadata {
+
+ private final StreamConfig _streamConfig;
+ private final int _numPartitions;
+ private final List<PartitionGroupMetadata> _partitionGroupMetadataList;
+
+ public StreamMetadata(StreamConfig streamConfig, int numPartitions,
+ List<PartitionGroupMetadata> partitionGroupMetadataList) {
+ _streamConfig = streamConfig;
+ _numPartitions = numPartitions;
+ _partitionGroupMetadataList = List.copyOf(partitionGroupMetadataList);
+ }
+
+ public StreamConfig getStreamConfig() {
+ return _streamConfig;
+ }
+
+ public String getTopicName() {
+ return _streamConfig.getTopicName();
+ }
+
+ public List<PartitionGroupMetadata> getPartitionGroupMetadataList() {
+ return _partitionGroupMetadataList;
+ }
+
+ /**
+ * Returns the total number of partitions for this stream. This may be
greater than the size of
+ * {@link #getPartitionGroupMetadataList()} when only a subset of partitions
is assigned.
+ */
+ public int getNumPartitions() {
+ return _numPartitions;
+ }
+}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
index 9fa65254b63..1c229b602da 100644
---
a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcherTest.java
@@ -27,10 +27,11 @@ import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
-import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyBoolean;
+import static org.mockito.ArgumentMatchers.anyInt;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@@ -70,7 +67,10 @@ public class PartitionGroupMetadataFetcherTest {
// Verify
Assert.assertTrue(result);
- Assert.assertEquals(fetcher.getPartitionGroupMetadataList().size(), 1);
+ List<StreamMetadata> streamMetadataList =
fetcher.getStreamMetadataList();
+ Assert.assertEquals(streamMetadataList.size(), 1);
+ Assert.assertEquals(streamMetadataList.get(0).getNumPartitions(), 1);
+
Assert.assertEquals(streamMetadataList.get(0).getPartitionGroupMetadataList().size(),
1);
Assert.assertNull(fetcher.getException());
}
}
@@ -85,6 +85,7 @@ public class PartitionGroupMetadataFetcherTest {
List<PartitionGroupConsumptionStatus> statusList = Collections.emptyList();
StreamMetadataProvider metadataProvider =
mock(StreamMetadataProvider.class);
+ when(metadataProvider.fetchPartitionCount(anyLong())).thenReturn(1);
when(metadataProvider.computePartitionGroupMetadata(anyString(),
any(StreamConfig.class),
any(List.class), anyInt(), anyBoolean()))
.thenThrow(new TransientConsumerException(new
RuntimeException("Transient error")));
@@ -143,12 +144,18 @@ public class PartitionGroupMetadataFetcherTest {
// Verify
Assert.assertTrue(result);
- Assert.assertEquals(fetcher.getPartitionGroupMetadataList().size(), 4);
+ List<StreamMetadata> streamMetadataList =
fetcher.getStreamMetadataList();
+ Assert.assertEquals(streamMetadataList.size(), 2);
Assert.assertNull(fetcher.getException());
+ Assert.assertEquals(streamMetadataList.get(0).getNumPartitions(), 2);
+
Assert.assertEquals(streamMetadataList.get(0).getPartitionGroupMetadataList().size(),
2);
+ Assert.assertEquals(streamMetadataList.get(1).getNumPartitions(), 2);
+
Assert.assertEquals(streamMetadataList.get(1).getPartitionGroupMetadataList().size(),
2);
+
// Verify the correct partition group IDs: 0, 1, 10000, 10001
- List<PartitionGroupMetadata> resultMetadata =
fetcher.getPartitionGroupMetadataList();
- List<Integer> partitionIds = resultMetadata.stream()
+ List<Integer> partitionIds = streamMetadataList.stream()
+ .flatMap(sm -> sm.getPartitionGroupMetadataList().stream())
.map(PartitionGroupMetadata::getPartitionGroupId)
.sorted()
.collect(Collectors.toList());
@@ -174,6 +181,7 @@ public class PartitionGroupMetadataFetcherTest {
PartitionGroupMetadata mockedMetadata2 = new PartitionGroupMetadata(1,
mock(StreamPartitionMsgOffset.class));
StreamMetadataProvider metadataProvider =
mock(StreamMetadataProvider.class);
+ when(metadataProvider.fetchPartitionCount(anyLong())).thenReturn(3);
when(metadataProvider.computePartitionGroupMetadata(anyString(),
any(StreamConfig.class),
any(List.class), anyInt(), anyBoolean()))
.thenReturn(Arrays.asList(mockedMetadata1, mockedMetadata2));
@@ -186,19 +194,20 @@ public class PartitionGroupMetadataFetcherTest {
mockedProvider.when(() ->
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
PartitionGroupMetadataFetcher fetcher = new
PartitionGroupMetadataFetcher(
- streamConfigs, statusList, Arrays.asList(1), false);
+ streamConfigs, statusList, List.of(1), false);
// Execute
Boolean result = fetcher.call();
- // Verify
+ // Verify - 2 streams active (topic1 at index 0, topic3 at index 2;
topic2 at index 1 is paused)
Assert.assertTrue(result);
- Assert.assertEquals(fetcher.getPartitionGroupMetadataList().size(), 4);
+ List<StreamMetadata> streamMetadataList =
fetcher.getStreamMetadataList();
+ Assert.assertEquals(streamMetadataList.size(), 2);
Assert.assertNull(fetcher.getException());
// Verify the correct partition group IDs
- List<PartitionGroupMetadata> resultMetadata =
fetcher.getPartitionGroupMetadataList();
- List<Integer> partitionIds = resultMetadata.stream()
+ List<Integer> partitionIds = streamMetadataList.stream()
+ .flatMap(sm -> sm.getPartitionGroupMetadataList().stream())
.map(PartitionGroupMetadata::getPartitionGroupId)
.sorted()
.collect(Collectors.toList());
@@ -207,10 +216,259 @@ public class PartitionGroupMetadataFetcherTest {
}
}
+ @Test
+ public void testDeprecatedGetPartitionGroupMetadataListFlatMaps()
+ throws Exception {
+ StreamConfig streamConfig1 = createMockStreamConfig("topic1",
"test-table", false);
+ StreamConfig streamConfig2 = createMockStreamConfig("topic2",
"test-table", false);
+ List<StreamConfig> streamConfigs = Arrays.asList(streamConfig1,
streamConfig2);
+
+ PartitionGroupConsumptionStatus status1 = new
PartitionGroupConsumptionStatus(0, 0, null, null, "IN_PROGRESS");
+ List<PartitionGroupConsumptionStatus> statusList =
Collections.singletonList(status1);
+
+ StreamPartitionMsgOffset offset = mock(StreamPartitionMsgOffset.class);
+ PartitionGroupMetadata m1 = new PartitionGroupMetadata(0, offset);
+ PartitionGroupMetadata m2 = new PartitionGroupMetadata(1, offset);
+
+ StreamMetadataProvider metadataProvider =
mock(StreamMetadataProvider.class);
+ when(metadataProvider.fetchPartitionCount(anyLong())).thenReturn(2);
+ when(metadataProvider.computePartitionGroupMetadata(anyString(),
any(StreamConfig.class),
+ any(List.class), anyInt(), anyBoolean())).thenReturn(Arrays.asList(m1,
m2));
+
+ StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+
when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider);
+
+ try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider =
Mockito.mockStatic(
+ StreamConsumerFactoryProvider.class)) {
+ mockedProvider.when(() ->
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+ PartitionGroupMetadataFetcher fetcher = new
PartitionGroupMetadataFetcher(
+ streamConfigs, statusList, Collections.emptyList(), false);
+ fetcher.call();
+
+ // Deprecated method should flat-map across all streams
+ List<PartitionGroupMetadata> flatList =
fetcher.getPartitionGroupMetadataList();
+ Assert.assertEquals(flatList.size(), 4); // 2 per stream * 2 streams
+ }
+ }
+
+ @Test
+ public void testExceptionResetOnRetry()
+ throws Exception {
+ StreamConfig streamConfig = createMockStreamConfig("test-topic",
"test-table", false);
+ List<StreamConfig> streamConfigs = Collections.singletonList(streamConfig);
+
+ StreamPartitionMsgOffset offset = mock(StreamPartitionMsgOffset.class);
+ PartitionGroupMetadata metadata = new PartitionGroupMetadata(0, offset);
+
+ StreamMetadataProvider metadataProvider =
mock(StreamMetadataProvider.class);
+ when(metadataProvider.fetchPartitionCount(anyLong())).thenReturn(1);
+ // First call: transient failure; second call: success
+ when(metadataProvider.computePartitionGroupMetadata(anyString(),
any(StreamConfig.class),
+ any(List.class), anyInt(), anyBoolean()))
+ .thenThrow(new TransientConsumerException(new
RuntimeException("Transient")))
+ .thenReturn(Collections.singletonList(metadata));
+
+ StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+
when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider);
+
+ try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider =
Mockito.mockStatic(
+ StreamConsumerFactoryProvider.class)) {
+ mockedProvider.when(() ->
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+ PartitionGroupMetadataFetcher fetcher = new
PartitionGroupMetadataFetcher(
+ streamConfigs, Collections.emptyList(), Collections.emptyList(),
false);
+
+ // First call fails
+ Boolean result1 = fetcher.call();
+ Assert.assertFalse(result1);
+ Assert.assertNotNull(fetcher.getException());
+
+ // Second call succeeds - exception should be reset
+ Boolean result2 = fetcher.call();
+ Assert.assertTrue(result2);
+ Assert.assertNull(fetcher.getException());
+ Assert.assertEquals(fetcher.getStreamMetadataList().size(), 1);
+ }
+ }
+
+ @Test
+ public void testSequenceNumberPreservedInMultiStreamRemap()
+ throws Exception {
+ StreamConfig streamConfig1 = createMockStreamConfig("topic1",
"test-table", false);
+ StreamConfig streamConfig2 = createMockStreamConfig("topic2",
"test-table", false);
+ List<StreamConfig> streamConfigs = Arrays.asList(streamConfig1,
streamConfig2);
+
+ List<PartitionGroupConsumptionStatus> statusList = Collections.emptyList();
+
+ StreamPartitionMsgOffset offset = mock(StreamPartitionMsgOffset.class);
+ PartitionGroupMetadata m1 = new PartitionGroupMetadata(0, offset, 7);
+ PartitionGroupMetadata m2 = new PartitionGroupMetadata(1, offset, 3);
+
+ StreamMetadataProvider metadataProvider =
mock(StreamMetadataProvider.class);
+ when(metadataProvider.fetchPartitionCount(anyLong())).thenReturn(2);
+ when(metadataProvider.computePartitionGroupMetadata(anyString(),
any(StreamConfig.class),
+ any(List.class), anyInt(), anyBoolean())).thenReturn(Arrays.asList(m1,
m2));
+
+ StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+
when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider);
+
+ try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider =
Mockito.mockStatic(
+ StreamConsumerFactoryProvider.class)) {
+ mockedProvider.when(() ->
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+ PartitionGroupMetadataFetcher fetcher = new
PartitionGroupMetadataFetcher(
+ streamConfigs, statusList, Collections.emptyList(), false);
+ fetcher.call();
+
+ List<StreamMetadata> streamMetadataList =
fetcher.getStreamMetadataList();
+ Assert.assertEquals(streamMetadataList.size(), 2);
+
+ // Second stream's partitions should have remapped IDs but preserved
sequence numbers
+ List<PartitionGroupMetadata> stream1Partitions =
streamMetadataList.get(1).getPartitionGroupMetadataList();
+ Assert.assertEquals(stream1Partitions.get(0).getPartitionGroupId(),
10000);
+ Assert.assertEquals(stream1Partitions.get(0).getSequenceNumber(), 7);
+ Assert.assertEquals(stream1Partitions.get(1).getPartitionGroupId(),
10001);
+ Assert.assertEquals(stream1Partitions.get(1).getSequenceNumber(), 3);
+ }
+ }
+
+ @Test
+ public void testGetStreamMetadataListReturnsUnmodifiable()
+ throws Exception {
+ StreamConfig streamConfig = createMockStreamConfig("test-topic",
"test-table", false);
+ List<StreamConfig> streamConfigs = Collections.singletonList(streamConfig);
+
+ PartitionGroupMetadata metadata = new PartitionGroupMetadata(0,
mock(StreamPartitionMsgOffset.class));
+ StreamMetadataProvider metadataProvider =
mock(StreamMetadataProvider.class);
+ when(metadataProvider.fetchPartitionCount(anyLong())).thenReturn(1);
+ when(metadataProvider.computePartitionGroupMetadata(anyString(),
any(StreamConfig.class),
+ any(List.class), anyInt(),
anyBoolean())).thenReturn(Collections.singletonList(metadata));
+
+ StreamConsumerFactory factory = mock(StreamConsumerFactory.class);
+
when(factory.createStreamMetadataProvider(anyString())).thenReturn(metadataProvider);
+
+ try (MockedStatic<StreamConsumerFactoryProvider> mockedProvider =
Mockito.mockStatic(
+ StreamConsumerFactoryProvider.class)) {
+ mockedProvider.when(() ->
StreamConsumerFactoryProvider.create(any(StreamConfig.class))).thenReturn(factory);
+
+ PartitionGroupMetadataFetcher fetcher = new
PartitionGroupMetadataFetcher(
+ streamConfigs, Collections.emptyList(), Collections.emptyList(),
false);
+ fetcher.call();
+
+ try {
+ fetcher.getStreamMetadataList().add(
+ new StreamMetadata(streamConfig, 1, Collections.emptyList()));
+ Assert.fail("Expected UnsupportedOperationException");
+ } catch (UnsupportedOperationException e) {
+ // expected
+ }
+ }
+ }
+
private StreamConfig createMockStreamConfig(String topicName, String
tableName, boolean isEphemeral) {
StreamConfig streamConfig = mock(StreamConfig.class);
when(streamConfig.getTopicName()).thenReturn(topicName);
when(streamConfig.getTableNameWithType()).thenReturn(tableName);
return streamConfig;
}
+
+ private static final class DefaultComputeOnlyMetadataProvider implements
StreamMetadataProvider {
+ private int _fetchPartitionCountCalls;
+
+ @Override
+ public int fetchPartitionCount(long timeoutMillis) {
+ _fetchPartitionCountCalls++;
+ return 1;
+ }
+
+ @Override
+ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria
offsetCriteria, long timeoutMillis) {
+ throw new UnsupportedOperationException("Should not be called");
+ }
+
+ @Override
+ public boolean supportsOffsetLag() {
+ return false;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ int getFetchPartitionCountCalls() {
+ return _fetchPartitionCountCalls;
+ }
+ }
+
+ private static final class OverriddenComputeMetadataProvider implements
StreamMetadataProvider {
+ private int _fetchPartitionCountCalls;
+ private final StreamPartitionMsgOffset _offset =
mock(StreamPartitionMsgOffset.class);
+
+ @Override
+ public int fetchPartitionCount(long timeoutMillis) {
+ _fetchPartitionCountCalls++;
+ return 3;
+ }
+
+ @Override
+ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria
offsetCriteria, long timeoutMillis) {
+ throw new UnsupportedOperationException("Should not be called");
+ }
+
+ @Override
+ public List<PartitionGroupMetadata> computePartitionGroupMetadata(String
clientId, StreamConfig streamConfig,
+ List<PartitionGroupConsumptionStatus>
partitionGroupConsumptionStatuses, int timeoutMillis,
+ boolean forceGetOffsetFromStream) {
+ return Collections.singletonList(new PartitionGroupMetadata(0, _offset));
+ }
+
+ @Override
+ public boolean supportsOffsetLag() {
+ return false;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ int getFetchPartitionCountCalls() {
+ return _fetchPartitionCountCalls;
+ }
+ }
+
+ private static final class OverriddenFourArgComputeMetadataProvider
implements StreamMetadataProvider {
+ private int _fetchPartitionCountCalls;
+ private final StreamPartitionMsgOffset _offset =
mock(StreamPartitionMsgOffset.class);
+
+ @Override
+ public int fetchPartitionCount(long timeoutMillis) {
+ _fetchPartitionCountCalls++;
+ return 3;
+ }
+
+ @Override
+ public StreamPartitionMsgOffset fetchStreamPartitionOffset(OffsetCriteria
offsetCriteria, long timeoutMillis) {
+ throw new UnsupportedOperationException("Should not be called");
+ }
+
+ @Override
+ public List<PartitionGroupMetadata> computePartitionGroupMetadata(String
clientId, StreamConfig streamConfig,
+ List<PartitionGroupConsumptionStatus>
partitionGroupConsumptionStatuses, int timeoutMillis) {
+ return Collections.singletonList(new PartitionGroupMetadata(0, _offset));
+ }
+
+ @Override
+ public boolean supportsOffsetLag() {
+ return false;
+ }
+
+ @Override
+ public void close() {
+ }
+
+ int getFetchPartitionCountCalls() {
+ return _fetchPartitionCountCalls;
+ }
+ }
}
diff --git
a/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMetadataTest.java
b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMetadataTest.java
new file mode 100644
index 00000000000..4afb14aae25
--- /dev/null
+++
b/pinot-spi/src/test/java/org/apache/pinot/spi/stream/StreamMetadataTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertSame;
+
+
+public class StreamMetadataTest {
+
+ @Test
+ public void testGetters() {
+ StreamConfig streamConfig = mock(StreamConfig.class);
+ when(streamConfig.getTopicName()).thenReturn("test-topic");
+
+ PartitionGroupMetadata pg0 = new PartitionGroupMetadata(0,
mock(StreamPartitionMsgOffset.class));
+ PartitionGroupMetadata pg1 = new PartitionGroupMetadata(1,
mock(StreamPartitionMsgOffset.class), 5);
+ List<PartitionGroupMetadata> pgList = Arrays.asList(pg0, pg1);
+
+ StreamMetadata sm = new StreamMetadata(streamConfig, 10, pgList);
+
+ assertSame(sm.getStreamConfig(), streamConfig);
+ assertEquals(sm.getTopicName(), "test-topic");
+ assertEquals(sm.getNumPartitions(), 10);
+ assertEquals(sm.getPartitionGroupMetadataList().size(), 2);
+ assertSame(sm.getPartitionGroupMetadataList().get(0), pg0);
+ assertSame(sm.getPartitionGroupMetadataList().get(1), pg1);
+ }
+
+ @Test
+ public void testNumPartitionsCanExceedListSize() {
+ StreamConfig streamConfig = mock(StreamConfig.class);
+ PartitionGroupMetadata pg = new PartitionGroupMetadata(0,
mock(StreamPartitionMsgOffset.class));
+
+ StreamMetadata sm = new StreamMetadata(streamConfig, 100,
Collections.singletonList(pg));
+
+ assertEquals(sm.getNumPartitions(), 100);
+ assertEquals(sm.getPartitionGroupMetadataList().size(), 1);
+ }
+
+ @Test
+ public void testDefensiveCopy() {
+ StreamConfig streamConfig = mock(StreamConfig.class);
+ PartitionGroupMetadata pg0 = new PartitionGroupMetadata(0,
mock(StreamPartitionMsgOffset.class));
+ List<PartitionGroupMetadata> mutableList = new ArrayList<>();
+ mutableList.add(pg0);
+
+ StreamMetadata sm = new StreamMetadata(streamConfig, 1, mutableList);
+ assertEquals(sm.getPartitionGroupMetadataList().size(), 1);
+
+ // Mutating the original list should not affect StreamMetadata
+ mutableList.add(new PartitionGroupMetadata(1,
mock(StreamPartitionMsgOffset.class)));
+ assertEquals(sm.getPartitionGroupMetadataList().size(), 1);
+ }
+
+ @Test(expectedExceptions = UnsupportedOperationException.class)
+ public void testPartitionGroupMetadataListIsUnmodifiable() {
+ StreamConfig streamConfig = mock(StreamConfig.class);
+ PartitionGroupMetadata pg = new PartitionGroupMetadata(0,
mock(StreamPartitionMsgOffset.class));
+
+ StreamMetadata sm = new StreamMetadata(streamConfig, 1,
Collections.singletonList(pg));
+ sm.getPartitionGroupMetadataList().add(new PartitionGroupMetadata(1,
mock(StreamPartitionMsgOffset.class)));
+ }
+
+ @Test
+ public void testEmptyPartitionGroupMetadataList() {
+ StreamConfig streamConfig = mock(StreamConfig.class);
+
+ StreamMetadata sm = new StreamMetadata(streamConfig, 5,
Collections.emptyList());
+
+ assertEquals(sm.getNumPartitions(), 5);
+ assertEquals(sm.getPartitionGroupMetadataList().size(), 0);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]