This is an automated email from the ASF dual-hosted git repository.

kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 74e54e053cb Fix pauseless segment going to ERROR on segmentCommitStart timeout (#17885)
74e54e053cb is described below

commit 74e54e053cb56502d2b5d17276a6bcb2043e2895
Author: 9aman <[email protected]>
AuthorDate: Mon Mar 23 17:27:33 2026 +0530

    Fix pauseless segment going to ERROR on segmentCommitStart timeout (#17885)
---
 .../core/realtime/PauselessSegmentCompletionFSM.java |  1 +
 .../manager/realtime/RealtimeSegmentDataManager.java | 20 +++++++++++++++++++-
 .../ServerSegmentCompletionProtocolHandler.java      | 13 +++++++++++--
 3 files changed, 31 insertions(+), 3 deletions(-)

diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java
index 2e6727b3c17..5ef9892d075 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java
@@ -55,6 +55,7 @@ public class PauselessSegmentCompletionFSM extends 
BlockingSegmentCompletionFSM
       // this aims to handle the failures during commitSegmentStartMetadata
       // we abort the state machine to allow commit protocol to start from the 
beginning
       // the server would then retry the commit protocol from the start
+      _logger.error("Failed to commit segment metadata to COMMITTING for 
segment: {}", _segmentName, e);
       return abortAndReturnFailed();
     }
     _logger.info("{}:Uploading for instance={} offset={}", _state, instanceId, 
offset);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 9d2c90f64f1..c85b005f947 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -108,6 +108,7 @@ import org.apache.pinot.spi.stream.StreamMessageMetadata;
 import org.apache.pinot.spi.stream.StreamMetadataProvider;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
 import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.CommonConstants;
 import org.apache.pinot.spi.utils.CommonConstants.ConsumerState;
 import 
org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.CompletionMode;
 import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -1022,7 +1023,16 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
     if (_isOffHeap) {
       params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
     }
-    SegmentCompletionProtocol.Response segmentCommitStartResponse = 
_protocolHandler.segmentCommitStart(params);
+    // For pauseless ingestion, segmentCommitStart performs heavy ZK work: 
update segment ZK metadata
+    // to COMMITTING, create new consuming segment metadata, and update 
IdealState. These are the same
+    // operations that happen during segmentCommitEnd in non-pauseless mode. 
Use the segment commit
+    // timeout (default 120s) instead of the default request timeout (10s) to 
avoid premature timeouts
+    // under ZK pressure, while staying within the controller FSM's time 
budget.
+    int commitStartTimeoutMs = 
PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)
+        ? (int) SegmentCompletionProtocol.getMaxSegmentCommitTimeMs()
+        : 
CommonConstants.Server.SegmentCompletionProtocol.DEFAULT_OTHER_REQUESTS_TIMEOUT;
+    SegmentCompletionProtocol.Response segmentCommitStartResponse =
+        _protocolHandler.segmentCommitStart(params, commitStartTimeoutMs);
     if (!segmentCommitStartResponse.getStatus()
         
.equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE)) {
       _segmentLogger.warn("CommitStart failed  with response {}", 
segmentCommitStartResponse.toJsonString());
@@ -1497,7 +1507,15 @@ public class RealtimeSegmentDataManager extends 
SegmentDataManager {
           break;
         case CATCHING_UP:
         case HOLDING:
+        case COMMITTING:
         case INITIAL_CONSUMING:
+          // WARNING: DOWNLOAD mode with pauseless ingestion can trigger a 
race condition where the
+          // committing (lead) server downloads the segment instead of 
building it locally, causing it
+          // to go missing. See https://github.com/apache/pinot/pull/17885 for 
details.
+          // Recovery: RealtimeSegmentValidationManager detects and re-creates 
the missing segment.
+          // Not restricted at table config level since this is a rare race 
condition and DOWNLOAD mode
+          // is valuable for high-ingestion-rate scenarios (only the lead 
server builds the segment,
+          // reducing CPU/memory load on other replicas).
           if (_segmentCompletionMode == CompletionMode.DOWNLOAD) {
             // Check if download URL has been set by another replica
             String downloadUrl = segmentZKMetadata.getDownloadUrl();
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
index 211434f005c..9a9a620ebd0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
@@ -119,13 +119,18 @@ public class ServerSegmentCompletionProtocolHandler {
   }
 
   public SegmentCompletionProtocol.Response 
segmentCommitStart(SegmentCompletionProtocol.Request.Params params) {
+    return segmentCommitStart(params, DEFAULT_OTHER_REQUESTS_TIMEOUT);
+  }
+
+  public SegmentCompletionProtocol.Response 
segmentCommitStart(SegmentCompletionProtocol.Request.Params params,
+      int timeoutMs) {
     SegmentCompletionProtocol.SegmentCommitStartRequest request =
         new SegmentCompletionProtocol.SegmentCommitStartRequest(params);
     String url = createSegmentCompletionUrl(request);
     if (url == null) {
       return SegmentCompletionProtocol.RESP_NOT_SENT;
     }
-    return sendRequest(url);
+    return sendRequest(url, timeoutMs);
   }
 
   // TODO We need to make this work with trusted certificates if the VIP is 
using https.
@@ -231,11 +236,15 @@ public class ServerSegmentCompletionProtocolHandler {
   }
 
   private SegmentCompletionProtocol.Response sendRequest(String url) {
+    return sendRequest(url, DEFAULT_OTHER_REQUESTS_TIMEOUT);
+  }
+
+  private SegmentCompletionProtocol.Response sendRequest(String url, int 
timeoutMs) {
     SegmentCompletionProtocol.Response response;
     try {
       String responseStr = _fileUploadDownloadClient
           .sendSegmentCompletionProtocolRequest(new URI(url), 
AuthProviderUtils.toRequestHeaders(_authProvider), null,
-              DEFAULT_OTHER_REQUESTS_TIMEOUT).getResponse();
+              timeoutMs).getResponse();
       response = 
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
       LOGGER.info("Controller response {} for {}", response.toJsonString(), 
url);
       if 
(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER))
 {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to