Jackie-Jiang commented on code in PR #14828:
URL: https://github.com/apache/pinot/pull/14828#discussion_r1919494621
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2249,9 +2249,31 @@ public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST,
JsonUtils.objectToString(consumingSegmentsCommitted));
+ jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST,
+ JsonUtils.objectToString(consumingSegmentsCommitted));
Review Comment:
(minor) This shouldn't be needed. Adding it actually adds the overhead of one
extra parse.
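A minimal sketch of one way readers could do without the duplicate entry, assuming they fall back to the force-committed list whenever the yet-to-be-committed key has not been written yet. The key strings, the class name, and the comma-joined encoding are hypothetical stand-ins for the `CommonConstants.ControllerJob` constants and `JsonUtils`; this is not the PR's code.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

final class ForceCommitJobMetadataSketch {
  // Hypothetical key names standing in for the CommonConstants.ControllerJob constants.
  static final String FORCE_COMMITTED_LIST = "consumingSegmentsForceCommitted";
  static final String YET_TO_BE_COMMITTED_LIST = "consumingSegmentsYetToBeCommitted";

  // Comma-joined encoding used in place of JsonUtils to keep the sketch dependency-free.
  static Set<String> decode(String encoded) {
    Set<String> segments = new LinkedHashSet<>();
    if (encoded != null && !encoded.isEmpty()) {
      segments.addAll(Arrays.asList(encoded.split(",")));
    }
    return segments;
  }

  // Before any progress update has written the yet-to-be-committed key, every
  // force-committed segment is still pending, so read the force-committed list instead.
  // An empty stored value means "nothing pending" and is not treated as missing.
  static Set<String> segmentsYetToBeCommitted(Map<String, String> jobMetadata) {
    String pending = jobMetadata.get(YET_TO_BE_COMMITTED_LIST);
    return decode(pending != null ? pending : jobMetadata.get(FORCE_COMMITTED_LIST));
  }

  public static void main(String[] args) {
    Map<String, String> jobMetadata = new HashMap<>();
    jobMetadata.put(FORCE_COMMITTED_LIST, "seg_0,seg_1");
    System.out.println(segmentsYetToBeCommitted(jobMetadata)); // [seg_0, seg_1]
    jobMetadata.put(YET_TO_BE_COMMITTED_LIST, "seg_1");
    System.out.println(segmentsYetToBeCommitted(jobMetadata)); // [seg_1]
  }
}
```

Under this assumption the job-creation path would write only the force-committed list, and the yet-to-be-committed key would first appear when progress is recorded.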
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1889,4 +1889,19 @@ String moveSegmentFile(String rawTableName, String segmentName, String segmentLo
URI createSegmentPath(String rawTableName, String segmentName) {
return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName,
URIUtils.encode(segmentName));
}
+
+ public Set<String> getSegmentsYetToBeCommitted(String tableNameWithType, Set<String> segmentsToCheck) {
+ Set<String> segmentsYetToBeCommitted = new HashSet<>();
+ for (String segmentName: segmentsToCheck) {
+ SegmentZKMetadata segmentZKMetadata =
+ _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName);
+ if (segmentZKMetadata == null) {
+ continue;
+ }
+ if (!segmentZKMetadata.getStatus().isCompleted()) {
Review Comment:
We cannot use `isCompleted()` here. We should explicitly check for the status
to be `DONE`.
@9aman @KKcorps Currently `COMMITTING` is also counted as completed. Is this
expected?
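A minimal sketch of why the two checks differ. `Status` here is a simplified, hypothetical stand-in limited to the three statuses the comment mentions, and its `isCompleted()` only reproduces the behavior described above (`COMMITTING` counts as completed); it is not Pinot's enum.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

final class DoneCheckSketch {
  // Hypothetical stand-in for the segment status enum. isCompleted() models the behavior
  // described in the review (COMMITTING counts as completed), not Pinot's definition.
  enum Status {
    IN_PROGRESS, COMMITTING, DONE;

    boolean isCompleted() {
      return this != IN_PROGRESS;
    }
  }

  // Collects the segments whose status does not satisfy the given "committed" test.
  static Set<String> pending(Map<String, Status> statusBySegment, Predicate<Status> isCommitted) {
    Set<String> result = new LinkedHashSet<>();
    statusBySegment.forEach((segment, status) -> {
      if (!isCommitted.test(status)) {
        result.add(segment);
      }
    });
    return result;
  }

  public static void main(String[] args) {
    Map<String, Status> statuses = new LinkedHashMap<>();
    statuses.put("seg_0", Status.IN_PROGRESS);
    statuses.put("seg_1", Status.COMMITTING);
    statuses.put("seg_2", Status.DONE);
    // isCompleted() drops seg_1 although its commit has not finished: [seg_0]
    System.out.println(pending(statuses, Status::isCompleted));
    // The explicit DONE check keeps seg_1 pending: [seg_0, seg_1]
    System.out.println(pending(statuses, status -> status == Status.DONE));
  }
}
```

With `isCompleted()`, a segment that is still mid-commit would disappear from the pending list early; comparing against `DONE` keeps it there until the commit actually finishes.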
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1889,4 +1889,19 @@ String moveSegmentFile(String rawTableName, String segmentName, String segmentLo
URI createSegmentPath(String rawTableName, String segmentName) {
return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName,
URIUtils.encode(segmentName));
}
+
+ public Set<String> getSegmentsYetToBeCommitted(String tableNameWithType, Set<String> segmentsToCheck) {
+ Set<String> segmentsYetToBeCommitted = new HashSet<>();
+ for (String segmentName: segmentsToCheck) {
+ SegmentZKMetadata segmentZKMetadata =
+ _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName);
+ if (segmentZKMetadata == null) {
+ continue;
Review Comment:
(minor) Add a comment about this behavior. We are counting a deleted segment
as not needing to be committed.
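A minimal sketch of the loop with the requested comment in place. `statusLookup` is a hypothetical stand-in for the ZK metadata read (returning `null` when the segment has no metadata), `Status` is a simplified hypothetical enum, and the rationale in the comment (a deleted segment would otherwise stay pending indefinitely) is an inference from the behavior, not a statement from the PR.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

final class DeletedSegmentSketch {
  // Simplified stand-in for the segment status enum (hypothetical, not Pinot's).
  enum Status { IN_PROGRESS, COMMITTING, DONE }

  // statusLookup stands in for reading segment ZK metadata; it returns null when the
  // segment has no metadata, mirroring the null check in the diff.
  static Set<String> segmentsYetToBeCommitted(Set<String> segmentsToCheck,
      Function<String, Status> statusLookup) {
    Set<String> yetToBeCommitted = new LinkedHashSet<>();
    for (String segmentName : segmentsToCheck) {
      Status status = statusLookup.apply(segmentName);
      if (status == null) {
        // No ZK metadata means the segment has been deleted. A deleted segment can never
        // reach DONE, so it is counted as not needing to be committed; otherwise it
        // would stay in the pending list indefinitely.
        continue;
      }
      if (status != Status.DONE) {
        yetToBeCommitted.add(segmentName);
      }
    }
    return yetToBeCommitted;
  }

  public static void main(String[] args) {
    Map<String, Status> zkMetadata = Map.of("seg_0", Status.IN_PROGRESS, "seg_2", Status.DONE);
    Set<String> toCheck = new LinkedHashSet<>(List.of("seg_0", "seg_1", "seg_2"));
    // seg_1 has no metadata (deleted) and seg_2 is DONE, so only seg_0 remains: [seg_0]
    System.out.println(segmentsYetToBeCommitted(toCheck, zkMetadata::get));
  }
}
```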
##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2249,9 +2249,31 @@ public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long
jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS,
Long.toString(jobSubmissionTimeMs));
jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST,
JsonUtils.objectToString(consumingSegmentsCommitted));
+ jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST,
+ JsonUtils.objectToString(consumingSegmentsCommitted));
return addControllerJobToZK(jobId, jobMetadata,
ControllerJobType.FORCE_COMMIT);
}
+ public void updateForceCommitJobMetadata(String forceCommitJobId, Set<String> segmentsYetToBeCommitted,
+ Map<String, String> controllerJobZKMetadata) {
+ addControllerJobToZK(forceCommitJobId,
+ controllerJobZKMetadata, ControllerJobType.FORCE_COMMIT, prevJobMetadata -> {
+ String existingSegmentsYetToBeCommittedString =
Review Comment:
Please add some comments describing why we want to perform this check.
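The hunk stops before the check itself, so the following is only a sketch of the kind of commented check being asked for. It assumes (1) the lambda passed to `addControllerJobToZK` acts as a guard on the previously stored job metadata and returns `true` when the write should proceed, and (2) the check exists to reject stale updates that would grow the yet-to-be-committed list. `allowUpdate`, the key string, and the comma-joined encoding are hypothetical.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Predicate;

final class MonotonicUpdateCheckSketch {
  // Hypothetical key standing in for CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST.
  static final String YET_TO_BE_COMMITTED_LIST = "consumingSegmentsYetToBeCommitted";

  // Comma-joined encoding used in place of JsonUtils to keep the sketch dependency-free.
  static Set<String> decode(String encoded) {
    Set<String> segments = new HashSet<>();
    if (encoded != null && !encoded.isEmpty()) {
      segments.addAll(Arrays.asList(encoded.split(",")));
    }
    return segments;
  }

  // Builds the guard evaluated against the previously stored job metadata.
  static Predicate<Map<String, String>> allowUpdate(Set<String> newYetToBeCommitted) {
    return prevJobMetadata -> {
      String existing = prevJobMetadata == null ? null : prevJobMetadata.get(YET_TO_BE_COMMITTED_LIST);
      // Nothing recorded yet, so there is no newer state this write could clobber.
      if (existing == null) {
        return true;
      }
      // Segments only move from pending to committed, so a fresher result can only shrink
      // the list. A new list containing a segment the stored list no longer has comes from
      // a stale read (for example, two status requests racing), and writing it would bring
      // an already-committed segment back into the pending list.
      return decode(existing).containsAll(newYetToBeCommitted);
    };
  }

  public static void main(String[] args) {
    Map<String, String> stored = new HashMap<>();
    stored.put(YET_TO_BE_COMMITTED_LIST, "seg_1");
    // A fresher result that shrinks the list is accepted: true
    System.out.println(allowUpdate(Set.of()).test(stored));
    // A stale result that would re-add seg_0 is rejected: false
    System.out.println(allowUpdate(Set.of("seg_0", "seg_1")).test(stored));
  }
}
```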
--
This is an automated message from the Apache Git Service.