This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 5fd3619843 Fix segment completion FSM on uploaded segment (#15062)
5fd3619843 is described below

commit 5fd361984300f642098448d776e6f5b0dde81d56
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Feb 14 13:57:14 2025 -0800

    Fix segment completion FSM on uploaded segment (#15062)
---
 .../helix/core/realtime/BlockingSegmentCompletionFSM.java     | 10 ++++++----
 .../helix/core/realtime/PauselessSegmentCompletionFSM.java    | 11 +----------
 2 files changed, 7 insertions(+), 14 deletions(-)

diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java
index fc48095c85..75556034ff 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java
@@ -18,6 +18,7 @@
  */
 package org.apache.pinot.controller.helix.core.realtime;
 
+import com.google.common.base.Preconditions;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -143,12 +144,13 @@ public class BlockingSegmentCompletionFSM implements 
SegmentCompletionFSM {
     _maxTimeAllowedToCommitMs = _startTimeMs + _initialCommitTimeMs;
     _controllerVipUrl = segmentCompletionManager.getControllerVipUrl();
 
-    if (segmentMetadata.getStatus() == 
CommonConstants.Segment.Realtime.Status.DONE) {
+    if (segmentMetadata.getStatus() != 
CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
+      _state = BlockingSegmentCompletionFSMState.COMMITTED;
       StreamPartitionMsgOffsetFactory factory =
           
_segmentCompletionManager.getStreamPartitionMsgOffsetFactory(_segmentName);
-      StreamPartitionMsgOffset endOffset = 
factory.create(segmentMetadata.getEndOffset());
-      _state = BlockingSegmentCompletionFSMState.COMMITTED;
-      _winningOffset = endOffset;
+      String endOffset = segmentMetadata.getEndOffset();
+      Preconditions.checkState(endOffset != null, "Failed to find end offset 
for segment: %s", segmentName);
+      _winningOffset = factory.create(endOffset);
       _winner = "UNKNOWN";
     }
   }
diff --git 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java
 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java
index f1ca0ece26..7e439bc480 100644
--- 
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java
+++ 
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java
@@ -23,24 +23,15 @@ import 
org.apache.pinot.common.protocols.SegmentCompletionProtocol;
 import org.apache.pinot.common.utils.LLCSegmentName;
 import 
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
-import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.builder.TableNameBuilder;
 
 
 public class PauselessSegmentCompletionFSM extends 
BlockingSegmentCompletionFSM {
+
   public PauselessSegmentCompletionFSM(PinotLLCRealtimeSegmentManager 
segmentManager,
       SegmentCompletionManager segmentCompletionManager, LLCSegmentName 
segmentName,
       SegmentZKMetadata segmentMetadata) {
     super(segmentManager, segmentCompletionManager, segmentName, 
segmentMetadata);
-    if (segmentMetadata.getStatus() == 
CommonConstants.Segment.Realtime.Status.COMMITTING) {
-      StreamPartitionMsgOffsetFactory factory =
-          
_segmentCompletionManager.getStreamPartitionMsgOffsetFactory(_segmentName);
-      StreamPartitionMsgOffset endOffset = 
factory.create(segmentMetadata.getEndOffset());
-      _state = BlockingSegmentCompletionFSMState.COMMITTED;
-      _winningOffset = endOffset;
-      _winner = "UNKNOWN";
-    }
   }
 
   @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to