sajjad-moradi commented on code in PR #15177:
URL: https://github.com/apache/pinot/pull/15177#discussion_r1980529478
##########
pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/mergerollup/MergeRollupTaskGenerator.java:
##########
@@ -565,19 +566,38 @@ static List<SegmentZKMetadata> filterSegmentsBasedOnStatus(TableType tableType,
// if new records are consumed later, the MergeRollupTask may have already moved watermarks forward, and may
// not be able to merge those lately-created segments -- we assume that users will have a way to backfill those
// records correctly.
- long earliestStartTimeMsOfInProgressSegments = Long.MAX_VALUE;
+ Map<Integer, LLCSegmentName> latestCompletedSegmentInEachPartition = new HashMap<>();
+ HashSet<String> filteredSegmentNames = new HashSet<>();
for (SegmentZKMetadata segmentZKMetadata : allSegments) {
- if (!segmentZKMetadata.getStatus().isCompleted()
- && segmentZKMetadata.getTotalDocs() > 0
- && segmentZKMetadata.getStartTimeMs() < earliestStartTimeMsOfInProgressSegments) {
- earliestStartTimeMsOfInProgressSegments = segmentZKMetadata.getStartTimeMs();
+ if (segmentZKMetadata.getStatus().isCompleted()) {
+ // completed segments
+ if (LLCSegmentName.isLLCSegment(segmentZKMetadata.getSegmentName())) {
+ // realtime segments
+ LLCSegmentName llcSegmentName = new LLCSegmentName(segmentZKMetadata.getSegmentName());
+ int partitionId = llcSegmentName.getPartitionGroupId();
+ if (!latestCompletedSegmentInEachPartition.containsKey(partitionId)) {
+ // current segment is the latest found
+ latestCompletedSegmentInEachPartition.put(llcSegmentName.getPartitionGroupId(), llcSegmentName);
+ } else {
+ if (llcSegmentName.getSequenceNumber() >
+ latestCompletedSegmentInEachPartition.get(partitionId).getSequenceNumber()) {
+ // current segment is the latest found
+ filteredSegmentNames.add(latestCompletedSegmentInEachPartition.get(partitionId).getSegmentName());
+ latestCompletedSegmentInEachPartition.put(partitionId, llcSegmentName);
+ } else {
+ // current segment is not the latest
+ filteredSegmentNames.add(llcSegmentName.getSegmentName());
+ }
+ }
+ } else {
+ // other segments: merged segments, uploaded segments, or ingested offline segments
+ filteredSegmentNames.add(segmentZKMetadata.getSegmentName());
+ }
}
}
- final long finalEarliestStartTimeMsOfInProgressSegments = earliestStartTimeMsOfInProgressSegments;
return allSegments.stream()
- .filter(segmentZKMetadata -> segmentZKMetadata.getStatus().isCompleted()
- && segmentZKMetadata.getStartTimeMs() < finalEarliestStartTimeMsOfInProgressSegments)
- .collect(Collectors.toList());
+ .filter(a->filteredSegmentNames.contains(a.getSegmentName()))
+ .collect(Collectors.toList());
Review Comment:
We can make this code simpler to read and review by computing only
`partitionIdToLatestCompletedSegment`:
```java
if (segmentZKMetadata.getStatus().isCompleted()) {
  String segmentName = segmentZKMetadata.getSegmentName();
  if (LLCSegmentName.isLLCSegment(segmentName)) {
    LLCSegmentName llcSegmentName = new LLCSegmentName(segmentName);
    partitionIdToLatestCompletedSegment.compute(llcSegmentName.getPartitionGroupId(),
        (partId, latestSegment) -> {
          if (latestSegment == null) {
            return llcSegmentName;
          } else {
            return latestSegment.getSequenceNumber() > llcSegmentName.getSequenceNumber()
                ? latestSegment : llcSegmentName;
          }
        });
  }
}
```
Then at the end, when we go over the elements of `allSegments`, we filter
out the latest completed segment of each partition.
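
To make the two passes concrete, here is a rough, self-contained sketch of the whole flow. It is only an illustration: `Seg`, `LatestCompletedSegmentSketch`, and `selectMergeCandidates` are hypothetical stand-ins (not Pinot classes), and I'm assuming that completed non-LLC segments (merged, uploaded, or offline-ingested) should still be kept, as the current diff does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class LatestCompletedSegmentSketch {
  /** Hypothetical stand-in for SegmentZKMetadata plus the fields parsed by LLCSegmentName. */
  static final class Seg {
    final String name;
    final boolean completed;    // stand-in for getStatus().isCompleted()
    final boolean llc;          // stand-in for LLCSegmentName.isLLCSegment(name)
    final int partitionId;      // only meaningful when llc is true
    final int sequenceNumber;   // only meaningful when llc is true

    Seg(String name, boolean completed, boolean llc, int partitionId, int sequenceNumber) {
      this.name = name;
      this.completed = completed;
      this.llc = llc;
      this.partitionId = partitionId;
      this.sequenceNumber = sequenceNumber;
    }
  }

  /** Returns the names of completed segments, excluding the latest completed LLC segment per partition. */
  static List<String> selectMergeCandidates(List<Seg> allSegments) {
    // Pass 1: track, per partition, the completed LLC segment with the highest sequence number.
    Map<Integer, Seg> partitionIdToLatestCompletedSegment = new HashMap<>();
    for (Seg seg : allSegments) {
      if (seg.completed && seg.llc) {
        partitionIdToLatestCompletedSegment.compute(seg.partitionId,
            (partId, latest) -> latest == null || seg.sequenceNumber > latest.sequenceNumber ? seg : latest);
      }
    }

    // Pass 2: keep completed segments that are not the latest one of their partition.
    List<String> result = new ArrayList<>();
    for (Seg seg : allSegments) {
      if (!seg.completed) {
        continue;
      }
      Seg latest = seg.llc ? partitionIdToLatestCompletedSegment.get(seg.partitionId) : null;
      boolean isLatestInPartition = latest != null && latest.name.equals(seg.name);
      if (!isLatestInPartition) {
        result.add(seg.name);
      }
    }
    return result;
  }
}
```

With this shape there is no separate `filteredSegmentNames` set to maintain; the map alone decides which segments are dropped in the final pass.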