This is an automated email from the ASF dual-hosted git repository.
kharekartik pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 74e54e053cb Fix pauseless segment going to ERROR on segmentCommitStart timeout (#17885)
74e54e053cb is described below
commit 74e54e053cb56502d2b5d17276a6bcb2043e2895
Author: 9aman <[email protected]>
AuthorDate: Mon Mar 23 17:27:33 2026 +0530
Fix pauseless segment going to ERROR on segmentCommitStart timeout (#17885)
---
.../core/realtime/PauselessSegmentCompletionFSM.java | 1 +
.../manager/realtime/RealtimeSegmentDataManager.java | 20 +++++++++++++++++++-
.../ServerSegmentCompletionProtocolHandler.java | 13 +++++++++++--
3 files changed, 31 insertions(+), 3 deletions(-)
diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java
index 2e6727b3c17..5ef9892d075 100644
--- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java
+++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/realtime/PauselessSegmentCompletionFSM.java
@@ -55,6 +55,7 @@ public class PauselessSegmentCompletionFSM extends
BlockingSegmentCompletionFSM
// this aims to handle the failures during commitSegmentStartMetadata
// we abort the state machine to allow commit protocol to start from the
beginning
// the server would then retry the commit protocol from the start
+ _logger.error("Failed to commit segment metadata to COMMITTING for
segment: {}", _segmentName, e);
return abortAndReturnFailed();
}
_logger.info("{}:Uploading for instance={} offset={}", _state, instanceId,
offset);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index 9d2c90f64f1..c85b005f947 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -108,6 +108,7 @@ import org.apache.pinot.spi.stream.StreamMessageMetadata;
import org.apache.pinot.spi.stream.StreamMetadataProvider;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffset;
import org.apache.pinot.spi.stream.StreamPartitionMsgOffsetFactory;
+import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.CommonConstants.ConsumerState;
import
org.apache.pinot.spi.utils.CommonConstants.Segment.Realtime.CompletionMode;
import org.apache.pinot.spi.utils.IngestionConfigUtils;
@@ -1022,7 +1023,16 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
if (_isOffHeap) {
params.withMemoryUsedBytes(_memoryManager.getTotalAllocatedBytes());
}
- SegmentCompletionProtocol.Response segmentCommitStartResponse =
_protocolHandler.segmentCommitStart(params);
+ // For pauseless ingestion, segmentCommitStart performs heavy ZK work: update segment ZK metadata
+ // to COMMITTING, create new consuming segment metadata, and update IdealState. These are the same
+ // operations that happen during segmentCommitEnd in non-pauseless mode. Use the segment commit
+ // timeout (default 120s) instead of the default request timeout (10s) to avoid premature timeouts
+ // under ZK pressure, while staying within the controller FSM's time budget.
+ int commitStartTimeoutMs =
PauselessConsumptionUtils.isPauselessEnabled(_tableConfig)
+ ? (int) SegmentCompletionProtocol.getMaxSegmentCommitTimeMs()
+ :
CommonConstants.Server.SegmentCompletionProtocol.DEFAULT_OTHER_REQUESTS_TIMEOUT;
+ SegmentCompletionProtocol.Response segmentCommitStartResponse =
+ _protocolHandler.segmentCommitStart(params, commitStartTimeoutMs);
if (!segmentCommitStartResponse.getStatus()
.equals(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT_CONTINUE)) {
_segmentLogger.warn("CommitStart failed with response {}",
segmentCommitStartResponse.toJsonString());
@@ -1497,7 +1507,15 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
break;
case CATCHING_UP:
case HOLDING:
+ case COMMITTING:
case INITIAL_CONSUMING:
+ // WARNING: DOWNLOAD mode with pauseless ingestion can trigger a race condition where the
+ // committing (lead) server downloads the segment instead of building it locally, causing it
+ // to go missing. See https://github.com/apache/pinot/pull/17885 for details.
+ // Recovery: RealtimeSegmentValidationManager detects and re-creates the missing segment.
+ // Not restricted at table config level since this is a rare race condition and DOWNLOAD mode
+ // is valuable for high-ingestion-rate scenarios (only the lead server builds the segment,
+ // reducing CPU/memory load on other replicas).
if (_segmentCompletionMode == CompletionMode.DOWNLOAD) {
// Check if download URL has been set by another replica
String downloadUrl = segmentZKMetadata.getDownloadUrl();
diff --git a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
index 211434f005c..9a9a620ebd0 100644
--- a/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
+++ b/pinot-core/src/main/java/org/apache/pinot/server/realtime/ServerSegmentCompletionProtocolHandler.java
@@ -119,13 +119,18 @@ public class ServerSegmentCompletionProtocolHandler {
}
public SegmentCompletionProtocol.Response
segmentCommitStart(SegmentCompletionProtocol.Request.Params params) {
+ return segmentCommitStart(params, DEFAULT_OTHER_REQUESTS_TIMEOUT);
+ }
+
+ public SegmentCompletionProtocol.Response
segmentCommitStart(SegmentCompletionProtocol.Request.Params params,
+ int timeoutMs) {
SegmentCompletionProtocol.SegmentCommitStartRequest request =
new SegmentCompletionProtocol.SegmentCommitStartRequest(params);
String url = createSegmentCompletionUrl(request);
if (url == null) {
return SegmentCompletionProtocol.RESP_NOT_SENT;
}
- return sendRequest(url);
+ return sendRequest(url, timeoutMs);
}
// TODO We need to make this work with trusted certificates if the VIP is
using https.
@@ -231,11 +236,15 @@ public class ServerSegmentCompletionProtocolHandler {
}
private SegmentCompletionProtocol.Response sendRequest(String url) {
+ return sendRequest(url, DEFAULT_OTHER_REQUESTS_TIMEOUT);
+ }
+
+ private SegmentCompletionProtocol.Response sendRequest(String url, int
timeoutMs) {
SegmentCompletionProtocol.Response response;
try {
String responseStr = _fileUploadDownloadClient
.sendSegmentCompletionProtocolRequest(new URI(url),
AuthProviderUtils.toRequestHeaders(_authProvider), null,
- DEFAULT_OTHER_REQUESTS_TIMEOUT).getResponse();
+ timeoutMs).getResponse();
response =
SegmentCompletionProtocol.Response.fromJsonString(responseStr);
LOGGER.info("Controller response {} for {}", response.toJsonString(),
url);
if
(response.getStatus().equals(SegmentCompletionProtocol.ControllerResponseStatus.NOT_LEADER))
{
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]