This is an automated email from the ASF dual-hosted git repository.

Jackie-Jiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git


The following commit(s) were added to refs/heads/master by this push:
     new 21c88f716f3 Add a latch to allow ingestion pause during startup 
(#18492)
21c88f716f3 is described below

commit 21c88f716f376bb7a0238e0feeeff12b088f55f7
Author: Jhow <[email protected]>
AuthorDate: Wed May 13 22:00:23 2026 -0700

    Add a latch to allow ingestion pause during startup (#18492)
---
 .../pinot/core/data/manager/InstanceDataManager.java  |  8 ++++++++
 .../provider/DefaultTableDataManagerProvider.java     |  7 ++++---
 .../manager/provider/TableDataManagerProvider.java    |  3 ++-
 .../manager/realtime/RealtimeTableDataManager.java    | 19 +++++++++++++++++--
 .../FailureInjectingTableDataManagerProvider.java     |  1 +
 .../pinot/server/starter/helix/BaseServerStarter.java | 11 ++++++++++-
 .../starter/helix/HelixInstanceDataManager.java       | 10 ++++++++--
 7 files changed, 50 insertions(+), 9 deletions(-)

diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
index c1aed30df36..2817382fb06 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/InstanceDataManager.java
@@ -192,6 +192,14 @@ public interface InstanceDataManager {
    */
   void forceCommit(String tableNameWithType, Set<String> segmentNames);
 
+  /**
+   * Installs a supplier that gates consuming-segment ingestion. While the 
supplier returns {@code false}, newly created
+   * realtime consumers wait at the entry of their consumer thread before 
pulling any data. The supplier is consulted
+   * per consumer; once it returns {@code true} for a given consumer, that 
consumer should proceed and should
+   * not be gated again. Gating is opt-in: installing a supplier that always returns {@code true} disables it.
+   */
+  void setSupplierOfIsServerReadyToConsumeData(BooleanSupplier 
isServerReadyToConsumeData);
+
   /**
    * Enables the installation of a method to determine if a server is ready to 
server queries.
    *
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
index 3246e4bd743..57cedd30812 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
@@ -75,8 +75,8 @@ public class DefaultTableDataManagerProvider implements 
TableDataManagerProvider
       SegmentReloadSemaphore segmentReloadSemaphore, ExecutorService 
segmentReloadRefreshExecutor,
       @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
-      BooleanSupplier isServerReadyToServeQueries, boolean 
enableAsyncSegmentRefresh,
-      ServerReloadJobStatusCache reloadJobStatusCache) {
+      BooleanSupplier isServerReadyToConsumeData, BooleanSupplier 
isServerReadyToServeQueries,
+      boolean enableAsyncSegmentRefresh, ServerReloadJobStatusCache 
reloadJobStatusCache) {
     TableDataManager tableDataManager;
     switch (tableConfig.getTableType()) {
       case OFFLINE:
@@ -94,7 +94,8 @@ public class DefaultTableDataManagerProvider implements 
TableDataManagerProvider
                   + "configured the segmentstore uri. Configure the server 
config %s",
               StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE, 
CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI));
         }
-        tableDataManager = new 
RealtimeTableDataManager(_segmentBuildSemaphore, isServerReadyToServeQueries);
+        tableDataManager = new 
RealtimeTableDataManager(_segmentBuildSemaphore, isServerReadyToConsumeData,
+            isServerReadyToServeQueries);
         break;
       default:
         throw new IllegalStateException();
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
index aab64b00ae3..b0e4d11e572 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
@@ -54,6 +54,7 @@ public interface TableDataManagerProvider {
       ExecutorService segmentReloadRefreshExecutor,
       @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
+      BooleanSupplier isServerReadyToConsumeData,
       BooleanSupplier isServerReadyToServeQueries,
       boolean enableAsyncSegmentRefresh,
       ServerReloadJobStatusCache reloadJobStatusCache);
@@ -64,6 +65,6 @@ public interface TableDataManagerProvider {
   @VisibleForTesting
   default TableDataManager getTableDataManager(TableConfig tableConfig, Schema 
schema) {
     return getTableDataManager(tableConfig, schema, new 
SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
-        null, null, () -> true, false, new 
ServerReloadJobStatusCache("testInstance"));
+        null, null, () -> true, () -> true, false, new 
ServerReloadJobStatusCache("testInstance"));
   }
 }
diff --git 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index d3849e49e1b..15ec71fe50f 100644
--- 
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++ 
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -137,6 +137,7 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
 
   protected Cache<String, StreamMetadataProvider> _streamMetadataProviderCache;
 
+  private final BooleanSupplier _isServerReadyToConsumeData;
   private final BooleanSupplier _isServerReadyToServeQueries;
 
   // Object to track ingestion delay for all partitions
@@ -151,7 +152,18 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
   }
 
   public RealtimeTableDataManager(Semaphore segmentBuildSemaphore, 
BooleanSupplier isServerReadyToServeQueries) {
+    this(segmentBuildSemaphore, () -> true, isServerReadyToServeQueries);
+  }
+
+  /**
+   * @param isServerReadyToConsumeData returns {@code false} when 
consuming-segment ingestion should be held off
+   *     (e.g. while the server is still draining startup-time work). Each 
consuming segment checks this gate at the
+   *     entry of its consumer thread; once the gate clears, it is not 
consulted again for that segment.
+   */
+  public RealtimeTableDataManager(Semaphore segmentBuildSemaphore, 
BooleanSupplier isServerReadyToConsumeData,
+      BooleanSupplier isServerReadyToServeQueries) {
     _segmentBuildSemaphore = segmentBuildSemaphore;
+    _isServerReadyToConsumeData = isServerReadyToConsumeData;
     _isServerReadyToServeQueries = isServerReadyToServeQueries;
   }
 
@@ -221,8 +233,9 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
     _streamMetadataProviderCache = getStreamMetadataProviderCache();
 
     // For dedup and partial-upsert, need to wait for all segments loaded 
before starting consuming data
+    BooleanSupplier readyForDedupOrPartialUpsert;
     if (isDedupEnabled() || isPartialUpsertEnabled()) {
-      _isTableReadyToConsumeData = new BooleanSupplier() {
+      readyForDedupOrPartialUpsert = new BooleanSupplier() {
         volatile boolean _allSegmentsLoaded;
         long _lastCheckTimeMs;
 
@@ -247,8 +260,10 @@ public class RealtimeTableDataManager extends 
BaseTableDataManager {
         }
       };
     } else {
-      _isTableReadyToConsumeData = () -> true;
+      readyForDedupOrPartialUpsert = () -> true;
     }
+    _isTableReadyToConsumeData = () -> 
readyForDedupOrPartialUpsert.getAsBoolean()
+        && _isServerReadyToConsumeData.getAsBoolean();
   }
 
   @VisibleForTesting
diff --git 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
index bb649143e30..d73bcb38260 100644
--- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
+++ 
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
@@ -82,6 +82,7 @@ public class FailureInjectingTableDataManagerProvider 
implements TableDataManage
       ExecutorService segmentReloadRefreshExecutor,
       @Nullable ExecutorService segmentPreloadExecutor,
       @Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
+      BooleanSupplier isServerReadyToConsumeData,
       BooleanSupplier isServerReadyToServeQueries,
       boolean enableAsyncSegmentRefresh,
       ServerReloadJobStatusCache reloadJobStatusCache) {
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index 638553f05d1..48c8188f92f 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -767,7 +767,8 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
             _threadAccountant, sendStatsPredicate, 
keepPipelineBreakerStatsPredicate, _reloadJobStatusCache);
 
     InstanceDataManager instanceDataManager = 
_serverInstance.getInstanceDataManager();
-    instanceDataManager.setSupplierOfIsServerReadyToServeQueries(() -> 
_isServerReadyToServeQueries);
+    
instanceDataManager.setSupplierOfIsServerReadyToConsumeData(this::isServerReadyToConsumeData);
+    
instanceDataManager.setSupplierOfIsServerReadyToServeQueries(this::isServerReadyToServeQueries);
 
     // Enable Server level realtime ingestion rate limier
     
RealtimeConsumptionRateManager.getInstance().createServerRateLimiter(_serverConf,
 _serverMetrics);
@@ -946,6 +947,14 @@ public abstract class BaseServerStarter implements 
ServiceStartable {
     NettyInspector.registerMetrics(_serverMetrics);
   }
 
+  protected boolean isServerReadyToConsumeData() {
+    return true;
+  }
+
+  protected boolean isServerReadyToServeQueries() {
+    return _isServerReadyToServeQueries;
+  }
+
   protected SegmentOperationsThrottler 
createMultiColumnIndexPreprocessThrottler() {
     int maxConcurrency = Integer.parseInt(
         
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_MULTICOL_TEXT_INDEX_PREPROCESS_PARALLELISM,
diff --git 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index 9d7464cca97..ebe224f73d9 100644
--- 
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++ 
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -105,6 +105,7 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
   private HelixManager _helixManager;
   private ZkHelixPropertyStore<ZNRecord> _propertyStore;
   private SegmentUploader _segmentUploader;
+  private BooleanSupplier _isServerReadyToConsumeData = () -> false;
   private BooleanSupplier _isServerReadyToServeQueries = () -> false;
 
   // Fixed size LRU cache for storing last N errors on the instance.
@@ -122,6 +123,11 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
   @Nullable
   private ExecutorService _segmentPreloadExecutor;
 
+  @Override
+  public void setSupplierOfIsServerReadyToConsumeData(BooleanSupplier 
isServerReadyToConsumeData) {
+    _isServerReadyToConsumeData = isServerReadyToConsumeData;
+  }
+
   @Override
   public void setSupplierOfIsServerReadyToServeQueries(BooleanSupplier 
isServingQueries) {
     _isServerReadyToServeQueries = isServingQueries;
@@ -342,8 +348,8 @@ public class HelixInstanceDataManager implements 
InstanceDataManager {
     TimestampIndexUtils.applyTimestampIndex(tableConfig, schema);
     TableDataManager tableDataManager =
         _tableDataManagerProvider.getTableDataManager(tableConfig, schema, 
_segmentReloadSemaphore,
-            _segmentReloadRefreshExecutor, _segmentPreloadExecutor, 
_errorCache, _isServerReadyToServeQueries,
-            _enableAsyncSegmentRefresh, _reloadJobStatusCache);
+            _segmentReloadRefreshExecutor, _segmentPreloadExecutor, 
_errorCache, _isServerReadyToConsumeData,
+            _isServerReadyToServeQueries, _enableAsyncSegmentRefresh, 
_reloadJobStatusCache);
     tableDataManager.start();
     LOGGER.info("Created table data manager for table: {}", tableNameWithType);
     return tableDataManager;


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to