This is an automated email from the ASF dual-hosted git repository.
Jackie-Jiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 21c88f716f3 Add a latch to allow ingestion pause during startup
(#18492)
21c88f716f3 is described below
commit 21c88f716f376bb7a0238e0feeeff12b088f55f7
Author: Jhow <[email protected]>
AuthorDate: Wed May 13 22:00:23 2026 -0700
Add a latch to allow ingestion pause during startup (#18492)
---
.../pinot/core/data/manager/InstanceDataManager.java | 8 ++++++++
.../provider/DefaultTableDataManagerProvider.java | 7 ++++---
.../manager/provider/TableDataManagerProvider.java | 3 ++-
.../manager/realtime/RealtimeTableDataManager.java | 19 +++++++++++++++++--
.../FailureInjectingTableDataManagerProvider.java | 1 +
.../pinot/server/starter/helix/BaseServerStarter.java | 11 ++++++++++-
.../starter/helix/HelixInstanceDataManager.java | 10 ++++++++--
7 files changed, 50 insertions(+), 9 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index c1aed30df36..2817382fb06 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -192,6 +192,14 @@ public interface InstanceDataManager {
*/
void forceCommit(String tableNameWithType, Set<String> segmentNames);
+ /**
+ * Installs a supplier that gates consuming-segment ingestion. While the
supplier returns {@code false}, newly created
+ * realtime consumers wait at the entry of their consumer thread before
pulling any data. The supplier is consulted
+ * per consumer; once it returns {@code true} for a given consumer, that
consumer should proceed and should
+ * not be gated again. BaseServerStarter supplies an always-{@code true} gate by default, so its subclasses opt
in explicitly.
+ */
+ void setSupplierOfIsServerReadyToConsumeData(BooleanSupplier
isServerReadyToConsumeData);
+
/**
* Enables the installation of a method to determine if a server is ready to
serve queries.
*
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
index 3246e4bd743..57cedd30812 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
@@ -75,8 +75,8 @@ public class DefaultTableDataManagerProvider implements
TableDataManagerProvider
SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService
segmentReloadRefreshExecutor,
@Nullable ExecutorService segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
- BooleanSupplier isServerReadyToServeQueries, boolean
enableAsyncSegmentRefresh,
- ServerReloadJobStatusCache reloadJobStatusCache) {
+ BooleanSupplier isServerReadyToConsumeData, BooleanSupplier
isServerReadyToServeQueries,
+ boolean enableAsyncSegmentRefresh, ServerReloadJobStatusCache
reloadJobStatusCache) {
TableDataManager tableDataManager;
switch (tableConfig.getTableType()) {
case OFFLINE:
@@ -94,7 +94,8 @@ public class DefaultTableDataManagerProvider implements
TableDataManagerProvider
+ "configured the segmentstore uri. Configure the server
config %s",
StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE,
CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI));
}
- tableDataManager = new
RealtimeTableDataManager(_segmentBuildSemaphore, isServerReadyToServeQueries);
+ tableDataManager = new
RealtimeTableDataManager(_segmentBuildSemaphore, isServerReadyToConsumeData,
+ isServerReadyToServeQueries);
break;
default:
throw new IllegalStateException();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
index aab64b00ae3..b0e4d11e572 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
@@ -54,6 +54,7 @@ public interface TableDataManagerProvider {
ExecutorService segmentReloadRefreshExecutor,
@Nullable ExecutorService segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
+ BooleanSupplier isServerReadyToConsumeData,
BooleanSupplier isServerReadyToServeQueries,
boolean enableAsyncSegmentRefresh,
ServerReloadJobStatusCache reloadJobStatusCache);
@@ -64,6 +65,6 @@ public interface TableDataManagerProvider {
@VisibleForTesting
default TableDataManager getTableDataManager(TableConfig tableConfig, Schema
schema) {
return getTableDataManager(tableConfig, schema, new
SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
- null, null, () -> true, false, new
ServerReloadJobStatusCache("testInstance"));
+ null, null, () -> true, () -> true, false, new
ServerReloadJobStatusCache("testInstance"));
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index d3849e49e1b..15ec71fe50f 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -137,6 +137,7 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
protected Cache<String, StreamMetadataProvider> _streamMetadataProviderCache;
+ private final BooleanSupplier _isServerReadyToConsumeData;
private final BooleanSupplier _isServerReadyToServeQueries;
// Object to track ingestion delay for all partitions
@@ -151,7 +152,18 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
public RealtimeTableDataManager(Semaphore segmentBuildSemaphore,
BooleanSupplier isServerReadyToServeQueries) {
+ this(segmentBuildSemaphore, () -> true, isServerReadyToServeQueries);
+ }
+
+ /**
+ * @param isServerReadyToConsumeData returns {@code false} when
consuming-segment ingestion should be held off
+ * (e.g. while the server is still draining startup-time work). Each
consuming segment checks this gate at the
+ * entry of its consumer thread; once the gate clears, it is not
consulted again for that segment.
+ */
+ public RealtimeTableDataManager(Semaphore segmentBuildSemaphore,
BooleanSupplier isServerReadyToConsumeData,
+ BooleanSupplier isServerReadyToServeQueries) {
_segmentBuildSemaphore = segmentBuildSemaphore;
+ _isServerReadyToConsumeData = isServerReadyToConsumeData;
_isServerReadyToServeQueries = isServerReadyToServeQueries;
}
@@ -221,8 +233,9 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
_streamMetadataProviderCache = getStreamMetadataProviderCache();
// For dedup and partial-upsert, need to wait for all segments loaded
before starting consuming data
+ BooleanSupplier readyForDedupOrPartialUpsert;
if (isDedupEnabled() || isPartialUpsertEnabled()) {
- _isTableReadyToConsumeData = new BooleanSupplier() {
+ readyForDedupOrPartialUpsert = new BooleanSupplier() {
volatile boolean _allSegmentsLoaded;
long _lastCheckTimeMs;
@@ -247,8 +260,10 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
};
} else {
- _isTableReadyToConsumeData = () -> true;
+ readyForDedupOrPartialUpsert = () -> true;
}
+ _isTableReadyToConsumeData = () ->
readyForDedupOrPartialUpsert.getAsBoolean()
+ && _isServerReadyToConsumeData.getAsBoolean();
}
@VisibleForTesting
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
index bb649143e30..d73bcb38260 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
@@ -82,6 +82,7 @@ public class FailureInjectingTableDataManagerProvider
implements TableDataManage
ExecutorService segmentReloadRefreshExecutor,
@Nullable ExecutorService segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
+ BooleanSupplier isServerReadyToConsumeData,
BooleanSupplier isServerReadyToServeQueries,
boolean enableAsyncSegmentRefresh,
ServerReloadJobStatusCache reloadJobStatusCache) {
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 638553f05d1..48c8188f92f 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -767,7 +767,8 @@ public abstract class BaseServerStarter implements
ServiceStartable {
_threadAccountant, sendStatsPredicate,
keepPipelineBreakerStatsPredicate, _reloadJobStatusCache);
InstanceDataManager instanceDataManager =
_serverInstance.getInstanceDataManager();
- instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() ->
_isServerReadyToServeQueries);
+
instanceDataManager.setSupplierOfIsServerReadyToConsumeData(this::isServerReadyToConsumeData);
+
instanceDataManager.setSupplierOfIsServerReadyToServeQueries(this::isServerReadyToServeQueries);
// Enable Server level realtime ingestion rate limiter
RealtimeConsumptionRateManager.getInstance().createServerRateLimiter(_serverConf,
_serverMetrics);
@@ -946,6 +947,14 @@ public abstract class BaseServerStarter implements
ServiceStartable {
NettyInspector.registerMetrics(_serverMetrics);
}
+ protected boolean isServerReadyToConsumeData() {
+ return true;
+ }
+
+ protected boolean isServerReadyToServeQueries() {
+ return _isServerReadyToServeQueries;
+ }
+
protected SegmentOperationsThrottler
createMultiColumnIndexPreprocessThrottler() {
int maxConcurrency = Integer.parseInt(
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_MULTICOL_TEXT_INDEX_PREPROCESS_PARALLELISM,
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 9d7464cca97..ebe224f73d9 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -105,6 +105,7 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
private HelixManager _helixManager;
private ZkHelixPropertyStore<ZNRecord> _propertyStore;
private SegmentUploader _segmentUploader;
+ private BooleanSupplier _isServerReadyToConsumeData = () -> false;
private BooleanSupplier _isServerReadyToServeQueries = () -> false;
// Fixed size LRU cache for storing last N errors on the instance.
@@ -122,6 +123,11 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
@Nullable
private ExecutorService _segmentPreloadExecutor;
+ @Override
+ public void setSupplierOfIsServerReadyToConsumeData(BooleanSupplier
isServerReadyToConsumeData) {
+ _isServerReadyToConsumeData = isServerReadyToConsumeData;
+ }
+
@Override
public void setSupplierOfIsServerReadyToServeQueries(BooleanSupplier
isServingQueries) {
_isServerReadyToServeQueries = isServingQueries;
@@ -342,8 +348,8 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
TimestampIndexUtils.applyTimestampIndex(tableConfig, schema);
TableDataManager tableDataManager =
_tableDataManagerProvider.getTableDataManager(tableConfig, schema,
_segmentReloadSemaphore,
- _segmentReloadRefreshExecutor, _segmentPreloadExecutor,
_errorCache, _isServerReadyToServeQueries,
- _enableAsyncSegmentRefresh, _reloadJobStatusCache);
+ _segmentReloadRefreshExecutor, _segmentPreloadExecutor,
_errorCache, _isServerReadyToConsumeData,
+ _isServerReadyToServeQueries, _enableAsyncSegmentRefresh,
_reloadJobStatusCache);
tableDataManager.start();
LOGGER.info("Created table data manager for table: {}", tableNameWithType);
return tableDataManager;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]