Copilot commented on code in PR #17811:
URL: https://github.com/apache/pinot/pull/17811#discussion_r2886009514

##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/StreamMetadata.java:
##########
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.spi.stream;
+
+import java.util.Collections;
+import java.util.List;
+
+
+/**
+ * Groups partition metadata for a single stream/topic.
+ *
+ * <p>This replaces the flat {@code List<PartitionGroupMetadata>} pattern where partitions from all streams were mixed
+ * together and required partition ID padding to identify stream membership.
+ *
+ * <p>The {@link PartitionGroupMetadata} items within this container use Pinot-encoded partition IDs
+ * (i.e., {@code streamIndex * 10000 + streamPartitionId}) to maintain backward compatibility with segment names
+ * stored in ZooKeeper.
+ */
+public class StreamMetadata {
+
+  private final StreamConfig _streamConfig;
+  private final int _streamConfigIndex;
+  private final int _partitionCount;
+  private final List<PartitionGroupMetadata> _partitionGroupMetadataList;
+
+  public StreamMetadata(StreamConfig streamConfig, int streamConfigIndex, int partitionCount,
+      List<PartitionGroupMetadata> partitionGroupMetadataList) {
+    _streamConfig = streamConfig;
+    _streamConfigIndex = streamConfigIndex;
+    _partitionCount = partitionCount;
+    _partitionGroupMetadataList = partitionGroupMetadataList;
+  }

Review Comment:
`StreamMetadata` is intended to be a value container, but it stores the caller-provided `partitionGroupMetadataList` reference directly. Even though `getPartitionGroupMetadataList()` returns an unmodifiable view, external mutations of the original list would still be visible through that view. Consider defensively copying the list in the constructor (e.g., `List.copyOf(...)`, which also rejects a null list or null elements) to make the object truly immutable and avoid subtle bugs.

##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -851,11 +857,16 @@ public void testRepairs() {
      * End of shard cases
      */
     // 1 reached end of shard.
-    List<PartitionGroupMetadata> partitionGroupMetadataListWithout1 =
-        segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, Collections.emptyList(),
+    List<StreamMetadata> streamMetadataListWithout1 =
+        segmentManager.getNewStreamMetadataList(segmentManager._streamConfigs, Collections.emptyList(),
             mock(IdealState.class));
-    partitionGroupMetadataListWithout1.remove(1);
-    segmentManager._partitionGroupMetadataList = partitionGroupMetadataListWithout1;
+    // Remove partition 1 from the first stream's metadata
+    StreamMetadata origSm = streamMetadataListWithout1.get(0);
+    List<PartitionGroupMetadata> filteredPgList = new ArrayList<>(origSm.getPartitionGroupMetadataList());
+    filteredPgList.remove(1);
+    segmentManager._streamMetadataList = Collections.singletonList(
+        new StreamMetadata(origSm.getStreamConfig(), origSm.getStreamConfigIndex(),
+            filteredPgList.size(), filteredPgList));

Review Comment:
Same issue as in `testCommitSegment` earlier in this file: `partitionCount` is set to `filteredPgList.size()` after removing an entry. For realism, the test should keep `partitionCount` equal to the stream's total partition count returned by `fetchPartitionCount()` (here, `origSm.getPartitionCount()`), not the size of the filtered metadata list, so that it exercises the code paths where `partitionCount > partitionGroupMetadataList.size()`.

##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadataFetcher.java:
##########
@@ -51,23 +51,34 @@ public PartitionGroupMetadataFetcher(List<StreamConfig> streamConfigs,
     _pausedTopicIndices = pausedTopicIndices;
   }
 
+  public List<StreamMetadata> getStreamMetadataList() {
+    return _streamMetadataList;
+  }

Review Comment:
`getStreamMetadataList()` returns the internal mutable `_streamMetadataList`. Since this is a new public API, it would be safer to return an unmodifiable view or a defensive copy to prevent callers from accidentally mutating the fetcher's internal state (which could affect subsequent retries or calls).
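Both immutability comments (on the `StreamMetadata` constructor and on `getStreamMetadataList()`) come down to one pattern: copy on the way in, expose a read-only list on the way out. The sketch below is a hypothetical stand-in, not Pinot's code: `StreamMetadataSketch` and `FetcherSketch` are invented names, and `String` elements replace `PartitionGroupMetadata` so the example compiles on its own.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

public class DefensiveCopySketch {

  // Hypothetical stand-in for StreamMetadata; String elements replace PartitionGroupMetadata.
  static final class StreamMetadataSketch {
    private final int _partitionCount;
    private final List<String> _partitionGroupMetadataList;

    StreamMetadataSketch(int partitionCount, List<String> partitionGroupMetadataList) {
      _partitionCount = partitionCount;
      // Copy on the way in: later changes to the caller's list cannot reach this object.
      // List.copyOf rejects null elements; requireNonNull gives a clearer message for a null list.
      _partitionGroupMetadataList =
          List.copyOf(Objects.requireNonNull(partitionGroupMetadataList, "partitionGroupMetadataList"));
    }

    int getPartitionCount() {
      return _partitionCount;
    }

    List<String> getPartitionGroupMetadataList() {
      // The copy is already unmodifiable, so it can be returned directly.
      return _partitionGroupMetadataList;
    }
  }

  // Hypothetical stand-in for PartitionGroupMetadataFetcher: the internal list stays mutable
  // (it may be rebuilt on retries), but callers only get a read-only view of it.
  static final class FetcherSketch {
    private final List<StreamMetadataSketch> _streamMetadataList = new ArrayList<>();

    void add(StreamMetadataSketch streamMetadata) {
      _streamMetadataList.add(streamMetadata);
    }

    List<StreamMetadataSketch> getStreamMetadataList() {
      return Collections.unmodifiableList(_streamMetadataList);
    }
  }

  public static void main(String[] args) {
    List<String> callerList = new ArrayList<>(List.of("pg-0", "pg-1"));
    StreamMetadataSketch streamMetadata = new StreamMetadataSketch(2, callerList);
    callerList.add("pg-2"); // mutating the caller's list after construction
    System.out.println(streamMetadata.getPartitionGroupMetadataList().size()); // 2

    FetcherSketch fetcher = new FetcherSketch();
    fetcher.add(streamMetadata);
    try {
      fetcher.getStreamMetadataList().clear();
    } catch (UnsupportedOperationException e) {
      System.out.println("read-only view");
    }
    System.out.println(fetcher.getStreamMetadataList().size()); // 1
  }
}
```

`Collections.unmodifiableList` is a live view, so callers still see later updates the fetcher makes to its own list (for example after a retry); returning `List.copyOf(_streamMetadataList)` instead hands callers a snapshot. Which behavior is wanted is a design decision for the PR.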

##########
pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManagerTest.java:
##########
@@ -296,12 +297,17 @@ public void testCommitSegment() {
       // Expected
     }
 
-    // committing segment's partitionGroupId no longer in the newPartitionGroupMetadataList
-    List<PartitionGroupMetadata> partitionGroupMetadataListWithout0 =
-        segmentManager.getNewPartitionGroupMetadataList(segmentManager._streamConfigs, Collections.emptyList(),
+    // committing segment's partitionGroupId no longer in the newStreamMetadataList
+    List<StreamMetadata> streamMetadataListWithout0 =
+        segmentManager.getNewStreamMetadataList(segmentManager._streamConfigs, Collections.emptyList(),
             mock(IdealState.class));
-    partitionGroupMetadataListWithout0.remove(0);
-    segmentManager._partitionGroupMetadataList = partitionGroupMetadataListWithout0;
+    // Remove partition 0 from the first stream's metadata
+    StreamMetadata originalSm = streamMetadataListWithout0.get(0);
+    List<PartitionGroupMetadata> filteredList = new ArrayList<>(originalSm.getPartitionGroupMetadataList());
+    filteredList.remove(0);
+    segmentManager._streamMetadataList = Collections.singletonList(
+        new StreamMetadata(originalSm.getStreamConfig(), originalSm.getStreamConfigIndex(),
+            filteredList.size(), filteredList));

Review Comment:
In this test setup, `StreamMetadata`'s `partitionCount` is set to `filteredList.size()`, but the production contract is that `partitionCount` comes from `fetchPartitionCount()` and can be greater than the size of `getPartitionGroupMetadataList()` (e.g., when only a subset of partitions is assigned or end-of-life partitions are omitted). To better reflect real behavior and avoid masking issues, keep `partitionCount` equal to the original stream partition count even after filtering the metadata list.
```suggestion
            originalSm.getPartitionCount(), filteredList));
```

##########
pinot-spi/src/main/java/org/apache/pinot/spi/stream/PartitionGroupMetadata.java:
##########
@@ -28,12 +28,20 @@
  */
 public class PartitionGroupMetadata {
 
+  private static final int DEFAULT_SEQUENCE_NUMBER = 0;
+
   private final int _partitionGroupId;
   private final StreamPartitionMsgOffset _startOffset;
+  private final int _sequenceNumber;
 
   public PartitionGroupMetadata(int partitionGroupId, StreamPartitionMsgOffset startOffset) {
+    this(partitionGroupId, startOffset, DEFAULT_SEQUENCE_NUMBER);
+  }
+
+  public PartitionGroupMetadata(int partitionGroupId, StreamPartitionMsgOffset startOffset, int sequenceNumber) {
     _partitionGroupId = partitionGroupId;
     _startOffset = startOffset;
+    _sequenceNumber = sequenceNumber;
   }

Review Comment:
The class-level documentation still describes `PartitionGroupMetadata` as containing only `partitionGroupId` and `startOffset`, but a sequence number (the new constructor parameter and `getSequenceNumber()`) is now part of the public API. Please update the Javadoc to describe the sequence number semantics (e.g., initial sequence vs. subsequent segments) so callers know what `getSequenceNumber()` represents.
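One possible shape for the updated class-level Javadoc, shown on a compilable stand-in. The sequence-number semantics written into it are an assumption, not something the PR states: that it is the sequence number of the segment to create for the partition group, and that the default of 0 marks the group's first segment. The author would need to confirm or correct this. `StreamPartitionMsgOffset` is replaced by an empty stand-in interface, and `PartitionGroupMetadataJavadocSketch` is a hypothetical wrapper class.

```java
public class PartitionGroupMetadataJavadocSketch {

  /** Hypothetical stand-in for Pinot's StreamPartitionMsgOffset, so the sketch compiles on its own. */
  interface StreamPartitionMsgOffset {
  }

  /**
   * Metadata for one partition group of a stream: the partition group ID, the offset from which consumption
   * starts, and the sequence number of the segment to be created for the group.
   *
   * <p>Sequence number (assumed semantics, to be confirmed): the two-argument constructor uses 0, i.e. the
   * group's first segment; callers that create a later segment for the same group pass its sequence number
   * explicitly via the three-argument constructor.
   */
  static final class PartitionGroupMetadata {
    private static final int DEFAULT_SEQUENCE_NUMBER = 0;

    private final int _partitionGroupId;
    private final StreamPartitionMsgOffset _startOffset;
    private final int _sequenceNumber;

    PartitionGroupMetadata(int partitionGroupId, StreamPartitionMsgOffset startOffset) {
      this(partitionGroupId, startOffset, DEFAULT_SEQUENCE_NUMBER);
    }

    PartitionGroupMetadata(int partitionGroupId, StreamPartitionMsgOffset startOffset, int sequenceNumber) {
      _partitionGroupId = partitionGroupId;
      _startOffset = startOffset;
      _sequenceNumber = sequenceNumber;
    }

    int getPartitionGroupId() {
      return _partitionGroupId;
    }

    StreamPartitionMsgOffset getStartOffset() {
      return _startOffset;
    }

    /** @return the sequence number of the segment to create for this group; 0 unless set explicitly */
    int getSequenceNumber() {
      return _sequenceNumber;
    }
  }

  public static void main(String[] args) {
    StreamPartitionMsgOffset offset = new StreamPartitionMsgOffset() { };
    System.out.println(new PartitionGroupMetadata(3, offset).getSequenceNumber()); // 0
    System.out.println(new PartitionGroupMetadata(3, offset, 7).getSequenceNumber()); // 7
  }
}
```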

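The two `partitionCount` comments on `PinotLLCRealtimeSegmentManagerTest`, together with the encoded-ID formula in the `StreamMetadata` Javadoc, can be illustrated with a small stand-in (Java 16+ for the record). Everything here is hypothetical: `StreamMetadataSketch` stores encoded IDs instead of `PartitionGroupMetadata`, `PARTITION_PADDING` and the helper names are invented, and the sketch assumes the Javadoc's `streamIndex` is the same value as `streamConfigIndex`. The point is only that filtering the list should carry the original `partitionCount` through unchanged.

```java
import java.util.ArrayList;
import java.util.List;

public class FilteredStreamMetadataSketch {

  // Multiplier from the StreamMetadata Javadoc (streamIndex * 10000 + streamPartitionId); the name is hypothetical.
  static final int PARTITION_PADDING = 10000;

  static int encode(int streamIndex, int streamPartitionId) {
    return streamIndex * PARTITION_PADDING + streamPartitionId;
  }

  static int streamIndexOf(int pinotPartitionId) {
    return pinotPartitionId / PARTITION_PADDING;
  }

  static int streamPartitionIdOf(int pinotPartitionId) {
    return pinotPartitionId % PARTITION_PADDING;
  }

  // Hypothetical stand-in for StreamMetadata; encoded partition group IDs replace PartitionGroupMetadata.
  record StreamMetadataSketch(int streamConfigIndex, int partitionCount, List<Integer> partitionGroupIds) {
    StreamMetadataSketch {
      partitionGroupIds = List.copyOf(partitionGroupIds);
    }
  }

  // Drops one partition group but keeps partitionCount as fetched from the stream, so when the ID is present,
  // partitionCount > partitionGroupIds().size() afterwards: the situation the review asks the tests to cover.
  static StreamMetadataSketch withoutPartition(StreamMetadataSketch sm, int pinotPartitionId) {
    List<Integer> filtered = new ArrayList<>(sm.partitionGroupIds());
    filtered.remove(Integer.valueOf(pinotPartitionId)); // remove by value, not by index
    return new StreamMetadataSketch(sm.streamConfigIndex(), sm.partitionCount(), filtered);
  }

  public static void main(String[] args) {
    int streamIndex = 1;
    StreamMetadataSketch sm = new StreamMetadataSketch(streamIndex, 3,
        List.of(encode(streamIndex, 0), encode(streamIndex, 1), encode(streamIndex, 2)));
    StreamMetadataSketch without1 = withoutPartition(sm, encode(streamIndex, 1));
    System.out.println(without1.partitionGroupIds()); // [10000, 10002]
    System.out.println(without1.partitionCount()); // 3
    System.out.println(streamIndexOf(10002) + " " + streamPartitionIdOf(10002)); // 1 2
  }
}
```

The tests in the diff remove entries by list index (`remove(1)`, `remove(0)`); removing by encoded ID as above is just one way to make the intent explicit.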