This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 5fd3619843 Fix segment completion FSM on uploaded segment (#15062)
5fd3619843 is described below
commit 5fd361984300f642098448d776e6f5b0dde81d56
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Fri Feb 14 13:57:14 2025 -0800
Fix segment completion FSM on uploaded segment (#15062)
---
.../helix/core/realtime/BlockingSegmentCompletionFSM.java | 10 ++++++----
.../helix/core/realtime/PauselessSegmentCompletionFSM.java | 11 +----------
2 files changed, 7 insertions(+), 14 deletions(-)
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java
index fc48095c85..75556034ff 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/BlockingSegmentCompletionFSM.java
@@ -18,6 +18,7 @@
*/
package org.apache.pinot.controller.helix.core.realtime;
+import com.google.common.base.Preconditions;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -143,12 +144,13 @@ public class BlockingSegmentCompletionFSM implements
SegmentCompletionFSM {
_maxTimeAllowedToCommitMs = _startTimeMs + _initialCommitTimeMs;
_controllerVipUrl = segmentCompletionManager.getControllerVipUrl();
- if (segmentMetadata.getStatus() ==
CommonConstants.Segment.Realtime.Status.DONE) {
+ if (segmentMetadata.getStatus() !=
CommonConstants.Segment.Realtime.Status.IN_PROGRESS) {
+ _state = BlockingSegmentCompletionFSMState.COMMITTED;
StreamPartitionMsgOffsetFactory factory =
_segmentCompletionManager.getStreamPartitionMsgOffsetFactory(_segmentName);
- StreamPartitionMsgOffset endOffset =
factory.create(segmentMetadata.getEndOffset());
- _state = BlockingSegmentCompletionFSMState.COMMITTED;
- _winningOffset = endOffset;
+ String endOffset = segmentMetadata.getEndOffset();
+ Preconditions.checkState(endOffset != null, "Failed to find end offset
for segment: %s", segmentName);
+ _winningOffset = factory.create(endOffset);
_winner = "UNKNOWN";
}
}
diff --git
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java
index f1ca0ece26..7e439bc480 100644
---
a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java
+++
b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java
@@ -23,24 +23,15 @@ import
org.apache.pinot.common.protocols.SegmentCompletionProtocol;
import org.apache.pinot.common.utils.LLCSegmentName;
import
org.apache.pinot.controller.helix.core.realtime.segment.CommittingSegmentDescriptor;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
-import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
-import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
public class PauselessSegmentCompletionFSM extends
BlockingSegmentCompletionFSM {
+
public PauselessSegmentCompletionFSM(PinotLLCRealtimeSegmentManager
segmentManager,
SegmentCompletionManager segmentCompletionManager, LLCSegmentName
segmentName,
SegmentZKMetadata segmentMetadata) {
super(segmentManager, segmentCompletionManager, segmentName,
segmentMetadata);
- if (segmentMetadata.getStatus() ==
CommonConstants.Segment.Realtime.Status.COMMITTING) {
- StreamPartitionMsgOffsetFactory factory =
-
_segmentCompletionManager.getStreamPartitionMsgOffsetFactory(_segmentName);
- StreamPartitionMsgOffset endOffset =
factory.create(segmentMetadata.getEndOffset());
- _state = BlockingSegmentCompletionFSMState.COMMITTED;
- _winningOffset = endOffset;
- _winner = "UNKNOWN";
- }
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]