Jackie-Jiang commented on code in PR #14828:
URL: https://github.com/apache/pinot/pull/14828#discussion_r1919494621


##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2249,9 +2249,31 @@ public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs));
     jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST,
         JsonUtils.objectToString(consumingSegmentsCommitted));
+    jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST,
+        JsonUtils.objectToString(consumingSegmentsCommitted));

Review Comment:
   (minor) This shouldn't be needed. Adding it actually adds the overhead of one extra parse.
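A minimal sketch of the read side, assuming the point is that at submission time the yet-to-be-committed list is identical to the force-committed list, so a reader can fall back to the latter until a status update writes the former. That avoids serializing the same list twice, and only one list is ever parsed. The key strings, the class name, and the `parser` parameter (standing in for `JsonUtils`-based parsing) are hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

final class ForceCommitJobReader {
  // Hypothetical key names for illustration; the real keys are the
  // CommonConstants.ControllerJob constants quoted in the diff.
  static final String FORCE_COMMITTED_LIST_KEY = "forceCommittedList";
  static final String YET_TO_BE_COMMITTED_LIST_KEY = "yetToBeCommittedList";

  private ForceCommitJobReader() {
  }

  /** Returns the segments of a force-commit job that still have to be committed. */
  static Set<String> segmentsYetToBeCommitted(Map<String, String> jobMetadata,
      Function<String, Set<String>> parser) {
    String yetToBeCommitted = jobMetadata.get(YET_TO_BE_COMMITTED_LIST_KEY);
    if (yetToBeCommitted != null) {
      // A status update has already narrowed the list down.
      return parser.apply(yetToBeCommitted);
    }
    // Nothing written since submission: the original force-committed list is still accurate.
    String forceCommitted = jobMetadata.get(FORCE_COMMITTED_LIST_KEY);
    return forceCommitted == null ? Collections.emptySet() : parser.apply(forceCommitted);
  }
}
```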



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1889,4 +1889,19 @@ String moveSegmentFile(String rawTableName, String segmentName, String segmentLo
   URI createSegmentPath(String rawTableName, String segmentName) {
     return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
   }
+
+  public Set<String> getSegmentsYetToBeCommitted(String tableNameWithType, Set<String> segmentsToCheck) {
+    Set<String> segmentsYetToBeCommitted = new HashSet<>();
+    for (String segmentName: segmentsToCheck) {
+      SegmentZKMetadata segmentZKMetadata =
+          _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName);
+      if (segmentZKMetadata == null) {
+        continue;
+      }
+      if (!segmentZKMetadata.getStatus().isCompleted()) {

Review Comment:
   We cannot use `isCompleted()` here. We should explicitly check for the status to be `DONE`.
   
   @9aman @KKcorps Currently `COMMITTING` is also counted as completed. Is this expected?
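A minimal sketch of the explicit check being requested. `SegmentStatus` is a simplified, hypothetical stand-in for the status returned by `segmentZKMetadata.getStatus()`; its member list is an assumption, and its `isCompleted()` only mirrors the behavior described in the comment above, where `COMMITTING` also counts as completed.

```java
final class CommitStatusCheck {
  /** Hypothetical, simplified stand-in for the realtime segment status. */
  enum SegmentStatus {
    IN_PROGRESS, COMMITTING, DONE;

    /** Mirrors the behavior described in the review: COMMITTING also counts as completed. */
    boolean isCompleted() {
      return this != IN_PROGRESS;
    }
  }

  private CommitStatusCheck() {
  }

  /** A segment still needs to be committed unless its status is explicitly DONE. */
  static boolean isYetToBeCommitted(SegmentStatus status) {
    return status != SegmentStatus.DONE;
  }
}
```

With `isCompleted()`, a segment still in `COMMITTING` would be dropped from the set before its commit has finished, so the job could be reported as done too early; comparing against `DONE` avoids that.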



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PinotLLCRealtimeSegmentManager.java:
##########
@@ -1889,4 +1889,19 @@ String moveSegmentFile(String rawTableName, String segmentName, String segmentLo
   URI createSegmentPath(String rawTableName, String segmentName) {
     return URIUtils.getUri(_controllerConf.getDataDir(), rawTableName, URIUtils.encode(segmentName));
   }
+
+  public Set<String> getSegmentsYetToBeCommitted(String tableNameWithType, Set<String> segmentsToCheck) {
+    Set<String> segmentsYetToBeCommitted = new HashSet<>();
+    for (String segmentName: segmentsToCheck) {
+      SegmentZKMetadata segmentZKMetadata =
+          _helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName);
+      if (segmentZKMetadata == null) {
+        continue;

Review Comment:
   (minor) Add a comment about this behavior: we are counting deleted segments as not needing to be committed.
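A minimal sketch of how the loop might document this, with the metadata lookup and the commit check passed in as functions so the example is self-contained. `YetToBeCommittedFilter` and its parameters are hypothetical; in the PR the lookup is `_helixResourceManager.getSegmentZKMetadata(tableNameWithType, segmentName)`.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;

final class YetToBeCommittedFilter {
  private YetToBeCommittedFilter() {
  }

  /**
   * Returns the subset of segmentsToCheck that still has to be committed.
   *
   * @param metadataLookup returns the segment's metadata, or null if the segment no longer exists
   * @param isDone returns true once the segment's commit has finished
   */
  static <M> Set<String> getSegmentsYetToBeCommitted(Set<String> segmentsToCheck,
      Function<String, M> metadataLookup, Predicate<M> isDone) {
    Set<String> segmentsYetToBeCommitted = new HashSet<>();
    for (String segmentName : segmentsToCheck) {
      M metadata = metadataLookup.apply(segmentName);
      if (metadata == null) {
        // The segment was deleted after the force commit was submitted. A deleted segment can
        // never reach DONE, so it is counted as not needing to be committed; waiting on it
        // would keep the job from ever completing.
        continue;
      }
      if (!isDone.test(metadata)) {
        segmentsYetToBeCommitted.add(segmentName);
      }
    }
    return segmentsYetToBeCommitted;
  }
}
```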



##########
pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java:
##########
@@ -2249,9 +2249,31 @@ public boolean addNewForceCommitJob(String tableNameWithType, String jobId, long
     jobMetadata.put(CommonConstants.ControllerJob.SUBMISSION_TIME_MS, Long.toString(jobSubmissionTimeMs));
     jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_FORCE_COMMITTED_LIST,
         JsonUtils.objectToString(consumingSegmentsCommitted));
+    jobMetadata.put(CommonConstants.ControllerJob.CONSUMING_SEGMENTS_YET_TO_BE_COMMITTED_LIST,
+        JsonUtils.objectToString(consumingSegmentsCommitted));
     return addControllerJobToZK(jobId, jobMetadata, ControllerJobType.FORCE_COMMIT);
   }
 
+  public void updateForceCommitJobMetadata(String forceCommitJobId, Set<String> segmentsYetToBeCommitted,
+      Map<String, String> controllerJobZKMetadata) {
+    addControllerJobToZK(forceCommitJobId,
+        controllerJobZKMetadata, ControllerJobType.FORCE_COMMIT, prevJobMetadata -> {
+          String existingSegmentsYetToBeCommittedString =

Review Comment:
   Please add some comments describing why we want to perform this check
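The diff is truncated before the check itself, so its exact logic is not visible here. A minimal sketch of one plausible rationale, assuming the check guards against a concurrent status request overwriting the job metadata with a stale, larger set: intersecting the newly computed set with the one already stored ensures that a segment, once recorded as committed, is never added back. `ForceCommitProgressMerge` and `mergeYetToBeCommitted` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

final class ForceCommitProgressMerge {
  private ForceCommitProgressMerge() {
  }

  /**
   * Merges a freshly computed yet-to-be-committed set with the one already stored in the job
   * metadata. Two requests may compute their sets at different times; keeping only segments
   * present in both makes the stored set shrink monotonically, so a slower request holding an
   * older view cannot re-add a segment that another request already saw as committed.
   */
  static Set<String> mergeYetToBeCommitted(Set<String> existing, Set<String> computed) {
    if (existing == null) {
      // Nothing stored yet (first update after submission): take the computed set as-is.
      return new HashSet<>(computed);
    }
    Set<String> merged = new HashSet<>(existing);
    merged.retainAll(computed);
    return merged;
  }
}
```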



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
