Jackie-Jiang commented on code in PR #8584:
URL: https://github.com/apache/pinot/pull/8584#discussion_r864295648
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/assignment/segment/RealtimeSegmentAssignment.java:
##########
@@ -104,18 +104,15 @@ public void init(HelixManager helixManager, TableConfig
tableConfig) {
@Override
public List<String> assignSegment(String segmentName, Map<String,
Map<String, String>> currentAssignment,
Map<InstancePartitionsType, InstancePartitions> instancePartitionsMap) {
- InstancePartitions instancePartitions =
instancePartitionsMap.get(InstancePartitionsType.CONSUMING);
- Preconditions.checkState(instancePartitions != null, "Failed to find
CONSUMING instance partitions for table: %s",
- _realtimeTableName);
+ Preconditions.checkState(instancePartitionsMap.size() == 1, "One instance
partition type should be provided");
+ InstancePartitions instancePartitions =
instancePartitionsMap.entrySet().iterator().next().getValue();
Review Comment:
We should check the `InstancePartitionsType` of the entry to decide how to
assign the segment. The current logic handles the `CONSUMING` type only; we
should add a branch for the `COMPLETED` type, which should be similar to
`OfflineSegmentAssignment.assignSegment()`.
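
Roughly what I have in mind, as a self-contained sketch rather than the actual
Pinot code. `SketchPartitionsType`, `RealtimeAssignmentSketch`, the
`partitionId` and `replication` parameters, and both placeholder strategies
are assumptions made for illustration; the real `InstancePartitions` and
assignment utilities are richer than a plain list of instance names.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Pinot's InstancePartitionsType.
enum SketchPartitionsType { OFFLINE, CONSUMING, COMPLETED }

final class RealtimeAssignmentSketch {
  private RealtimeAssignmentSketch() {
  }

  // The map must hold exactly one entry; its key selects the assignment strategy.
  static List<String> assignSegment(int partitionId, Map<String, Map<String, String>> currentAssignment,
      Map<SketchPartitionsType, List<String>> instancePartitionsMap, int replication) {
    if (instancePartitionsMap.size() != 1) {
      throw new IllegalStateException("One instance partition type should be provided");
    }
    Map.Entry<SketchPartitionsType, List<String>> entry = instancePartitionsMap.entrySet().iterator().next();
    List<String> instances = entry.getValue();
    switch (entry.getKey()) {
      case CONSUMING:
        return assignConsuming(partitionId, instances, replication);
      case COMPLETED:
        return assignCompleted(currentAssignment, instances, replication);
      default:
        throw new IllegalStateException("Unsupported instance partitions type: " + entry.getKey());
    }
  }

  // Placeholder strategy (assumption): rotate through the instances by partition id.
  private static List<String> assignConsuming(int partitionId, List<String> instances, int replication) {
    List<String> assigned = new ArrayList<>(replication);
    for (int replica = 0; replica < replication; replica++) {
      assigned.add(instances.get((partitionId * replication + replica) % instances.size()));
    }
    return assigned;
  }

  // Placeholder strategy (assumption): pick the instances currently hosting the fewest segments.
  private static List<String> assignCompleted(Map<String, Map<String, String>> currentAssignment,
      List<String> instances, int replication) {
    Map<String, Integer> segmentCounts = new HashMap<>();
    for (Map<String, String> instanceStateMap : currentAssignment.values()) {
      for (String instance : instanceStateMap.keySet()) {
        segmentCounts.merge(instance, 1, Integer::sum);
      }
    }
    List<String> sorted = new ArrayList<>(instances);
    // List.sort is stable, so ties keep the original instance order.
    sorted.sort((a, b) -> Integer.compare(segmentCounts.getOrDefault(a, 0), segmentCounts.getOrDefault(b, 0)));
    return new ArrayList<>(sorted.subList(0, Math.min(replication, sorted.size())));
  }
}
```

The point is only the dispatch on the entry key; the two strategies should be
replaced by the existing `CONSUMING` logic and an offline-style assignment.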
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -1995,23 +1992,23 @@ public SegmentZKMetadata
constructZkMetadataForNewSegment(String tableNameWithTy
public void assignTableSegment(String tableNameWithType, String segmentName)
{
String segmentZKMetadataPath =
ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType,
segmentName);
- InstancePartitionsType instancePartitionsType;
- if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
- // In an upsert enabled LLC realtime table, all segments of the same
partition are collocated on the same server
- // -- consuming or completed. So it is fine to use CONSUMING as the
InstancePartitionsType.
- // TODO When upload segments is open to all realtime tables, we should
change the type to COMPLETED instead.
- // In addition, RealtimeSegmentAssignment.assignSegment(..) method
should be updated so that the method does not
- // assign segments to CONSUMING instance partition only.
- instancePartitionsType = InstancePartitionsType.CONSUMING;
- } else {
- instancePartitionsType = InstancePartitionsType.OFFLINE;
- }
// Assign instances for the segment and add it into IdealState
try {
TableConfig tableConfig = getTableConfig(tableNameWithType);
Preconditions.checkState(tableConfig != null, "Failed to find table
config for table: " + tableNameWithType);
- SegmentAssignment segmentAssignment =
SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig);
+ InstancePartitionsType instancePartitionsType;
+ if (TableNameBuilder.isRealtimeTableResource(tableNameWithType)) {
Review Comment:
We should use the `COMPLETED` type only if the `COMPLETED` instance partitions
exist or the tag override is configured. If segment relocation is not
configured, we should follow the same assignment as the `CONSUMING` segments.
Let's keep the comments for the upsert case. For upsert segments, we should
always use the `CONSUMING` type.
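
The decision could look something like the sketch below. It is
self-contained and not the real code: `UploadPartitionsType`,
`InstancePartitionsTypeChooser`, and the four boolean inputs are hypothetical
and would have to be derived from the table config and the stored instance
partitions.

```java
// Hypothetical stand-in for Pinot's InstancePartitionsType.
enum UploadPartitionsType { OFFLINE, CONSUMING, COMPLETED }

final class InstancePartitionsTypeChooser {
  private InstancePartitionsTypeChooser() {
  }

  // All four flags are assumptions; in the real code they come from the table name,
  // the upsert config, and the relocation (instance partitions / tag override) setup.
  static UploadPartitionsType choose(boolean realtimeTable, boolean upsertEnabled,
      boolean hasCompletedInstancePartitions, boolean hasCompletedTagOverride) {
    if (!realtimeTable) {
      return UploadPartitionsType.OFFLINE;
    }
    // Upsert: all segments of a partition stay on the same server, so always use CONSUMING.
    if (upsertEnabled) {
      return UploadPartitionsType.CONSUMING;
    }
    // Use COMPLETED only when segment relocation is actually configured.
    if (hasCompletedInstancePartitions || hasCompletedTagOverride) {
      return UploadPartitionsType.COMPLETED;
    }
    // No relocation configured: follow the same assignment as the CONSUMING segments.
    return UploadPartitionsType.CONSUMING;
  }
}
```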
##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java:
##########
@@ -359,6 +359,16 @@ public void addSegment(String segmentName, TableConfig
tableConfig, IndexLoading
_serverMetrics.addValueToTableGauge(_tableNameWithType,
ServerGauge.SEGMENT_COUNT, 1L);
}
+ /*
+ * This method is implemented to allow refreshing the segments in realtime
tables.
+ */
+ @Override
+ public void addSegment(File indexDir, IndexLoadingConfig indexLoadingConfig)
Review Comment:
Move this implementation to `BaseTableDataManager`. It is common to
both offline and realtime tables.
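
For illustration only, the shape I am suggesting is a single implementation
in the base class that both subclasses inherit. Every class and method below
is a hypothetical stand-in (the real `addSegment` also takes an
`IndexLoadingConfig` and loads the segment); the sketch just tracks the
directory by name.

```java
import java.io.File;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for BaseTableDataManager.
abstract class BaseManagerSketch {
  private final Map<String, File> _segmentDirs = new ConcurrentHashMap<>();

  // Shared implementation: neither subclass needs to override it.
  public void addSegment(File indexDir) {
    // Assumption: the segment name is the directory name.
    _segmentDirs.put(indexDir.getName(), indexDir);
  }

  public int getNumSegments() {
    return _segmentDirs.size();
  }
}

// Hypothetical stand-ins for the offline and realtime table data managers.
final class OfflineManagerSketch extends BaseManagerSketch {
}

final class RealtimeManagerSketch extends BaseManagerSketch {
}
```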
##########
pinot-common/src/main/java/org/apache/pinot/common/utils/SegmentUtils.java:
##########
@@ -35,32 +35,27 @@ private SegmentUtils() {
// Returns the partition id of a realtime segment based segment name and
segment metadata info retrieved via Helix.
// Important: The method is costly because it may read data from zookeeper.
Do not use it in any query execution
// path.
- public static int getRealtimeSegmentPartitionId(String segmentName, String
realtimeTableName,
- HelixManager helixManager, String partitionColumn) {
+ public static @Nullable
+ Integer getRealtimeSegmentPartitionId(String segmentName, String
realtimeTableName, HelixManager helixManager,
Review Comment:
(convention)
```suggestion
@Nullable
public static Integer getRealtimeSegmentPartitionId(String segmentName,
String realtimeTableName, HelixManager helixManager,
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]