npawar commented on a change in pull request #7066:
URL: https://github.com/apache/pinot/pull/7066#discussion_r682955117
##########
File path: pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/realtime/segment/FlushThresholdUpdaterTest.java
##########
@@ -367,7 +371,7 @@ public void testSegmentSizeBasedUpdaterWithModifications() {
mockAutotuneStreamConfig(flushSegmentDesiredSizeBytes, flushThresholdTimeMillis, flushAutotuneInitialRows);
committingSegmentZKMetadata = getCommittingSegmentZKMetadata(creationTime, sizeThreshold, numRowsConsumed);
flushThresholdUpdater.updateFlushThreshold(streamConfig, newSegmentZKMetadata, committingSegmentDescriptor,
- committingSegmentZKMetadata, 1);
+ committingSegmentZKMetadata, 1, Collections.emptyList());
Review comment:
Is it possible to add at least one test that exercises this: 1) where it picks 0, and 2) where it picks another partition id because that one is the lowest available?
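A rough sketch of what the two cases could check. It uses a hypothetical `PartitionGroupMetadataStub` record in place of the real `PartitionGroupMetadata` (whose constructor is not shown here) and a hypothetical `smallestPartitionGroupId` helper that mirrors the selection logic from the diff below; it is not the actual Pinot test code.

```java
import java.util.Comparator;
import java.util.List;

class SmallestPartitionSelectionSketch {

  // Hypothetical stand-in for Pinot's PartitionGroupMetadata; only the id matters here.
  record PartitionGroupMetadataStub(int partitionGroupId) {
  }

  // Mirrors the selection logic in the diff: pick the lowest partition group id,
  // falling back to 0 when the list is empty.
  static int smallestPartitionGroupId(List<PartitionGroupMetadataStub> partitionGroups) {
    return partitionGroups.stream()
        .min(Comparator.comparingInt(PartitionGroupMetadataStub::partitionGroupId))
        .map(PartitionGroupMetadataStub::partitionGroupId)
        .orElseGet(() -> 0);
  }

  public static void main(String[] args) {
    // Case 1: partition group 0 is present, so it is picked.
    int withZero = smallestPartitionGroupId(List.of(
        new PartitionGroupMetadataStub(2), new PartitionGroupMetadataStub(0), new PartitionGroupMetadataStub(1)));
    // Case 2: partition group 0 is absent, so the lowest remaining id is picked.
    int withoutZero = smallestPartitionGroupId(List.of(
        new PartitionGroupMetadataStub(5), new PartitionGroupMetadataStub(3), new PartitionGroupMetadataStub(7)));
    System.out.println(withZero + " " + withoutZero); // prints "0 3"
  }
}
```

In the real test, the same two inputs would presumably be passed as the new last argument of `updateFlushThreshold` instead of `Collections.emptyList()`.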
##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java
##########
@@ -1184,7 +1187,8 @@ private String setupNewPartitionGroup(TableConfig tableConfig, PartitionLevelStr
CommittingSegmentDescriptor committingSegmentDescriptor = new CommittingSegmentDescriptor(null, startOffset, 0);
createNewSegmentZKMetadata(tableConfig, streamConfig, newLLCSegmentName, creationTimeMs,
- committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas);
+ committingSegmentDescriptor, null, instancePartitions, numPartitionGroups, numReplicas,
+ Collections.singletonList(partitionGroupMetadata));
Review comment:
This should pass the whole list, right? If we send just the one, every partition will update the threshold based on its own partition, which is not the same as using only the 0th/lowest available partition.
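To illustrate the concern, here is a simplified sketch, assuming each committing partition updates the ratio only when its id equals the smallest id in the list it receives. `PartitionGroup` and `partitionsThatUpdateRatio` are hypothetical names, and the sketch ignores the `_latestSegmentRowsToSizeRatio == 0` bootstrap condition.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

class SingletonVsFullListSketch {

  // Hypothetical stand-in for PartitionGroupMetadata.
  record PartitionGroup(int partitionGroupId) {
  }

  static int smallestPartitionGroupId(List<PartitionGroup> partitionGroups) {
    return partitionGroups.stream()
        .min(Comparator.comparingInt(PartitionGroup::partitionGroupId))
        .map(PartitionGroup::partitionGroupId)
        .orElseGet(() -> 0);
  }

  // Returns the ids of committing partitions that would be allowed to update the ratio,
  // i.e. those whose id equals the smallest id in the list they are handed.
  static List<Integer> partitionsThatUpdateRatio(List<PartitionGroup> all, boolean passWholeList) {
    List<Integer> updaters = new ArrayList<>();
    for (PartitionGroup committing : all) {
      // Singleton list: the committing partition only ever sees itself.
      List<PartitionGroup> visible = passWholeList ? all : List.of(committing);
      if (committing.partitionGroupId() == smallestPartitionGroupId(visible)) {
        updaters.add(committing.partitionGroupId());
      }
    }
    return updaters;
  }

  public static void main(String[] args) {
    List<PartitionGroup> all = List.of(new PartitionGroup(3), new PartitionGroup(4), new PartitionGroup(5));
    System.out.println(partitionsThatUpdateRatio(all, false)); // prints [3, 4, 5]
    System.out.println(partitionsThatUpdateRatio(all, true));  // prints [3]
  }
}
```

With a singleton list, every partition trivially looks like the "lowest", so all of them update the ratio; with the whole list, only partition 3 does.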
##########
File path: pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/segment/SegmentSizeBasedFlushThresholdUpdater.java
##########
@@ -102,8 +106,14 @@ public synchronized void updateFlushThreshold(PartitionLevelStreamConfig streamC
// less same characteristics at any one point in time).
// However, when we start a new table or change controller mastership, we can have any partition completing first.
// It is best to learn the ratio as quickly as we can, so we allow any partition to supply the value.
- // FIXME: The stream may not have partition "0"
- if (new LLCSegmentName(newSegmentName).getPartitionGroupId() == 0 || _latestSegmentRowsToSizeRatio == 0) {
+
+ // Partition group id 0 might not be available always. We take the smallest available partition id in that case to update the threshold
+ int smallestAvailablePartitionGroupId =
+ partitionGroupMetadataList.stream().min(Comparator.comparingInt(PartitionGroupMetadata::getPartitionGroupId))
+ .map(PartitionGroupMetadata::getPartitionGroupId).orElseGet(() -> 0);
Review comment:
It seems like the `orElseGet(() -> 0)` is added only for test purposes? In an actual setup, you would never reach that path, right? In that case, can we just send the right input in the test instead of doing this?
We might still have to guard against the case where all shards have expired (this can happen in Kinesis). But then we can set a special value here (-1) instead.
wdyt?
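A minimal sketch of the sentinel idea, assuming the ratio is a `double` and using hypothetical names (`NO_PARTITION_GROUP`, `PartitionGroup`, `shouldUpdateRatio`) that are not in the PR. With -1 as the fallback, an empty list (e.g. every Kinesis shard expired) no longer makes partition 0 look like the smallest one:

```java
import java.util.List;

class SentinelPartitionIdSketch {

  // Hypothetical sentinel meaning "no partition group is available".
  static final int NO_PARTITION_GROUP = -1;

  // Hypothetical stand-in for PartitionGroupMetadata.
  record PartitionGroup(int partitionGroupId) {
  }

  static int smallestPartitionGroupId(List<PartitionGroup> partitionGroups) {
    return partitionGroups.stream()
        .mapToInt(PartitionGroup::partitionGroupId)
        .min()
        .orElse(NO_PARTITION_GROUP);
  }

  // Hypothetical decision helper: no real partition id can equal -1, so an empty list
  // only lets the "ratio not learned yet" condition trigger an update.
  static boolean shouldUpdateRatio(int committingPartitionGroupId, List<PartitionGroup> partitionGroups,
      double latestSegmentRowsToSizeRatio) {
    return committingPartitionGroupId == smallestPartitionGroupId(partitionGroups)
        || latestSegmentRowsToSizeRatio == 0;
  }

  public static void main(String[] args) {
    System.out.println(shouldUpdateRatio(0, List.of(), 0.5));                        // prints false
    System.out.println(shouldUpdateRatio(2, List.of(new PartitionGroup(2)), 0.5)); // prints true
  }
}
```

With `orElseGet(() -> 0)`, the first call would return true instead, since partition 0 would match the fallback.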
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]