sajjad-moradi commented on code in PR #15177:
URL: https://github.com/apache/pinot/pull/15177#discussion_r1980537950
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -565,19 +566,38 @@ static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType,
     // if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may
     // not be able to merge those lately-created segments -- we assume that users will have a way to backfill those
     // records correctly.
-    long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
+    Map<Integer, LLCSegmentName> latestCompletedSegmentInEachPartition = new HashMap<>();
+    HashSet<String> filteredSegmentNames = new HashSet<>();
     for (SegmentZKMetadata segmentZKMetadata : allSegments) {
-      if (!segmentZKMetadata.getStatus().isCompleted()
-          && segmentZKMetadata.getTotalDocs() > 0
-          && segmentZKMetadata.getStartTimeMs() < earliestStartTimeMsOfInProgressSegments) {
-        earliestStartTimeMsOfInProgressSegments = segmentZKMetadata.getStartTimeMs();
+      if (segmentZKMetadata.getStatus().isCompleted()) {
+        // completed segments
+        if (LLCSegmentName.isLLCSegment(segmentZKMetadata.getSegmentName())) {
+          // realtime segments
+          LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName());
+          int partitionId = llcSegmentName.getPartitionGroupId();
+          if (!latestCompletedSegmentInEachPartition.containsKey(partitionId)) {
+            // current segment is the latest found
+            latestCompletedSegmentInEachPartition.put(llcSegmentName.getPartitionGroupId(), llcSegmentName);
+          } else {
+            if (llcSegmentName.getSequenceNumber() >
+                latestCompletedSegmentInEachPartition.get(partitionId).getSequenceNumber()) {
+              // current segment is the latest found
+              filteredSegmentNames.add(latestCompletedSegmentInEachPartition.get(partitionId).getSegmentName());
+              latestCompletedSegmentInEachPartition.put(partitionId, llcSegmentName);
+            } else {
+              // current segment is not the latest
+              filteredSegmentNames.add(llcSegmentName.getSegmentName());
+            }
+          }
+        } else {
+          // other segments: merged segments, uploaded segments, or ingested offline segments
+          filteredSegmentNames.add(segmentZKMetadata.getSegmentName());
+        }
       }
     }
-    final long finalEarliestStartTimeMsOfInProgressSegments = earliestStartTimeMsOfInProgressSegments;
     return allSegments.stream()
-        .filter(segmentZKMetadata -> segmentZKMetadata.getStatus().isCompleted()
-            && segmentZKMetadata.getStartTimeMs() < finalEarliestStartTimeMsOfInProgressSegments)
-        .collect(Collectors.toList());
+        .filter(a->filteredSegmentNames.contains(a.getSegmentName()))
+        .collect(Collectors.toList());
Review Comment:
Also, once PR https://github.com/apache/pinot/pull/15173 is merged, we need to do a small refactor before this function is called so that `allSegments` contains only completed segments. Then we don't even need to check `.isCompleted()` here, since that's already taken care of.
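A minimal sketch of how the per-partition filter might read after that refactor, assuming `allSegments` already holds only completed segments. It is not Pinot code: `LatestCompletedSegmentFilter`, `LlcName`, `parseLlc`, and `excludeLatestPerPartition` are hypothetical names, and `parseLlc` stands in for `LLCSegmentName` by assuming the LLC name layout `tableName__partitionGroupId__sequenceNumber__creationTime`. Like the diff, it drops the highest-sequence-number LLC segment in each partition and keeps all non-LLC (merged, uploaded, or offline-ingested) segments. It uses a record, so it assumes Java 16+.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Sketch only, not Pinot code: assumes non-completed segments were already removed by the caller,
// and uses a hypothetical name parser instead of Pinot's LLCSegmentName.
class LatestCompletedSegmentFilter {

  // Hypothetical holder for the parts of an LLC segment name that the filter needs.
  record LlcName(String segmentName, int partitionGroupId, int sequenceNumber) {}

  // Assumes the layout tableName__partitionGroupId__sequenceNumber__creationTime.
  // Returns null for names that do not match it (e.g. merged or uploaded segments).
  static LlcName parseLlc(String segmentName) {
    String[] parts = segmentName.split("__");
    if (parts.length != 4) {
      return null;
    }
    try {
      return new LlcName(segmentName, Integer.parseInt(parts[1]), Integer.parseInt(parts[2]));
    } catch (NumberFormatException e) {
      return null;
    }
  }

  // Keeps every completed segment except the highest-sequence-number LLC segment of each
  // partition; non-LLC segments are always kept. Input order is preserved.
  static List<String> excludeLatestPerPartition(List<String> completedSegmentNames) {
    // Pass 1: find the latest LLC segment of each partition.
    Map<Integer, LlcName> latestInPartition = new HashMap<>();
    for (String name : completedSegmentNames) {
      LlcName llc = parseLlc(name);
      if (llc != null) {
        latestInPartition.merge(llc.partitionGroupId(), llc,
            (current, candidate) -> candidate.sequenceNumber() > current.sequenceNumber() ? candidate : current);
      }
    }
    Set<String> latestNames = new HashSet<>();
    for (LlcName latest : latestInPartition.values()) {
      latestNames.add(latest.segmentName());
    }
    // Pass 2: keep everything that is not a per-partition latest segment.
    List<String> kept = new ArrayList<>();
    for (String name : completedSegmentNames) {
      if (!latestNames.contains(name)) {
        kept.add(name);
      }
    }
    return kept;
  }

  public static void main(String[] args) {
    List<String> completed = List.of(
        "myTable__0__3__20250101T0000Z",
        "myTable__0__5__20250102T0000Z",
        "myTable__0__4__20250101T1200Z",
        "myTable__1__7__20250102T0000Z",
        "merged_segment_1");
    // prints [myTable__0__3__20250101T0000Z, myTable__0__4__20250101T1200Z, merged_segment_1]
    System.out.println(excludeLatestPerPartition(completed));
  }
}
```

Picking each partition's latest segment first and filtering in a second pass avoids having to move the previously-latest name into `filteredSegmentNames` whenever a newer one shows up; for distinct segment names it selects the same set as the single-pass loop in the diff.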
--
This is an automated message from the Apache Git Service.