This is an automated email from the ASF dual-hosted git repository.
xiangfu0 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ff7df4893c8 Add server ingestion OOM protection (#18784)
ff7df4893c8 is described below
commit ff7df4893c898251c42911902aec31953638986b
Author: Xiang Fu <[email protected]>
AuthorDate: Fri Jun 19 21:15:11 2026 -0700
Add server ingestion OOM protection (#18784)
---
.../apache/pinot/common/metrics/ServerGauge.java | 2 +
.../utils/config/TableConfigSerDeUtilsTest.java | 3 +
.../provider/DefaultTableDataManagerProvider.java | 4 +-
.../manager/provider/TableDataManagerProvider.java | 7 +-
.../realtime/RealtimeSegmentDataManager.java | 26 ++
.../manager/realtime/RealtimeTableDataManager.java | 18 +-
.../ServerIngestionOomProtectionManager.java | 373 ++++++++++++++++
.../realtime/RealtimeSegmentDataManagerTest.java | 124 ++++++
.../ServerIngestionOomProtectionManagerTest.java | 492 +++++++++++++++++++++
.../FailureInjectingRealtimeTableDataManager.java | 10 +
.../FailureInjectingTableDataManagerProvider.java | 6 +-
.../segment/local/utils/TableConfigUtils.java | 5 +-
.../segment/local/utils/TableConfigUtilsTest.java | 25 ++
.../server/starter/helix/BaseServerStarter.java | 4 +
.../starter/helix/HelixInstanceDataManager.java | 11 +-
.../table/ingestion/StreamIngestionConfig.java | 14 +
.../apache/pinot/spi/utils/CommonConstants.java | 18 +
.../examples/stream/upsertMeetupRsvp/README.md | 66 +++
.../upsertMeetupRsvp_realtime_table_config.json | 3 +-
19 files changed, 1201 insertions(+), 10 deletions(-)
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
index b4f6ae66f4a..46df92b2714 100644
---
a/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
+++
b/pinot-common/src/main/java/org/apache/pinot/common/metrics/ServerGauge.java
@@ -94,6 +94,8 @@ public enum ServerGauge implements AbstractMetrics.Gauge {
UPSERT_QUERYABLE_DOCS_IN_SNAPSHOT_COUNT("upsertQueryableDocIdsInSnapshot",
false),
REALTIME_INGESTION_OFFSET_LAG("offsetLag", false,
"The difference between latest message offset and the last consumed
message offset."),
+ REALTIME_INGESTION_OOM_PROTECTION_ACTIVE("boolean", true,
+ "Binary indicator (1 or 0) for whether the server-wide realtime
ingestion OOM throttle is active."),
REALTIME_INGESTION_UPSTREAM_OFFSET("upstreamOffset", false, "The offset of
the latest message in the upstream."),
REALTIME_INGESTION_CONSUMING_OFFSET("consumingOffset", false, "The offset of
the last consumed message."),
REALTIME_INGESTION_DELAY_MS("milliseconds", false,
diff --git
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtilsTest.java
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtilsTest.java
index 796f26b02db..fd2227f3896 100644
---
a/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtilsTest.java
+++
b/pinot-common/src/test/java/org/apache/pinot/common/utils/config/TableConfigSerDeUtilsTest.java
@@ -58,6 +58,7 @@ import
org.apache.pinot.spi.config.table.ingestion.ParallelSegmentConsumptionPol
import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.Enablement;
import org.apache.pinot.spi.utils.JsonUtils;
import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
import org.testng.annotations.Test;
@@ -307,6 +308,7 @@ public class TableConfigSerDeUtilsTest {
new
StreamIngestionConfig(Collections.singletonList(Collections.singletonMap("streamType",
"kafka")));
streamIngestionConfig.setParallelSegmentConsumptionPolicy(
ParallelSegmentConsumptionPolicy.ALLOW_DURING_BUILD_ONLY);
+ streamIngestionConfig.setOomProtection(Enablement.ENABLE);
ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
ingestionConfig.setFilterConfig(new FilterConfig("filterFunc(foo)"));
ingestionConfig.setTransformConfigs(
@@ -499,6 +501,7 @@ public class TableConfigSerDeUtilsTest {
assertEquals(ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps().size(),
1);
assertEquals(ingestionConfig.getStreamIngestionConfig().getParallelSegmentConsumptionPolicy(),
ParallelSegmentConsumptionPolicy.ALLOW_DURING_BUILD_ONLY);
+
assertEquals(ingestionConfig.getStreamIngestionConfig().getOomProtection(),
Enablement.ENABLE);
}
private void checkTierConfigList(TableConfig tableConfig) {
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
index 57cedd30812..add13516256 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/DefaultTableDataManagerProvider.java
@@ -31,6 +31,7 @@ import
org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager;
import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import
org.apache.pinot.core.data.manager.realtime.ServerIngestionOomProtectionManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet;
@@ -76,6 +77,7 @@ public class DefaultTableDataManagerProvider implements
TableDataManagerProvider
@Nullable ExecutorService segmentPreloadExecutor,
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
BooleanSupplier isServerReadyToConsumeData, BooleanSupplier
isServerReadyToServeQueries,
+ ServerIngestionOomProtectionManager.ServerThrottleState
serverIngestionOomProtectionThrottleState,
boolean enableAsyncSegmentRefresh, ServerReloadJobStatusCache
reloadJobStatusCache) {
TableDataManager tableDataManager;
switch (tableConfig.getTableType()) {
@@ -95,7 +97,7 @@ public class DefaultTableDataManagerProvider implements
TableDataManagerProvider
StreamConfigProperties.SERVER_UPLOAD_TO_DEEPSTORE,
CommonConstants.Server.CONFIG_OF_SEGMENT_STORE_URI));
}
tableDataManager = new
RealtimeTableDataManager(_segmentBuildSemaphore, isServerReadyToConsumeData,
- isServerReadyToServeQueries);
+ isServerReadyToServeQueries,
serverIngestionOomProtectionThrottleState);
break;
default:
throw new IllegalStateException();
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
index b0e4d11e572..023430eccad 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/provider/TableDataManagerProvider.java
@@ -26,7 +26,9 @@ import java.util.function.BooleanSupplier;
import javax.annotation.Nullable;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.helix.HelixManager;
+import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
+import
org.apache.pinot.core.data.manager.realtime.ServerIngestionOomProtectionManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet;
@@ -56,6 +58,7 @@ public interface TableDataManagerProvider {
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
BooleanSupplier isServerReadyToConsumeData,
BooleanSupplier isServerReadyToServeQueries,
+ ServerIngestionOomProtectionManager.ServerThrottleState
serverIngestionOomProtectionThrottleState,
boolean enableAsyncSegmentRefresh,
ServerReloadJobStatusCache reloadJobStatusCache);
@@ -65,6 +68,8 @@ public interface TableDataManagerProvider {
@VisibleForTesting
default TableDataManager getTableDataManager(TableConfig tableConfig, Schema
schema) {
return getTableDataManager(tableConfig, schema, new
SegmentReloadSemaphore(1), Executors.newSingleThreadExecutor(),
- null, null, () -> true, () -> true, false, new
ServerReloadJobStatusCache("testInstance"));
+ null, null, () -> true, () -> true,
+ // Test-only: a fresh throttle state built from built-in defaults (no instance config). It is not shared with any
+ // other table manager and is not registered for cluster config changes.
+ ServerIngestionOomProtectionManager.createServerThrottleState(null,
ServerMetrics.get()), false,
+ new ServerReloadJobStatusCache("testInstance"));
}
}
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
index d918355b490..6246ff58570 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java
@@ -268,6 +268,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
private final PartitionUpsertMetadataManager _partitionUpsertMetadataManager;
private final PartitionDedupMetadataManager _partitionDedupMetadataManager;
private final BooleanSupplier _isReadyToConsumeData;
+ private ServerIngestionOomProtectionManager
_serverIngestionOomProtectionManager;
private final MutableSegmentImpl _realtimeSegment;
private volatile StreamPartitionMsgOffset _currentOffset; // Next offset to
be consumed
private volatile State _state;
@@ -445,6 +446,14 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
}
}
+  /**
+   * Stop condition handed to {@link ServerIngestionOomProtectionManager#waitIfProtectionNeeded}.
+   *
+   * <p>Returns {@code true} as soon as waiting for heap pressure to clear no longer makes sense. That is the case when
+   * the consumer was asked to stop, or the segment left {@code INITIAL_CONSUMING}. It is also the case when one of
+   * the segment end criteria has been reached: consume end time, max row count, end of partition group,
+   * force-commit message, or the mutable segment can no longer accept rows. The conditions are evaluated lazily,
+   * in exactly that order.
+   */
+  private boolean shouldStopWaitingForOomProtection() {
+    return _shouldStop
+        || _state != State.INITIAL_CONSUMING
+        || now() >= _consumeEndTime
+        || _numRowsIndexed >= _segmentMaxRowCount
+        || _endOfPartitionGroup
+        || _forceCommitMessageReceived
+        || !canAddMore();
+  }
+
private void handleTransientStreamErrors(Exception e)
throws Exception {
_consecutiveErrorCount++;
@@ -484,6 +493,16 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
_segmentLogger.info("Starting consumption loop start offset {},
finalOffset {}", _currentOffset, _finalOffset);
while (!_shouldStop && !endCriteriaReached()) {
+ if (_state == State.INITIAL_CONSUMING &&
_serverIngestionOomProtectionManager != null) {
+ boolean waitedForOomProtection =
+
_serverIngestionOomProtectionManager.waitIfProtectionNeeded(this::shouldStopWaitingForOomProtection);
+ if (_shouldStop || endCriteriaReached()) {
+ break;
+ }
+ if (waitedForOomProtection) {
+ _idleTimer.markStreamCreated();
+ }
+ }
_serverMetrics.setValueOfTableGauge(_clientId,
ServerGauge.LLC_PARTITION_CONSUMING, 1);
// Consume for the next readTime ms, or we get to final offset,
whichever happens earlier,
// Update _currentOffset upon return from this method
@@ -763,6 +782,12 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
return _realtimeSegment.canAddMore();
}
+  /**
+   * Test hook that replaces the OOM protection manager otherwise taken from the owning
+   * {@code RealtimeTableDataManager} at construction time.
+   */
+  @VisibleForTesting
+  void setServerIngestionOomProtectionManager(ServerIngestionOomProtectionManager manager) {
+    _serverIngestionOomProtectionManager = manager;
+  }
+
public class PartitionConsumer implements Runnable {
public void run() {
long initialConsumptionEnd = 0L;
@@ -1743,6 +1768,7 @@ public class RealtimeSegmentDataManager extends
SegmentDataManager {
_partitionUpsertMetadataManager = partitionUpsertMetadataManager;
_partitionDedupMetadataManager = partitionDedupMetadataManager;
_isReadyToConsumeData = isReadyToConsumeData;
+ _serverIngestionOomProtectionManager =
realtimeTableDataManager.getServerIngestionOomProtectionManager();
_segmentVersion = indexLoadingConfig.getSegmentVersion();
_instanceId = _realtimeTableDataManager.getInstanceId();
_leaseExtender =
SegmentBuildTimeLeaseExtender.getLeaseExtender(_tableNameWithType);
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 15ec71fe50f..6cdfd08b2e1 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -50,6 +50,7 @@ import org.apache.commons.lang3.tuple.Pair;
import org.apache.pinot.common.Utils;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.common.utils.SegmentUtils;
@@ -139,12 +140,14 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
private final BooleanSupplier _isServerReadyToConsumeData;
private final BooleanSupplier _isServerReadyToServeQueries;
+ private final ServerIngestionOomProtectionManager.ServerThrottleState
_serverIngestionOomProtectionThrottleState;
// Object to track ingestion delay for all partitions
private IngestionDelayTracker _ingestionDelayTracker;
private TableDedupMetadataManager _tableDedupMetadataManager;
private BooleanSupplier _isTableReadyToConsumeData;
+ private ServerIngestionOomProtectionManager
_serverIngestionOomProtectionManager;
private boolean _enforceConsumptionInOrder = false;
public RealtimeTableDataManager(Semaphore segmentBuildSemaphore) {
@@ -152,7 +155,9 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
public RealtimeTableDataManager(Semaphore segmentBuildSemaphore,
BooleanSupplier isServerReadyToServeQueries) {
- this(segmentBuildSemaphore, () -> true, isServerReadyToServeQueries);
+ this(segmentBuildSemaphore, () -> true, isServerReadyToServeQueries,
+ // Test/legacy path: every instance gets its own throttle state built from built-in defaults. It is not shared
+ // with other tables and is never registered for cluster config changes, so cluster overrides are ignored.
+ // Production is expected to pass the shared state created at server startup instead.
+ ServerIngestionOomProtectionManager.createServerThrottleState(null,
ServerMetrics.get()));
}
/**
@@ -161,10 +166,12 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
* entry of its consumer thread; once the gate clears, it is not
consulted again for that segment.
*/
public RealtimeTableDataManager(Semaphore segmentBuildSemaphore,
BooleanSupplier isServerReadyToConsumeData,
- BooleanSupplier isServerReadyToServeQueries) {
+ BooleanSupplier isServerReadyToServeQueries,
+ ServerIngestionOomProtectionManager.ServerThrottleState
serverIngestionOomProtectionThrottleState) {
_segmentBuildSemaphore = segmentBuildSemaphore;
_isServerReadyToConsumeData = isServerReadyToConsumeData;
_isServerReadyToServeQueries = isServerReadyToServeQueries;
+ // Server-wide heap throttle state. The table's ServerIngestionOomProtectionManager is built on top of it later,
+ // in the same code path that sets up the table-level consume-readiness check.
+ _serverIngestionOomProtectionThrottleState =
serverIngestionOomProtectionThrottleState;
}
@Override
@@ -264,6 +271,9 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
}
_isTableReadyToConsumeData = () ->
readyForDedupOrPartialUpsert.getAsBoolean()
&& _isServerReadyToConsumeData.getAsBoolean();
+ _serverIngestionOomProtectionManager = new
ServerIngestionOomProtectionManager(
+ () -> getCachedTableConfigAndSchema().getLeft(), () ->
isUpsertEnabled() || isDedupEnabled(),
+ _serverIngestionOomProtectionThrottleState);
}
@VisibleForTesting
@@ -859,6 +869,10 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
return _isServerReadyToServeQueries;
}
+  /**
+   * Returns this table's realtime ingestion OOM protection manager.
+   *
+   * <p>The manager is created in the same code path that installs the table-level consume-readiness check. Until
+   * that has run, this returns {@code null}.
+   */
+  ServerIngestionOomProtectionManager getServerIngestionOomProtectionManager() {
+    return _serverIngestionOomProtectionManager;
+  }
+
/**
* Validate a schema against the table config for real-time record
consumption.
* Ideally, we should validate these things when schema is added or table is
created, but either of these
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ServerIngestionOomProtectionManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ServerIngestionOomProtectionManager.java
new file mode 100644
index 00000000000..1a0b9845df5
--- /dev/null
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ServerIngestionOomProtectionManager.java
@@ -0,0 +1,373 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BooleanSupplier;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.Enablement;
+import org.apache.pinot.spi.utils.ResourceUsageUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/// Applies server-local backpressure to realtime ingestion while JVM heap
usage is above a configured threshold.
+public class ServerIngestionOomProtectionManager {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ServerIngestionOomProtectionManager.class);
+  // Cluster configs may carry each key either bare or prefixed with the instance data manager config prefix.
+  private static final String SERVER_INSTANCE_CONFIG_PREFIX =
+      CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX + ".";
+  private static final Set<String> SERVER_INGESTION_OOM_PROTECTION_CONFIG_KEYS = Set.of(
+      CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_MODE,
+      CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_THROTTLE_THRESHOLD,
+      CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_RECOVERY_THRESHOLD,
+      CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS,
+      CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_GC_INTERVAL_MS);
+
+  // Server mode has the UPSERT_DEDUP_ONLY policy; table-level override intentionally only supports ENABLE/DISABLE.
+  enum ServerMode {
+    ENABLE, UPSERT_DEDUP_ONLY, DISABLE
+  }
+
+  private static final ServerMode DEFAULT_SERVER_MODE =
+      ServerMode.valueOf(CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_MODE);
+
+  private final Supplier<TableConfig> _tableConfigSupplier;
+  private final BooleanSupplier _isUpsertOrDedupEnabledSupplier;
+  private final ServerThrottleState _serverThrottleState;
+
+  /// Creates a per-table view over the server-wide throttle state.
+  ///
+  /// @param tableConfigSupplier supplies the current table config; a `null` config disables protection
+  /// @param isUpsertOrDedupEnabledSupplier used only when the server mode is `UPSERT_DEDUP_ONLY`
+  /// @param serverThrottleState server-wide heap sampling and throttle state
+  ServerIngestionOomProtectionManager(Supplier<TableConfig> tableConfigSupplier,
+      BooleanSupplier isUpsertOrDedupEnabledSupplier,
+      ServerThrottleState serverThrottleState) {
+    _tableConfigSupplier = tableConfigSupplier;
+    _isUpsertOrDedupEnabledSupplier = isUpsertOrDedupEnabledSupplier;
+    _serverThrottleState = serverThrottleState;
+  }
+
+  /// Blocks the calling thread while throttling applies to this table, sleeping one check interval between checks.
+  ///
+  /// @param stopCondition evaluated before every throttle check; returning `true` ends the wait immediately
+  /// @return `true` if the caller slept at least once, `false` if it returned without waiting
+  /// @throws InterruptedException if the thread is interrupted while sleeping
+  public boolean waitIfProtectionNeeded(BooleanSupplier stopCondition)
+      throws InterruptedException {
+    boolean waited = false;
+    while (!stopCondition.getAsBoolean() && shouldThrottle()) {
+      waited = true;
+      Thread.sleep(_serverThrottleState.getCheckIntervalMs());
+    }
+    return waited;
+  }
+
+  /// Returns whether ingestion for this table should currently be throttled.
+  @VisibleForTesting
+  boolean shouldThrottle() {
+    return isEnabledForTable(_tableConfigSupplier.get()) && _serverThrottleState.shouldThrottle();
+  }
+
+  @VisibleForTesting
+  boolean isThrottling() {
+    return _serverThrottleState.isThrottling();
+  }
+
+  @VisibleForTesting
+  void resetMetrics() {
+    _serverThrottleState.resetMetrics();
+  }
+
+  /// Protection applies only to realtime tables. The table-level `Enablement` is resolved through
+  /// `Enablement.isEnabled`, which falls back to the server mode below (presumably only when the table setting is
+  /// `DEFAULT` — confirm against `Enablement`).
+  private boolean isEnabledForTable(@Nullable TableConfig tableConfig) {
+    if (tableConfig == null || !TableNameBuilder.isRealtimeTableResource(tableConfig.getTableName())) {
+      return false;
+    }
+    return getTableEnablement(tableConfig).isEnabled(() -> switch (_serverThrottleState.getMode()) {
+      case ENABLE -> true;
+      case UPSERT_DEDUP_ONLY -> _isUpsertOrDedupEnabledSupplier.getAsBoolean();
+      case DISABLE -> false;
+    });
+  }
+
+  /// Returns the table's stream-ingestion `oomProtection` setting, or `Enablement.DEFAULT` when any level is absent.
+  private static Enablement getTableEnablement(@Nullable TableConfig tableConfig) {
+    if (tableConfig == null) {
+      return Enablement.DEFAULT;
+    }
+    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+    if (ingestionConfig == null) {
+      return Enablement.DEFAULT;
+    }
+    StreamIngestionConfig streamIngestionConfig = ingestionConfig.getStreamIngestionConfig();
+    return streamIngestionConfig != null ? streamIngestionConfig.getOomProtection() : Enablement.DEFAULT;
+  }
+
+  /// Creates a throttle state backed by the real JVM heap, wall clock and `System.gc()`.
+  ///
+  /// @param instanceDataManagerConfig instance data manager config (copied); `null` means built-in defaults only
+  /// @param serverMetrics metrics sink for the `REALTIME_INGESTION_OOM_PROTECTION_ACTIVE` gauge
+  public static ServerThrottleState createServerThrottleState(
+      @Nullable PinotConfiguration instanceDataManagerConfig, ServerMetrics serverMetrics) {
+    return new ServerThrottleState(instanceDataManagerConfig, serverMetrics, ResourceUsageUtils::getUsedHeapSize,
+        ResourceUsageUtils::getMaxHeapSize, System::currentTimeMillis, System::gc);
+  }
+
+  /// Returns whether any of the given (optionally prefixed) keys is an ingestion OOM protection config key.
+  private static boolean containsIngestionOomProtectionConfig(Set<String> configKeys) {
+    for (String key : configKeys) {
+      String unprefixedKey = key.startsWith(SERVER_INSTANCE_CONFIG_PREFIX)
+          ? key.substring(SERVER_INSTANCE_CONFIG_PREFIX.length())
+          : key;
+      if (SERVER_INGESTION_OOM_PROTECTION_CONFIG_KEYS.contains(unprefixedKey)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  /// Parses a server mode case-insensitively, ignoring surrounding whitespace. Falls back to the default mode, with
+  /// a warning, for `null` or unknown values.
+  private static ServerMode getMode(@Nullable String rawMode) {
+    if (rawMode == null) {
+      LOGGER.warn("Invalid server ingestion OOM protection mode: null. Falling back to: {}", DEFAULT_SERVER_MODE);
+      return DEFAULT_SERVER_MODE;
+    }
+    try {
+      // Trim so that padded values (e.g. " enable " set through cluster configs) still parse.
+      return ServerMode.valueOf(rawMode.trim().toUpperCase(Locale.ROOT));
+    } catch (IllegalArgumentException e) {
+      LOGGER.warn("Invalid server ingestion OOM protection mode: {}. Falling back to: {}", rawMode,
+          DEFAULT_SERVER_MODE);
+      return DEFAULT_SERVER_MODE;
+    }
+  }
+
+  /// Server-wide realtime ingestion OOM protection state shared by all realtime table managers in one server process.
+  ///
+  /// Heap usage is sampled at most once per check interval, under this object's monitor. Between samples the cached
+  /// decision is returned. Throttling starts when usage reaches the throttle threshold. It stops only once usage
+  /// drops to or below the recovery threshold.
+  public static class ServerThrottleState implements PinotClusterConfigChangeListener {
+    private final ServerMetrics _serverMetrics;
+    private final PinotConfiguration _instanceDataManagerConfig;
+    private final LongSupplier _usedHeapSupplier;
+    private final LongSupplier _maxHeapSupplier;
+    private final LongSupplier _currentTimeMsSupplier;
+    private final Runnable _gcRunner;
+
+    private volatile ConfigSnapshot _config;
+    private volatile boolean _throttling;
+    private volatile long _lastCheckTimeMs = -1L;
+    // Guarded by "this": only touched from updateThrottling/requestGcIfNeeded, which run under the monitor.
+    private long _lastGcRequestTimeMs = -1L;
+    private volatile boolean _loggedInvalidHeapSize;
+    private volatile boolean _loggedInvalidThresholds;
+
+    ServerThrottleState(@Nullable PinotConfiguration instanceDataManagerConfig, ServerMetrics serverMetrics,
+        LongSupplier usedHeapSupplier, LongSupplier maxHeapSupplier, LongSupplier currentTimeMsSupplier,
+        Runnable gcRunner) {
+      _serverMetrics = serverMetrics;
+      // Defensive copy so later mutation of the caller's config cannot change the baseline.
+      _instanceDataManagerConfig =
+          instanceDataManagerConfig != null ? new PinotConfiguration(instanceDataManagerConfig.toMap())
+              : new PinotConfiguration();
+      _usedHeapSupplier = usedHeapSupplier;
+      _maxHeapSupplier = maxHeapSupplier;
+      _currentTimeMsSupplier = currentTimeMsSupplier;
+      _gcRunner = gcRunner;
+      _config = ConfigSnapshot.fromConfig(_instanceDataManagerConfig, Map.of());
+    }
+
+    /// Re-reads the protection config when any of its keys changed. Cluster values override instance config values.
+    @Override
+    public void onChange(Set<String> changedConfigs, Map<String, String> clusterConfigs) {
+      if (changedConfigs == null || !containsIngestionOomProtectionConfig(changedConfigs)) {
+        return;
+      }
+      // Snapshot with HashMap rather than Map.copyOf(): Map.copyOf throws NPE on null keys/values, and that would
+      // escape before the update's own error handling.
+      updateConfigFromClusterConfigs(clusterConfigs != null ? new HashMap<>(clusterConfigs) : Map.of());
+    }
+
+    /// Returns the current throttle decision, re-sampling heap usage if the check interval has elapsed.
+    boolean shouldThrottle() {
+      long nowMs = _currentTimeMsSupplier.getAsLong();
+      if (isWithinCheckInterval(nowMs)) {
+        return _throttling;
+      }
+      synchronized (this) {
+        // Re-check under the lock so concurrent callers sample at most once per interval.
+        nowMs = _currentTimeMsSupplier.getAsLong();
+        if (isWithinCheckInterval(nowMs)) {
+          return _throttling;
+        }
+        _lastCheckTimeMs = nowMs;
+        return sampleAndUpdate(nowMs);
+      }
+    }
+
+    boolean isThrottling() {
+      return _throttling;
+    }
+
+    long getCheckIntervalMs() {
+      return _config._checkIntervalMs;
+    }
+
+    ServerMode getMode() {
+      return _config._mode;
+    }
+
+    /// Clears the throttle flag and publishes 0 for the active gauge.
+    synchronized void resetMetrics() {
+      updateThrottling(false, 0.0);
+    }
+
+    private void updateConfigFromClusterConfigs(Map<String, String> clusterConfigs) {
+      try {
+        ConfigSnapshot config = ConfigSnapshot.fromConfig(_instanceDataManagerConfig, clusterConfigs);
+        // Swap under the monitor so the update cannot interleave with an in-flight sampleAndUpdate().
+        synchronized (this) {
+          ConfigSnapshot oldConfig = _config;
+          _config = config;
+          // Force a fresh sample with the new config on the next check.
+          _lastCheckTimeMs = -1L;
+          _loggedInvalidThresholds = false;
+          if (_throttling && (oldConfig._mode != config._mode || !config.isValidThresholdConfig())) {
+            updateThrottling(false, 0.0, config);
+          }
+        }
+        LOGGER.info("Updated server ingestion OOM protection config: mode={}, heapUsageThrottleThreshold={}%, "
+                + "heapUsageRecoveryThreshold={}%, checkIntervalMs={}, gcIntervalMs={}", config._mode,
+            Math.round(config._heapUsageThrottleThreshold * 100), Math.round(config._heapUsageRecoveryThreshold * 100),
+            config._checkIntervalMs, config._gcIntervalMs);
+      } catch (RuntimeException e) {
+        LOGGER.warn("Ignoring invalid server ingestion OOM protection cluster config update", e);
+      }
+    }
+
+    /// Samples heap usage and updates the throttle decision. The caller must hold the monitor.
+    private boolean sampleAndUpdate(long nowMs) {
+      ConfigSnapshot config = _config;
+      if (!config.isValidThresholdConfig()) {
+        if (!_loggedInvalidThresholds) {
+          LOGGER.warn("Disabling server ingestion OOM protection because thresholds are invalid. "
+                  + "heapUsageThrottleThreshold: {}, heapUsageRecoveryThreshold: {}",
+              config._heapUsageThrottleThreshold, config._heapUsageRecoveryThreshold);
+          _loggedInvalidThresholds = true;
+        }
+        updateThrottling(false, 0.0);
+        return false;
+      }
+
+      long maxHeapBytes = _maxHeapSupplier.getAsLong();
+      if (maxHeapBytes <= 0) {
+        if (!_loggedInvalidHeapSize) {
+          LOGGER.warn("Disabling server ingestion OOM protection because max heap size is not available: {}",
+              maxHeapBytes);
+          _loggedInvalidHeapSize = true;
+        }
+        updateThrottling(false, 0.0);
+        return false;
+      }
+
+      double heapUsageRatio = Math.min(1.0, Math.max(0.0, (double) _usedHeapSupplier.getAsLong() / maxHeapBytes));
+      // Hysteresis: once throttling, keep throttling until usage falls to the (lower) recovery threshold.
+      boolean shouldThrottle = _throttling ? heapUsageRatio > config._heapUsageRecoveryThreshold
+          : heapUsageRatio >= config._heapUsageThrottleThreshold;
+      updateThrottling(shouldThrottle, heapUsageRatio, config);
+      if (shouldThrottle) {
+        requestGcIfNeeded(heapUsageRatio, nowMs, config._gcIntervalMs);
+      }
+      return shouldThrottle;
+    }
+
+    private void updateThrottling(boolean shouldThrottle, double heapUsageRatio) {
+      updateThrottling(shouldThrottle, heapUsageRatio, _config);
+    }
+
+    /// Publishes the throttle flag and gauge, and logs state transitions. The caller must hold the monitor.
+    private void updateThrottling(boolean shouldThrottle, double heapUsageRatio, ConfigSnapshot config) {
+      boolean wasThrottling = _throttling;
+      _throttling = shouldThrottle;
+      _serverMetrics.setValueOfGlobalGauge(ServerGauge.REALTIME_INGESTION_OOM_PROTECTION_ACTIVE,
+          shouldThrottle ? 1L : 0L);
+      if (shouldThrottle && !wasThrottling) {
+        LOGGER.warn("Server ingestion OOM protection activated. Heap usage: {}%, threshold: {}%, "
+                + "recovery threshold: {}%", Math.round(heapUsageRatio * 100),
+            Math.round(config._heapUsageThrottleThreshold * 100),
+            Math.round(config._heapUsageRecoveryThreshold * 100));
+      } else if (shouldThrottle) {
+        LOGGER.warn("Server ingestion OOM protection remains active. Heap usage: {}%, recovery threshold: {}%",
+            Math.round(heapUsageRatio * 100), Math.round(config._heapUsageRecoveryThreshold * 100));
+      } else if (wasThrottling) {
+        _lastGcRequestTimeMs = -1L;
+        LOGGER.info("Server ingestion OOM protection released. Heap usage: {}%, recovery threshold: {}%",
+            Math.round(heapUsageRatio * 100), Math.round(config._heapUsageRecoveryThreshold * 100));
+      }
+    }
+
+    /// Requests a GC at most once per `gcIntervalMs` while throttling. A non-positive interval disables this.
+    private void requestGcIfNeeded(double heapUsageRatio, long nowMs, long gcIntervalMs) {
+      if (gcIntervalMs <= 0) {
+        return;
+      }
+      if (_lastGcRequestTimeMs >= 0 && nowMs >= _lastGcRequestTimeMs
+          && nowMs - _lastGcRequestTimeMs < gcIntervalMs) {
+        return;
+      }
+      _lastGcRequestTimeMs = nowMs;
+      LOGGER.warn("Requesting JVM GC while server ingestion OOM protection is active. Heap usage: {}%",
+          Math.round(heapUsageRatio * 100));
+      _gcRunner.run();
+    }
+
+    /// Returns whether the last sample is still fresh.
+    ///
+    /// If the wall clock moved backwards (`nowMs < lastCheckTimeMs`), the sample is treated as stale. Otherwise a
+    /// cached "throttling" decision would block ingestion until the clock caught up again.
+    private boolean isWithinCheckInterval(long nowMs) {
+      long lastCheckTimeMs = _lastCheckTimeMs;
+      return lastCheckTimeMs >= 0 && nowMs >= lastCheckTimeMs
+          && nowMs - lastCheckTimeMs < _config._checkIntervalMs;
+    }
+  }
+
+  /// Immutable view of the effective protection config (instance config overlaid with cluster config).
+  private static class ConfigSnapshot {
+    private final ServerMode _mode;
+    private final double _heapUsageThrottleThreshold;
+    private final double _heapUsageRecoveryThreshold;
+    private final long _checkIntervalMs;
+    private final long _gcIntervalMs;
+
+    private ConfigSnapshot(PinotConfiguration config) {
+      _mode = getMode(config.getProperty(
+          CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_MODE,
+          CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_MODE));
+      _heapUsageThrottleThreshold = config.getProperty(
+          CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_THROTTLE_THRESHOLD,
+          CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_THROTTLE_THRESHOLD);
+      _heapUsageRecoveryThreshold = config.getProperty(
+          CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_RECOVERY_THRESHOLD,
+          CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_RECOVERY_THRESHOLD);
+      long checkIntervalMs = config.getProperty(
+          CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS,
+          CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS);
+      // A non-positive check interval would make the wait loop spin; fall back to the default instead.
+      _checkIntervalMs = checkIntervalMs > 0 ? checkIntervalMs
+          : CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS;
+      _gcIntervalMs = config.getProperty(
+          CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_GC_INTERVAL_MS,
+          CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_GC_INTERVAL_MS);
+    }
+
+    /// Overlays the protection keys found in `clusterConfigs` (bare or prefixed) on top of the instance config.
+    private static ConfigSnapshot fromConfig(PinotConfiguration instanceDataManagerConfig,
+        Map<String, String> clusterConfigs) {
+      Map<String, Object> mergedConfig = new HashMap<>(instanceDataManagerConfig.toMap());
+      for (String configKey : SERVER_INGESTION_OOM_PROTECTION_CONFIG_KEYS) {
+        String clusterConfigValue = getClusterConfigValue(clusterConfigs, configKey);
+        if (clusterConfigValue != null) {
+          mergedConfig.put(configKey, clusterConfigValue);
+        }
+      }
+      return new ConfigSnapshot(new PinotConfiguration(mergedConfig));
+    }
+
+    /// Looks up a key bare first, then with the instance data manager prefix.
+    @Nullable
+    private static String getClusterConfigValue(Map<String, String> clusterConfigs, String configKey) {
+      String value = clusterConfigs.get(configKey);
+      if (value != null) {
+        return value;
+      }
+      return clusterConfigs.get(SERVER_INSTANCE_CONFIG_PREFIX + configKey);
+    }
+
+    /// Both thresholds must lie in (0, 1), with recovery strictly below throttle.
+    private boolean isValidThresholdConfig() {
+      return _heapUsageThrottleThreshold > 0.0 && _heapUsageThrottleThreshold < 1.0
+          && _heapUsageRecoveryThreshold > 0.0 && _heapUsageRecoveryThreshold < _heapUsageThrottleThreshold;
+    }
+  }
+}
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
index 367d71112db..41fb745acd4 100644
---
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManagerTest.java
@@ -31,6 +31,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
+import java.util.function.BooleanSupplier;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
@@ -70,7 +71,10 @@ import org.testng.annotations.Test;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -864,6 +868,109 @@ public class RealtimeSegmentDataManagerTest {
}
}
+  @Test
+  public void testServerIngestionOomProtectionWaitsAndResumesWhileInitialConsuming()
+      throws Exception {
+    TimeSupplier timeSupplier = new TimeSupplier();
+    try (FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(true, timeSupplier,
+        String.valueOf(FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS), "10m", null)) {
+      segmentDataManager._stubConsumeLoop = false;
+      segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.INITIAL_CONSUMING);
+
+      // The protection hook returns true on its first invocation and false on every later one.
+      ServerIngestionOomProtectionManager oomProtectionManager = mock(ServerIngestionOomProtectionManager.class);
+      when(oomProtectionManager.waitIfProtectionNeeded(any())).thenReturn(true, false);
+      segmentDataManager.setServerIngestionOomProtectionManager(oomProtectionManager);
+
+      RealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer();
+      final LongMsgOffset endOffset =
+          new LongMsgOffset(START_OFFSET_VALUE + FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+      segmentDataManager._consumeOffsets.add(endOffset);
+      enqueueCommitResponse(segmentDataManager, endOffset);
+
+      consumer.run();
+
+      verify(oomProtectionManager, atLeast(1)).waitIfProtectionNeeded(any());
+      Assert.assertEquals(segmentDataManager.getSegment().getNumDocsIndexed(),
+          FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+    }
+  }
+
+  @Test
+  public void testServerIngestionOomProtectionStopPredicateDoesNotMutateEndCriteria()
+      throws Exception {
+    TimeSupplier timeSupplier = new TimeSupplier();
+    try (FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(true, timeSupplier,
+        String.valueOf(FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS), "10m", null)) {
+      segmentDataManager._stubConsumeLoop = false;
+      segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.INITIAL_CONSUMING);
+
+      // Violations are counted rather than asserted inside the answer: the answer executes within consumer.run(),
+      // so recording them here lets the test thread report a failure regardless of how that loop handles
+      // exceptions.
+      AtomicInteger stopPredicateViolations = new AtomicInteger();
+      ServerIngestionOomProtectionManager oomProtectionManager = mock(ServerIngestionOomProtectionManager.class);
+      when(oomProtectionManager.waitIfProtectionNeeded(any())).thenAnswer(invocation -> {
+        BooleanSupplier stopCondition = invocation.getArgument(0);
+        long consumeEndTime = segmentDataManager.getConsumeEndTime();
+        // Once the clock reaches the consume end time, the stop predicate must report true without changing
+        // _consumeEndTime.
+        timeSupplier.set(consumeEndTime);
+        if (!stopCondition.getAsBoolean() || segmentDataManager.getConsumeEndTime() != consumeEndTime) {
+          stopPredicateViolations.incrementAndGet();
+        }
+        return true;
+      });
+      segmentDataManager.setServerIngestionOomProtectionManager(oomProtectionManager);
+
+      RealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer();
+      final LongMsgOffset endOffset =
+          new LongMsgOffset(START_OFFSET_VALUE + FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+      segmentDataManager._consumeOffsets.add(endOffset);
+      enqueueCommitResponse(segmentDataManager, endOffset);
+
+      consumer.run();
+
+      verify(oomProtectionManager, atLeast(1)).waitIfProtectionNeeded(any());
+      Assert.assertEquals(stopPredicateViolations.get(), 0,
+          "Stop predicate must be true at the consume end time and must not modify _consumeEndTime");
+      Assert.assertEquals(segmentDataManager.getSegment().getNumDocsIndexed(),
+          FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+    }
+  }
+
+  @Test
+  public void testServerIngestionOomProtectionIsSkippedWhileCatchingUp()
+      throws Exception {
+    TimeSupplier timeSupplier = new TimeSupplier();
+    try (FakeRealtimeSegmentDataManager segmentDataManager = createFakeSegmentManager(true, timeSupplier,
+        String.valueOf(FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS * 2), "10m", null)) {
+      segmentDataManager._stubConsumeLoop = false;
+      segmentDataManager._state.set(segmentDataManager, RealtimeSegmentDataManager.State.CATCHING_UP);
+      LongMsgOffset finalOffset =
+          new LongMsgOffset(START_OFFSET_VALUE + FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+      // The catch-up target is written into the private _finalOffset field via reflection.
+      Field finalOffsetField = RealtimeSegmentDataManager.class.getDeclaredField("_finalOffset");
+      finalOffsetField.setAccessible(true);
+      finalOffsetField.set(segmentDataManager, finalOffset);
+
+      ServerIngestionOomProtectionManager oomProtectionManager = mock(ServerIngestionOomProtectionManager.class);
+      when(oomProtectionManager.waitIfProtectionNeeded(any())).thenReturn(true);
+      segmentDataManager.setServerIngestionOomProtectionManager(oomProtectionManager);
+
+      RealtimeSegmentDataManager.PartitionConsumer consumer = segmentDataManager.createPartitionConsumer();
+      enqueueCommitResponse(segmentDataManager, finalOffset);
+
+      consumer.run();
+
+      // The protection hook must never be consulted while in CATCHING_UP.
+      verify(oomProtectionManager, never()).waitIfProtectionNeeded(any());
+      Assert.assertEquals(((LongMsgOffset) segmentDataManager.getCurrentOffset()).getOffset(),
+          START_OFFSET_VALUE + FakeStreamConfigUtils.SEGMENT_FLUSH_THRESHOLD_ROWS);
+    }
+  }
+
+  /**
+   * Queues a controller COMMIT response carrying {@code offset} on the fake segment data manager.
+   *
+   * @param segmentDataManager fake manager whose response queue is appended to
+   * @param offset offset reported in the COMMIT response
+   */
+  private static void enqueueCommitResponse(FakeRealtimeSegmentDataManager segmentDataManager,
+      LongMsgOffset offset) {
+    segmentDataManager._responses.add(new SegmentCompletionProtocol.Response(
+        new SegmentCompletionProtocol.Response.Params()
+            .withStatus(SegmentCompletionProtocol.ControllerResponseStatus.COMMIT)
+            .withStreamPartitionMsgOffset(offset.toString())));
+  }
+
@Test
public void testCompletionModeDownloadWithUrlValidation()
throws Exception {
@@ -1267,6 +1374,10 @@ public class RealtimeSegmentDataManagerTest {
setLong(endTime, "_consumeEndTime");
}
+    /**
+     * Exposes the current value of the private {@code _consumeEndTime} field to tests.
+     *
+     * @return the consume end time stored on this segment data manager
+     */
+    public long getConsumeEndTime() {
+      final String consumeEndTimeField = "_consumeEndTime";
+      return getLong(consumeEndTimeField);
+    }
+
public void setNumRowsConsumed(int numRows) {
setInt(numRows, "_numRowsConsumed");
}
@@ -1321,6 +1432,19 @@ public class RealtimeSegmentDataManagerTest {
}
}
+    /**
+     * Reads a {@code long} field declared on {@link RealtimeSegmentDataManager} from this instance via reflection.
+     *
+     * @param fieldName name of the declared field to read
+     * @return the field's current value
+     */
+    private long getLong(String fieldName) {
+      try {
+        Field field = RealtimeSegmentDataManager.class.getDeclaredField(fieldName);
+        field.setAccessible(true);
+        return field.getLong(this);
+      } catch (NoSuchFieldException | IllegalAccessException e) {
+        // Name the field and keep the cause so a broken reflection lookup is diagnosable from the test report.
+        Assert.fail("Failed to read long field " + fieldName, e);
+      }
+      throw new RuntimeException("Cannot get here");
+    }
+
private void setOffset(long value, String fieldName) {
try {
Field field =
RealtimeSegmentDataManager.class.getDeclaredField(fieldName);
diff --git
a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ServerIngestionOomProtectionManagerTest.java
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ServerIngestionOomProtectionManagerTest.java
new file mode 100644
index 00000000000..b2b640d37be
--- /dev/null
+++
b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/realtime/ServerIngestionOomProtectionManagerTest.java
@@ -0,0 +1,492 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.LongSupplier;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.TableType;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.Enablement;
+import org.apache.pinot.spi.utils.builder.TableConfigBuilder;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
+
+
+/**
+ * Unit tests for {@link ServerIngestionOomProtectionManager}, driven by a fake heap/clock so throttling decisions are
+ * deterministic.
+ */
+public class ServerIngestionOomProtectionManagerTest {
+  private static final String RAW_TABLE_NAME = "testTable";
+  private static final String REALTIME_TABLE_NAME = TableNameBuilder.REALTIME.tableNameWithType(RAW_TABLE_NAME);
+  // Used-heap values in these tests are expressed relative to this max heap size (i.e. as a percentage).
+  private static final long MAX_HEAP_BYTES = 100L;
+  private static final long CHECK_INTERVAL_MS =
+      CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS;
+  private static final long GC_INTERVAL_MS =
+      CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_GC_INTERVAL_MS;
+  private static final String MODE_CONFIG_KEY = CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_MODE;
+  private static final String UPSERT_DEDUP_ONLY_MODE = "UPSERT_DEDUP_ONLY";
+
+  @Test
+  public void testDefaultDisableModeDoesNotProtectTables() {
+    HeapFixture heap = new HeapFixture(96);
+    TableConfig tableConfig = buildTableConfig(null);
+
+    // No server mode configured: neither stateless realtime (false) nor upsert/dedup (true) tables are throttled.
+    for (boolean upsertOrDedupEnabled : new boolean[]{false, true}) {
+      assertFalse(buildManager(Collections.emptyMap(), tableConfig, upsertOrDedupEnabled, heap).shouldThrottle());
+    }
+  }
+
+  @Test
+  public void testUpsertDedupOnlyModeAppliesOnlyToUpsertOrDedupTables() {
+    HeapFixture heap = new HeapFixture(96);
+    TableConfig tableConfig = buildTableConfig(null);
+
+    ServerIngestionOomProtectionManager statelessRealtimeManager =
+        buildManager(upsertDedupOnlyServerConfig(), tableConfig, false, heap);
+    assertFalse(statelessRealtimeManager.shouldThrottle());
+
+    ServerIngestionOomProtectionManager upsertOrDedupManager =
+        buildManager(upsertDedupOnlyServerConfig(), tableConfig, true, heap);
+    assertTrue(upsertOrDedupManager.shouldThrottle());
+
+    // Usage drops, but throttling persists until usage falls to the recovery level.
+    heap.advanceToNextCheck(91);
+    assertTrue(upsertOrDedupManager.shouldThrottle());
+
+    heap.advanceToNextCheck(90);
+    assertFalse(upsertOrDedupManager.shouldThrottle());
+  }
+
+  @Test
+  public void testTableDisableOverridesServerPolicy() {
+    HeapFixture heap = new HeapFixture(96);
+
+    ServerIngestionOomProtectionManager manager =
+        buildManager(upsertDedupOnlyServerConfig(), buildTableConfig(Enablement.DISABLE), true, heap);
+
+    assertFalse(manager.shouldThrottle());
+  }
+
+  @Test
+  public void testDisabledTableDoesNotUpdateGauges() {
+    HeapFixture heap = new HeapFixture(96);
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+
+    ServerIngestionOomProtectionManager manager =
+        buildManager(upsertDedupOnlyServerConfig(), buildTableConfig(Enablement.DISABLE), true, heap, serverMetrics);
+
+    assertFalse(manager.shouldThrottle());
+    verifyNoInteractions(serverMetrics);
+  }
+
+  @Test
+  public void testSharedServerThrottleStateSamplesHeapOnceAcrossManagersWithinInterval() {
+    HeapFixture heap = new HeapFixture(96);
+    AtomicInteger usedHeapReads = new AtomicInteger();
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    ServerIngestionOomProtectionManager.ServerThrottleState sharedState =
+        buildServerThrottleState(upsertDedupOnlyServerConfig(), () -> {
+          usedHeapReads.incrementAndGet();
+          return heap._usedHeapBytes.get();
+        }, heap._maxHeapBytes::get, heap._nowMs, serverMetrics);
+    ServerIngestionOomProtectionManager firstManager = buildManager(buildTableConfig(null), true, sharedState);
+    ServerIngestionOomProtectionManager secondManager = buildManager(buildTableConfig(null), true, sharedState);
+
+    assertTrue(firstManager.shouldThrottle());
+    assertTrue(secondManager.shouldThrottle());
+    // Both decisions come from a single heap sample because the clock did not move.
+    assertEquals(usedHeapReads.get(), 1);
+  }
+
+  @Test
+  public void testTableEnableUsesServerThresholds() {
+    HeapFixture heap = new HeapFixture(90);
+
+    ServerIngestionOomProtectionManager manager =
+        buildManager(Collections.emptyMap(), buildTableConfig(Enablement.ENABLE), false, heap);
+
+    assertFalse(manager.shouldThrottle());
+
+    heap.advanceToNextCheck(96);
+    assertTrue(manager.shouldThrottle());
+
+    heap.advanceToNextCheck(91);
+    assertTrue(manager.shouldThrottle());
+
+    heap.advanceToNextCheck(90);
+    assertFalse(manager.shouldThrottle());
+  }
+
+  @Test
+  public void testGcRequestedWhenThrottlingAndRateLimited() {
+    HeapFixture heap = new HeapFixture(96);
+    AtomicInteger gcRequests = new AtomicInteger();
+    ServerIngestionOomProtectionManager manager = buildManager(upsertDedupOnlyServerConfig(), buildTableConfig(null),
+        true, heap, mock(ServerMetrics.class), gcRequests::incrementAndGet);
+
+    assertTrue(manager.shouldThrottle());
+    assertEquals(gcRequests.get(), 1);
+
+    // One check interval later the manager is still throttling, but no new GC request is issued.
+    heap.advanceMs(CHECK_INTERVAL_MS);
+    assertTrue(manager.shouldThrottle());
+    assertEquals(gcRequests.get(), 1);
+
+    heap.advanceMs(GC_INTERVAL_MS);
+    assertTrue(manager.shouldThrottle());
+    assertEquals(gcRequests.get(), 2);
+  }
+
+  @Test
+  public void testGcRequestCanBeDisabled() {
+    HeapFixture heap = new HeapFixture(96);
+    AtomicInteger gcRequests = new AtomicInteger();
+    String gcIntervalConfigKey = CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_GC_INTERVAL_MS;
+    Map<String, Object> serverConfig = upsertDedupOnlyServerConfig(gcIntervalConfigKey, 0L);
+    ServerIngestionOomProtectionManager manager = buildManager(serverConfig, buildTableConfig(null), true, heap,
+        mock(ServerMetrics.class), gcRequests::incrementAndGet);
+
+    assertTrue(manager.shouldThrottle());
+    assertEquals(gcRequests.get(), 0);
+  }
+
+  @Test
+  public void testGcRequestTimerResetsWhenThrottlingReleases() {
+    HeapFixture heap = new HeapFixture(96);
+    AtomicInteger gcRequests = new AtomicInteger();
+    ServerIngestionOomProtectionManager manager = buildManager(upsertDedupOnlyServerConfig(), buildTableConfig(null),
+        true, heap, mock(ServerMetrics.class), gcRequests::incrementAndGet);
+
+    assertTrue(manager.shouldThrottle());
+    assertEquals(gcRequests.get(), 1);
+
+    heap.advanceToNextCheck(90);
+    assertFalse(manager.shouldThrottle());
+
+    // Re-entering throttling right after a release issues a fresh GC request.
+    heap.advanceToNextCheck(96);
+    assertTrue(manager.shouldThrottle());
+    assertEquals(gcRequests.get(), 2);
+  }
+
+  @Test
+  public void testGcRequestIsRateLimitedAcrossManagers() {
+    HeapFixture heap = new HeapFixture(96);
+    AtomicInteger gcRequests = new AtomicInteger();
+    ServerIngestionOomProtectionManager.ServerThrottleState sharedState = buildServerThrottleState(
+        upsertDedupOnlyServerConfig(), heap, mock(ServerMetrics.class), gcRequests::incrementAndGet);
+    ServerIngestionOomProtectionManager firstManager = buildManager(buildTableConfig(null), true, sharedState);
+    ServerIngestionOomProtectionManager secondManager = buildManager(buildTableConfig(null), true, sharedState);
+
+    assertTrue(firstManager.shouldThrottle());
+    assertTrue(secondManager.shouldThrottle());
+    assertEquals(gcRequests.get(), 1);
+  }
+
+  @Test
+  public void testServerEnableModeProtectsAllRealtimeTables() {
+    HeapFixture heap = new HeapFixture(96);
+
+    ServerIngestionOomProtectionManager manager =
+        buildManager(serverModeConfig("ENABLE"), buildTableConfig(null), false, heap);
+
+    assertTrue(manager.shouldThrottle());
+  }
+
+  @Test
+  public void testClusterConfigCanDynamicallyUpdateServerMode() {
+    HeapFixture heap = new HeapFixture(96);
+    ServerIngestionOomProtectionManager.ServerThrottleState serverThrottleState =
+        buildServerThrottleState(Collections.emptyMap(), heap, mock(ServerMetrics.class));
+    ServerIngestionOomProtectionManager manager = buildManager(buildTableConfig(null), true, serverThrottleState);
+
+    assertFalse(manager.shouldThrottle());
+
+    String modeConfigKey = fullServerInstanceConfigKey(MODE_CONFIG_KEY);
+    serverThrottleState.onChange(Set.of(modeConfigKey), Map.of(modeConfigKey, UPSERT_DEDUP_ONLY_MODE));
+
+    assertTrue(manager.shouldThrottle());
+  }
+
+  @Test
+  public void testClusterConfigCanDynamicallyUpdateServerThresholds() {
+    HeapFixture heap = new HeapFixture(92);
+    ServerIngestionOomProtectionManager.ServerThrottleState serverThrottleState =
+        buildServerThrottleState(upsertDedupOnlyServerConfig(), heap, mock(ServerMetrics.class));
+    ServerIngestionOomProtectionManager manager = buildManager(buildTableConfig(null), true, serverThrottleState);
+
+    assertFalse(manager.shouldThrottle());
+
+    String throttleThresholdKey = fullServerInstanceConfigKey(
+        CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_THROTTLE_THRESHOLD);
+    String recoveryThresholdKey = fullServerInstanceConfigKey(
+        CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_RECOVERY_THRESHOLD);
+    serverThrottleState.onChange(Set.of(throttleThresholdKey, recoveryThresholdKey),
+        Map.of(throttleThresholdKey, "0.90", recoveryThresholdKey, "0.85"));
+
+    assertTrue(manager.shouldThrottle());
+  }
+
+  @Test
+  public void testInvalidClusterConfigUpdateKeepsPreviousConfig() {
+    HeapFixture heap = new HeapFixture(96);
+    ServerIngestionOomProtectionManager.ServerThrottleState serverThrottleState =
+        buildServerThrottleState(upsertDedupOnlyServerConfig(), heap, mock(ServerMetrics.class));
+    ServerIngestionOomProtectionManager manager = buildManager(buildTableConfig(null), true, serverThrottleState);
+
+    String throttleThresholdKey = fullServerInstanceConfigKey(
+        CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_THROTTLE_THRESHOLD);
+    serverThrottleState.onChange(Set.of(throttleThresholdKey), Map.of(throttleThresholdKey, "not-a-double"));
+
+    assertTrue(manager.shouldThrottle());
+  }
+
+  @Test
+  public void testClusterConfigDeletionFallsBackToInstanceConfig() {
+    HeapFixture heap = new HeapFixture(96);
+    ServerIngestionOomProtectionManager.ServerThrottleState serverThrottleState =
+        buildServerThrottleState(serverModeConfig("ENABLE"), heap, mock(ServerMetrics.class));
+    ServerIngestionOomProtectionManager manager = buildManager(buildTableConfig(null), false, serverThrottleState);
+
+    String modeConfigKey = fullServerInstanceConfigKey(MODE_CONFIG_KEY);
+    serverThrottleState.onChange(Set.of(modeConfigKey), Map.of(modeConfigKey, "DISABLE"));
+    assertFalse(manager.shouldThrottle());
+
+    // Removing the cluster-level key restores the instance-level ENABLE mode.
+    serverThrottleState.onChange(Set.of(modeConfigKey), Collections.emptyMap());
+    assertTrue(manager.shouldThrottle());
+  }
+
+  @Test
+  public void testClusterConfigModeChangeClearsActiveThrottle() {
+    HeapFixture heap = new HeapFixture(96);
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    ServerIngestionOomProtectionManager.ServerThrottleState serverThrottleState =
+        buildServerThrottleState(serverModeConfig("ENABLE"), heap, serverMetrics);
+    ServerIngestionOomProtectionManager manager = buildManager(buildTableConfig(null), false, serverThrottleState);
+
+    assertTrue(manager.shouldThrottle());
+
+    String modeConfigKey = fullServerInstanceConfigKey(MODE_CONFIG_KEY);
+    serverThrottleState.onChange(Set.of(modeConfigKey), Map.of(modeConfigKey, "DISABLE"));
+
+    assertFalse(manager.isThrottling());
+    verify(serverMetrics).setValueOfGlobalGauge(ServerGauge.REALTIME_INGESTION_OOM_PROTECTION_ACTIVE, 0L);
+  }
+
+  @Test
+  public void testTableEnableOverridesServerDisableMode() {
+    HeapFixture heap = new HeapFixture(96);
+
+    ServerIngestionOomProtectionManager manager =
+        buildManager(Collections.emptyMap(), buildTableConfig(Enablement.ENABLE), true, heap);
+
+    assertTrue(manager.shouldThrottle());
+  }
+
+  @Test
+  public void testResetMetricsClearsProtectionGauges() {
+    HeapFixture heap = new HeapFixture(96);
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    ServerIngestionOomProtectionManager manager =
+        buildManager(upsertDedupOnlyServerConfig(), buildTableConfig(null), true, heap, serverMetrics);
+
+    assertTrue(manager.shouldThrottle());
+    manager.resetMetrics();
+
+    verify(serverMetrics).setValueOfGlobalGauge(ServerGauge.REALTIME_INGESTION_OOM_PROTECTION_ACTIVE, 0L);
+  }
+
+  @Test
+  public void testWaitIfProtectionNeededStopsWhenStopConditionIsMet()
+      throws Exception {
+    HeapFixture heap = new HeapFixture(96);
+    AtomicInteger stopChecks = new AtomicInteger();
+    ServerMetrics serverMetrics = mock(ServerMetrics.class);
+    String checkIntervalConfigKey = CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS;
+    Map<String, Object> serverConfig = upsertDedupOnlyServerConfig(checkIntervalConfigKey, 1L);
+    ServerIngestionOomProtectionManager manager =
+        buildManager(serverConfig, buildTableConfig(null), true, heap, serverMetrics);
+
+    // The stop condition evaluates to false on its first call and true on every later call.
+    assertTrue(manager.waitIfProtectionNeeded(() -> stopChecks.getAndIncrement() > 0));
+  }
+
+  private static ServerIngestionOomProtectionManager buildManager(Map<String, Object> serverConfig,
+      TableConfig tableConfig, boolean upsertOrDedupEnabled, HeapFixture heap) {
+    return buildManager(serverConfig, tableConfig, upsertOrDedupEnabled, heap, mock(ServerMetrics.class));
+  }
+
+  private static ServerIngestionOomProtectionManager buildManager(Map<String, Object> serverConfig,
+      TableConfig tableConfig, boolean upsertOrDedupEnabled, HeapFixture heap, ServerMetrics serverMetrics) {
+    return buildManager(serverConfig, tableConfig, upsertOrDedupEnabled, heap, serverMetrics,
+        ServerIngestionOomProtectionManagerTest::noOpGc);
+  }
+
+  /** Builds a manager backed by its own (unshared) throttle state reading from {@code heap}. */
+  private static ServerIngestionOomProtectionManager buildManager(Map<String, Object> serverConfig,
+      TableConfig tableConfig, boolean upsertOrDedupEnabled, HeapFixture heap, ServerMetrics serverMetrics,
+      Runnable gcRunner) {
+    return buildManager(tableConfig, upsertOrDedupEnabled,
+        buildServerThrottleState(serverConfig, heap, serverMetrics, gcRunner));
+  }
+
+  /** Builds a manager on top of a caller-provided (possibly shared) throttle state. */
+  private static ServerIngestionOomProtectionManager buildManager(TableConfig tableConfig,
+      boolean upsertOrDedupEnabled, ServerIngestionOomProtectionManager.ServerThrottleState serverThrottleState) {
+    return new ServerIngestionOomProtectionManager(() -> tableConfig, () -> upsertOrDedupEnabled, serverThrottleState);
+  }
+
+  private static ServerIngestionOomProtectionManager.ServerThrottleState buildServerThrottleState(
+      Map<String, Object> serverConfig, HeapFixture heap, ServerMetrics serverMetrics) {
+    return buildServerThrottleState(serverConfig, heap, serverMetrics, ServerIngestionOomProtectionManagerTest::noOpGc);
+  }
+
+  private static ServerIngestionOomProtectionManager.ServerThrottleState buildServerThrottleState(
+      Map<String, Object> serverConfig, HeapFixture heap, ServerMetrics serverMetrics, Runnable gcRunner) {
+    return buildServerThrottleState(serverConfig, heap._usedHeapBytes::get, heap._maxHeapBytes::get, heap._nowMs,
+        serverMetrics, gcRunner);
+  }
+
+  private static ServerIngestionOomProtectionManager.ServerThrottleState buildServerThrottleState(
+      Map<String, Object> serverConfig, LongSupplier usedHeapSupplier, LongSupplier maxHeapSupplier,
+      AtomicLong nowMs, ServerMetrics serverMetrics) {
+    return buildServerThrottleState(serverConfig, usedHeapSupplier, maxHeapSupplier, nowMs, serverMetrics,
+        ServerIngestionOomProtectionManagerTest::noOpGc);
+  }
+
+  private static ServerIngestionOomProtectionManager.ServerThrottleState buildServerThrottleState(
+      Map<String, Object> serverConfig, LongSupplier usedHeapSupplier, LongSupplier maxHeapSupplier,
+      AtomicLong nowMs, ServerMetrics serverMetrics, Runnable gcRunner) {
+    return new ServerIngestionOomProtectionManager.ServerThrottleState(new PinotConfiguration(serverConfig),
+        serverMetrics, usedHeapSupplier, maxHeapSupplier, nowMs::get, gcRunner);
+  }
+
+  private static Map<String, Object> upsertDedupOnlyServerConfig() {
+    return serverModeConfig(UPSERT_DEDUP_ONLY_MODE);
+  }
+
+  private static Map<String, Object> upsertDedupOnlyServerConfig(String key, Object value) {
+    return Map.of(MODE_CONFIG_KEY, UPSERT_DEDUP_ONLY_MODE, key, value);
+  }
+
+  private static Map<String, Object> serverModeConfig(String mode) {
+    return Map.of(MODE_CONFIG_KEY, mode);
+  }
+
+  /** Prefixes {@code key} with the server instance data manager namespace, as it appears in cluster config. */
+  private static String fullServerInstanceConfigKey(String key) {
+    return String.join(".", CommonConstants.Server.INSTANCE_DATA_MANAGER_CONFIG_PREFIX, key);
+  }
+
+  private static TableConfig buildTableConfig(@Nullable Enablement oomProtection) {
+    TableConfig tableConfig = new TableConfigBuilder(TableType.REALTIME).setTableName(RAW_TABLE_NAME).build();
+    if (oomProtection == null) {
+      return tableConfig;
+    }
+    StreamIngestionConfig streamIngestionConfig = new StreamIngestionConfig(List.of(Map.of("streamType", "kafka")));
+    streamIngestionConfig.setOomProtection(oomProtection);
+    IngestionConfig ingestionConfig = new IngestionConfig();
+    ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
+    tableConfig.setIngestionConfig(ingestionConfig);
+    return tableConfig;
+  }
+
+  private static void noOpGc() {
+  }
+
+  /** Mutable used-heap, max-heap and clock values that a throttle state under test reads from. */
+  private static final class HeapFixture {
+    private final AtomicLong _usedHeapBytes;
+    private final AtomicLong _maxHeapBytes = new AtomicLong(MAX_HEAP_BYTES);
+    private final AtomicLong _nowMs = new AtomicLong();
+
+    HeapFixture(long usedHeapBytes) {
+      _usedHeapBytes = new AtomicLong(usedHeapBytes);
+    }
+
+    void advanceMs(long deltaMs) {
+      _nowMs.addAndGet(deltaMs);
+    }
+
+    /** Advances the clock by one default check interval, then sets the used heap seen by the next sample. */
+    void advanceToNextCheck(long usedHeapBytes) {
+      advanceMs(CHECK_INTERVAL_MS);
+      _usedHeapBytes.set(usedHeapBytes);
+    }
+  }
+}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
index 1eb5d402f5f..68d13ad05ff 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingRealtimeTableDataManager.java
@@ -27,6 +27,7 @@ import org.apache.pinot.common.utils.LLCSegmentName;
import org.apache.pinot.core.data.manager.realtime.ConsumerCoordinator;
import org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
+import
org.apache.pinot.core.data.manager.realtime.ServerIngestionOomProtectionManager;
import org.apache.pinot.segment.local.dedup.PartitionDedupMetadataManager;
import org.apache.pinot.segment.local.segment.index.loader.IndexLoadingConfig;
import org.apache.pinot.segment.local.upsert.PartitionUpsertMetadataManager;
@@ -51,6 +52,15 @@ public class FailureInjectingRealtimeTableDataManager
extends RealtimeTableDataM
_failureInjectingTableConfig = failureInjectingTableConfig;
}
+  /**
+   * Creates a failure-injecting realtime table data manager that forwards the server ingestion OOM protection throttle
+   * state to {@link RealtimeTableDataManager}.
+   *
+   * @param segmentBuildSemaphore semaphore passed through to the parent manager
+   * @param isServerReadyToConsumeData readiness check passed through to the parent manager
+   * @param isServerReadyToServeQueries readiness check passed through to the parent manager
+   * @param serverIngestionOomProtectionThrottleState server-level OOM protection state passed to the parent manager
+   * @param failureInjectingTableConfig failure-injection settings; may be {@code null}
+   */
+  public FailureInjectingRealtimeTableDataManager(Semaphore segmentBuildSemaphore,
+      BooleanSupplier isServerReadyToConsumeData,
+      BooleanSupplier isServerReadyToServeQueries,
+      ServerIngestionOomProtectionManager.ServerThrottleState serverIngestionOomProtectionThrottleState,
+      @Nullable FailureInjectingTableConfig failureInjectingTableConfig) {
+    super(segmentBuildSemaphore, isServerReadyToConsumeData, isServerReadyToServeQueries,
+        serverIngestionOomProtectionThrottleState);
+    _failureInjectingTableConfig = failureInjectingTableConfig;
+  }
+
@Override
protected RealtimeSegmentDataManager
createRealtimeSegmentDataManager(SegmentZKMetadata zkMetadata,
TableConfig tableConfig, IndexLoadingConfig indexLoadingConfig, Schema
schema, LLCSegmentName llcSegmentName,
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
index d73bcb38260..98ac676c922 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/realtime/utils/FailureInjectingTableDataManagerProvider.java
@@ -33,6 +33,7 @@ import
org.apache.pinot.common.restlet.resources.SegmentErrorInfo;
import org.apache.pinot.core.data.manager.offline.DimensionTableDataManager;
import org.apache.pinot.core.data.manager.offline.OfflineTableDataManager;
import org.apache.pinot.core.data.manager.provider.TableDataManagerProvider;
+import
org.apache.pinot.core.data.manager.realtime.ServerIngestionOomProtectionManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.local.utils.SegmentOperationsThrottlerSet;
@@ -84,6 +85,7 @@ public class FailureInjectingTableDataManagerProvider
implements TableDataManage
@Nullable Cache<Pair<String, String>, SegmentErrorInfo> errorCache,
BooleanSupplier isServerReadyToConsumeData,
BooleanSupplier isServerReadyToServeQueries,
+ ServerIngestionOomProtectionManager.ServerThrottleState
serverIngestionOomProtectionThrottleState,
boolean enableAsyncSegmentRefresh,
ServerReloadJobStatusCache reloadJobStatusCache) {
TableDataManager tableDataManager;
@@ -116,8 +118,8 @@ public class FailureInjectingTableDataManagerProvider
implements TableDataManage
}
}
tableDataManager =
- new
FailureInjectingRealtimeTableDataManager(_segmentBuildSemaphore,
isServerReadyToServeQueries,
- failureInjectingTableConfig);
+ new
FailureInjectingRealtimeTableDataManager(_segmentBuildSemaphore,
isServerReadyToConsumeData,
+ isServerReadyToServeQueries,
serverIngestionOomProtectionThrottleState, failureInjectingTableConfig);
break;
default:
throw new IllegalStateException();
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
index 6acc2a2b422..ff15a2fd0f4 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/TableConfigUtils.java
@@ -486,11 +486,12 @@ public final class TableConfigUtils {
IndexingConfig indexingConfig = tableConfig.getIndexingConfig();
Preconditions.checkState(indexingConfig == null ||
MapUtils.isEmpty(indexingConfig.getStreamConfigs()),
"Should not use indexingConfig#getStreamConfigs if
ingestionConfig#StreamIngestionConfig is provided");
- List<Map<String, String>> streamConfigMaps =
ingestionConfig.getStreamIngestionConfig().getStreamConfigMaps();
+ StreamIngestionConfig streamIngestionConfig =
ingestionConfig.getStreamIngestionConfig();
+ List<Map<String, String>> streamConfigMaps =
streamIngestionConfig.getStreamConfigMaps();
Preconditions.checkState(!streamConfigMaps.isEmpty(), "Must have at
least 1 stream in REALTIME table");
// TODO: for multiple stream configs, validate them
- boolean isPauselessEnabled =
ingestionConfig.getStreamIngestionConfig().isPauselessConsumptionEnabled();
+ boolean isPauselessEnabled =
streamIngestionConfig.isPauselessConsumptionEnabled();
if (isPauselessEnabled) {
int replication = tableConfig.getReplication();
// We are checking for this only when replication is greater than 1
because in test environments
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
index c7cab544ed3..888c2de86d5 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/utils/TableConfigUtilsTest.java
@@ -963,6 +963,31 @@ public class TableConfigUtilsTest {
}
}
+ @Test
+ public void ingestionServerIngestionOomProtectionTest() {
+ Schema schema = new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
+ StreamIngestionConfig streamIngestionConfig =
+ new
StreamIngestionConfig(Collections.singletonList(getStreamConfigs()));
+ IngestionConfig ingestionConfig = new IngestionConfig();
+ ingestionConfig.setStreamIngestionConfig(streamIngestionConfig);
+ TableConfig tableConfig = new
TableConfigBuilder(TableType.REALTIME).setTableName(TABLE_NAME)
+ .setIngestionConfig(ingestionConfig)
+ .build();
+ TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+
+ streamIngestionConfig.setOomProtection(Enablement.ENABLE);
+ TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+
+ streamIngestionConfig.setOomProtection(Enablement.DISABLE);
+ TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+
+ streamIngestionConfig.setOomProtection(Enablement.DEFAULT);
+ TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+
+ streamIngestionConfig.setOomProtection(null);
+ TableConfigUtils.validateIngestionConfig(tableConfig, schema);
+ }
+
@Test
public void ingestionBatchConfigsTest() {
Schema schema = new
Schema.SchemaBuilder().setSchemaName(TABLE_NAME).build();
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index b22f220a4df..cfbbc349c88 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -795,6 +795,10 @@ public abstract class BaseServerStarter implements
ServiceStartable {
PinotClusterConfigChangeListener serverRateLimitConfigChangeListener =
new ServerRateLimitConfigChangeListener(_serverMetrics);
_clusterConfigChangeHandler.registerClusterConfigChangeListener(serverRateLimitConfigChangeListener);
+ if (instanceDataManager instanceof HelixInstanceDataManager) {
+ _clusterConfigChangeHandler.registerClusterConfigChangeListener(
+ ((HelixInstanceDataManager)
instanceDataManager).getServerIngestionOomProtectionThrottleState());
+ }
// Register query killing manager for dynamic config updates
(threshold/mode changes via ZK)
if (_queryKillingManager != null) {
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
index c55adf58c4c..04cc89a7723 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/HelixInstanceDataManager.java
@@ -57,6 +57,7 @@ import
org.apache.pinot.core.data.manager.realtime.RealtimeSegmentDataManager;
import org.apache.pinot.core.data.manager.realtime.RealtimeTableDataManager;
import
org.apache.pinot.core.data.manager.realtime.SegmentBuildTimeLeaseExtender;
import org.apache.pinot.core.data.manager.realtime.SegmentUploader;
+import
org.apache.pinot.core.data.manager.realtime.ServerIngestionOomProtectionManager;
import org.apache.pinot.segment.local.data.manager.SegmentDataManager;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
import org.apache.pinot.segment.local.utils.SegmentLocks;
@@ -105,6 +106,7 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
private SegmentUploader _segmentUploader;
private BooleanSupplier _isServerReadyToConsumeData = () -> false;
private BooleanSupplier _isServerReadyToServeQueries = () -> false;
+ private ServerIngestionOomProtectionManager.ServerThrottleState
_serverIngestionOomProtectionThrottleState;
// Fixed size LRU cache for storing last N errors on the instance.
// Key is TableNameWithType-SegmentName pair.
@@ -143,6 +145,8 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
_instanceId = _instanceDataManagerConfig.getInstanceId();
_helixManager = helixManager;
_reloadJobStatusCache = requireNonNull(reloadJobStatusCache,
"reloadJobStatusCache cannot be null");
+ _serverIngestionOomProtectionThrottleState =
+ ServerIngestionOomProtectionManager.createServerThrottleState(config,
serverMetrics);
String tableDataManagerProviderClass =
_instanceDataManagerConfig.getTableDataManagerProviderClass();
LOGGER.info("Initializing table data manager provider of class: {}",
tableDataManagerProviderClass);
_tableDataManagerProvider =
PluginManager.get().createInstance(tableDataManagerProviderClass);
@@ -186,6 +190,10 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
.expireAfterWrite(_instanceDataManagerConfig.getDeletedTablesCacheTtlMinutes(),
TimeUnit.MINUTES).build();
}
+ ServerIngestionOomProtectionManager.ServerThrottleState
getServerIngestionOomProtectionThrottleState() {
+ return _serverIngestionOomProtectionThrottleState;
+ }
+
@VisibleForTesting
void initInstanceDataDir(File instanceDataDir) {
if (!instanceDataDir.exists()) {
@@ -361,7 +369,8 @@ public class HelixInstanceDataManager implements
InstanceDataManager {
TableDataManager tableDataManager =
_tableDataManagerProvider.getTableDataManager(tableConfig, schema,
_segmentReloadSemaphore,
_segmentReloadRefreshExecutor, _segmentPreloadExecutor,
_errorCache, _isServerReadyToConsumeData,
- _isServerReadyToServeQueries, _enableAsyncSegmentRefresh,
_reloadJobStatusCache);
+ _isServerReadyToServeQueries,
_serverIngestionOomProtectionThrottleState, _enableAsyncSegmentRefresh,
+ _reloadJobStatusCache);
tableDataManager.start();
LOGGER.info("Created table data manager for table: {}", tableNameWithType);
return tableDataManager;
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
index 2cc00498b0c..d6507061060 100644
---
a/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
+++
b/pinot-spi/src/main/java/org/apache/pinot/spi/config/table/ingestion/StreamIngestionConfig.java
@@ -26,6 +26,7 @@ import java.util.Map;
import javax.annotation.Nullable;
import org.apache.pinot.spi.config.BaseJsonConfig;
import org.apache.pinot.spi.config.table.DisasterRecoveryMode;
+import org.apache.pinot.spi.utils.Enablement;
/**
@@ -66,6 +67,11 @@ public class StreamIngestionConfig extends BaseJsonConfig {
+ " partition during realtime ingestion. Defaults to false.")
private boolean _dropRecordOnPartitionMismatch;
+ @JsonPropertyDescription("Optional table-level enablement override for
server-side ingestion OOM protection. "
+ + "Supported values are ENABLE, DISABLE and DEFAULT. If unset or
DEFAULT, the table follows the server-level "
+ + "mode.")
+ private Enablement _oomProtection = Enablement.DEFAULT;
+
@JsonCreator
public StreamIngestionConfig(@JsonProperty("streamConfigMaps")
List<Map<String, String>> streamConfigMaps) {
_streamConfigMaps = streamConfigMaps;
@@ -148,4 +154,12 @@ public class StreamIngestionConfig extends BaseJsonConfig {
public void setDropRecordOnPartitionMismatch(boolean
dropRecordOnPartitionMismatch) {
_dropRecordOnPartitionMismatch = dropRecordOnPartitionMismatch;
}
+
+ public Enablement getOomProtection() {
+ return _oomProtection == null ? Enablement.DEFAULT : _oomProtection;
+ }
+
+ public void setOomProtection(@Nullable Enablement oomProtection) {
+ _oomProtection = oomProtection == null ? Enablement.DEFAULT :
oomProtection;
+ }
}
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 3b2d69a33b5..33f94493840 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -1579,6 +1579,24 @@ public class CommonConstants {
// Default to 0.0 (no limit)
public static final double DEFAULT_SERVER_CONSUMPTION_RATE_LIMIT = 0.0;
+ // Configs for server-side realtime ingestion OOM protection. These are
consumed from the instance data manager
+ // config subset, so the user-facing property prefix is
pinot.server.instance.
+ public static final String CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_MODE =
+ "ingestion.oom.protection.mode";
+ public static final String DEFAULT_SERVER_INGESTION_OOM_PROTECTION_MODE =
"DISABLE";
+ public static final String
CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_THROTTLE_THRESHOLD =
+ "ingestion.oom.protection.heapUsageThrottleThreshold";
+ public static final double
DEFAULT_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_THROTTLE_THRESHOLD = 0.95;
+ public static final String
CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_RECOVERY_THRESHOLD =
+ "ingestion.oom.protection.heapUsageRecoveryThreshold";
+ public static final double
DEFAULT_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_RECOVERY_THRESHOLD = 0.90;
+ public static final String
CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS =
+ "ingestion.oom.protection.checkIntervalMs";
+ public static final long
DEFAULT_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS = 1_000L;
+ public static final String
CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_GC_INTERVAL_MS =
+ "ingestion.oom.protection.gcIntervalMs";
+ public static final long
DEFAULT_SERVER_INGESTION_OOM_PROTECTION_GC_INTERVAL_MS = 30_000L;
+
public static final String CONFIG_OF_MMAP_DEFAULT_ADVICE =
"pinot.server.mmap.advice.default";
public static final String PREFIX_OF_CONFIG_OF_SEGMENT_FETCHER_FACTORY =
"pinot.server.segment.fetcher";
diff --git
a/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/README.md
b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/README.md
new file mode 100644
index 00000000000..106c1dfab54
--- /dev/null
+++ b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/README.md
@@ -0,0 +1,66 @@
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing,
+ software distributed under the License is distributed on an
+ "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ KIND, either express or implied. See the License for the
+ specific language governing permissions and limitations
+ under the License.
+
+-->
+
+# Server ingestion OOM protection
+
+Server ingestion OOM protection applies local backpressure to realtime ingestion when the server's JVM heap usage
+exceeds the configured throttle threshold. It is disabled by default. Set the server mode to `UPSERT_DEDUP_ONLY` to
+protect only upsert and dedup realtime tables, or to `ENABLE` to protect all realtime tables. While throttled, the
+consuming loop pauses, re-checks heap usage at the configured check interval, and resumes ingestion once heap usage
+drops back to the recovery threshold. While throttled, Pinot also requests JVM garbage collection at a rate-limited
+interval, so a mostly ingestion-only server does not stay paused indefinitely while reclaimable garbage still counts as used heap.
+
+Set the server-level properties in the server configuration, using the `pinot.server.instance.` prefix:
+
+```properties
+pinot.server.instance.ingestion.oom.protection.mode=UPSERT_DEDUP_ONLY
+pinot.server.instance.ingestion.oom.protection.heapUsageThrottleThreshold=0.95
+pinot.server.instance.ingestion.oom.protection.heapUsageRecoveryThreshold=0.90
+pinot.server.instance.ingestion.oom.protection.checkIntervalMs=1000
+pinot.server.instance.ingestion.oom.protection.gcIntervalMs=30000
+```
+
+The same properties can also be set in the Pinot cluster config, where changes are picked up at runtime without a
+server restart. Cluster config entries use the same fully qualified `pinot.server.instance.*` property names:
+
+```json
+{
+ "pinot.server.instance.ingestion.oom.protection.mode": "UPSERT_DEDUP_ONLY",
+ "pinot.server.instance.ingestion.oom.protection.heapUsageThrottleThreshold":
"0.95",
+ "pinot.server.instance.ingestion.oom.protection.heapUsageRecoveryThreshold":
"0.90",
+ "pinot.server.instance.ingestion.oom.protection.checkIntervalMs": "1000",
+ "pinot.server.instance.ingestion.oom.protection.gcIntervalMs": "30000"
+}
+```
+
+Use `pinot.server.instance.ingestion.oom.protection.mode=ENABLE` to apply the server-level policy to all realtime tables.
+Use `pinot.server.instance.ingestion.oom.protection.mode=DISABLE` (the default) to turn the server-level policy off. Set
+`pinot.server.instance.ingestion.oom.protection.gcIntervalMs=0` to stop the explicit GC requests made while throttled.
+
+Each realtime table can override the server policy under
`ingestionConfig.streamIngestionConfig`:
+
+```json
+"oomProtection": "ENABLE"
+```
+
+If the table's `oomProtection` is unset or `DEFAULT`, the table follows the server mode. Set it to `ENABLE` to protect the
+table even when the server mode is `DISABLE`, or is `UPSERT_DEDUP_ONLY` and the table is neither an upsert nor a dedup table;
+set it to `DISABLE` to turn protection off for the table. Thresholds are configured at the server level only. The example table config in this directory sets `"oomProtection": "ENABLE"`.
diff --git
a/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
index 1d984b7129f..d5b9c62aaec 100644
---
a/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
+++
b/pinot-tools/src/main/resources/examples/stream/upsertMeetupRsvp/upsertMeetupRsvp_realtime_table_config.json
@@ -50,7 +50,8 @@
"stream.kafka.zk.broker.url": "localhost:2191/kafka",
"stream.kafka.broker.list": "localhost:19092"
}
- ]
+ ],
+ "oomProtection": "ENABLE"
}
},
"upsertConfig": {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]