Copilot commented on code in PR #18784:
URL: https://github.com/apache/pinot/pull/18784#discussion_r3425345156


##########
pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/ServerIngestionOomProtectionManager.java:
##########
@@ -0,0 +1,258 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.data.manager.realtime;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.util.Locale;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BooleanSupplier;
+import java.util.function.LongSupplier;
+import java.util.function.Supplier;
+import javax.annotation.Nullable;
+import org.apache.pinot.common.metrics.ServerGauge;
+import org.apache.pinot.common.metrics.ServerMeter;
+import org.apache.pinot.common.metrics.ServerMetrics;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import 
org.apache.pinot.spi.config.table.ingestion.ServerIngestionOomProtectionConfig;
+import org.apache.pinot.spi.config.table.ingestion.StreamIngestionConfig;
+import org.apache.pinot.spi.env.PinotConfiguration;
+import org.apache.pinot.spi.utils.CommonConstants;
+import org.apache.pinot.spi.utils.Enablement;
+import org.apache.pinot.spi.utils.ResourceUsageUtils;
+import org.apache.pinot.spi.utils.builder.TableNameBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/// Applies server-local backpressure to realtime ingestion while JVM heap 
usage is above a configured threshold.
+public class ServerIngestionOomProtectionManager {
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ServerIngestionOomProtectionManager.class);
+  private static final long ACTIVE_LOG_INTERVAL_MS = 
TimeUnit.MINUTES.toMillis(1);
+
+  enum TableSelectionMode {
+    UPSERT_ONLY, ALL_REALTIME
+  }
+
+  private final String _tableNameWithType;
+  private final Supplier<TableConfig> _tableConfigSupplier;
+  private final BooleanSupplier _isUpsertEnabledSupplier;
+  private final ServerMetrics _serverMetrics;
+  private final boolean _serverEnabled;
+  private final TableSelectionMode _tableSelectionMode;
+  private final double _serverHeapUsageThreshold;
+  private final double _serverHeapRecoveryThreshold;
+  private final long _checkIntervalMs;
+  private final LongSupplier _usedHeapSupplier;
+  private final LongSupplier _maxHeapSupplier;
+  private final LongSupplier _currentTimeMsSupplier;
+
+  private volatile boolean _throttling;
+  private volatile long _lastCheckTimeMs = -1L;
+  private volatile boolean _loggedInvalidHeapSize;
+  private volatile boolean _loggedInvalidThresholds;
+  private volatile long _lastActiveLogTimeMs = Long.MIN_VALUE;
+
+  public ServerIngestionOomProtectionManager(String tableNameWithType,
+      @Nullable PinotConfiguration instanceDataManagerConfig, 
Supplier<TableConfig> tableConfigSupplier,
+      BooleanSupplier isUpsertEnabledSupplier, ServerMetrics serverMetrics) {
+    this(tableNameWithType, instanceDataManagerConfig, tableConfigSupplier, 
isUpsertEnabledSupplier, serverMetrics,
+        ResourceUsageUtils::getUsedHeapSize, 
ResourceUsageUtils::getMaxHeapSize, System::currentTimeMillis);
+  }
+
+  @VisibleForTesting
+  ServerIngestionOomProtectionManager(String tableNameWithType,
+      @Nullable PinotConfiguration instanceDataManagerConfig, 
Supplier<TableConfig> tableConfigSupplier,
+      BooleanSupplier isUpsertEnabledSupplier, ServerMetrics serverMetrics, 
LongSupplier usedHeapSupplier,
+      LongSupplier maxHeapSupplier, LongSupplier currentTimeMsSupplier) {
+    PinotConfiguration config =
+        instanceDataManagerConfig != null ? instanceDataManagerConfig : new 
PinotConfiguration();
+    _tableNameWithType = tableNameWithType;
+    _tableConfigSupplier = tableConfigSupplier;
+    _isUpsertEnabledSupplier = isUpsertEnabledSupplier;
+    _serverMetrics = serverMetrics;
+    _serverEnabled = config.getProperty(
+        
CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_ENABLED,
+        
CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_ENABLED);
+    _tableSelectionMode = getTableSelectionMode(config.getProperty(
+        
CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_TABLE_SELECTION_MODE,
+        
CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_TABLE_SELECTION_MODE));
+    _serverHeapUsageThreshold = config.getProperty(
+        
CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_THRESHOLD,
+        
CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_HEAP_USAGE_THRESHOLD);
+    _serverHeapRecoveryThreshold = config.getProperty(
+        
CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_HEAP_RECOVERY_THRESHOLD,
+        
CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_HEAP_RECOVERY_THRESHOLD);
+    long checkIntervalMs = config.getProperty(
+        
CommonConstants.Server.CONFIG_OF_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS,
+        
CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS);
+    _checkIntervalMs = checkIntervalMs > 0 ? checkIntervalMs
+        : 
CommonConstants.Server.DEFAULT_SERVER_INGESTION_OOM_PROTECTION_CHECK_INTERVAL_MS;
+    _usedHeapSupplier = usedHeapSupplier;
+    _maxHeapSupplier = maxHeapSupplier;
+    _currentTimeMsSupplier = currentTimeMsSupplier;
+  }
+
+  public boolean waitIfProtectionNeeded(BooleanSupplier stopCondition)
+      throws InterruptedException {
+    boolean waited = false;
+    while (!stopCondition.getAsBoolean() && shouldThrottle()) {
+      waited = true;
+      _serverMetrics.addMeteredTableValue(_tableNameWithType,
+          ServerMeter.REALTIME_INGESTION_OOM_PROTECTION_THROTTLED, 1L);
+      Thread.sleep(_checkIntervalMs);
+    }
+    return waited;
+  }
+
+  @VisibleForTesting
+  boolean shouldThrottle() {
+    long nowMs = _currentTimeMsSupplier.getAsLong();
+    if (isWithinCheckInterval(nowMs)) {
+      return _throttling;
+    }
+    synchronized (this) {
+      nowMs = _currentTimeMsSupplier.getAsLong();
+      if (isWithinCheckInterval(nowMs)) {
+        return _throttling;
+      }
+      _lastCheckTimeMs = nowMs;
+      return sampleAndUpdate(nowMs);
+    }
+  }
+
+  @VisibleForTesting
+  boolean isThrottling() {
+    return _throttling;
+  }
+
+  private boolean sampleAndUpdate(long nowMs) {
+    TableConfig tableConfig = _tableConfigSupplier.get();
+    if (!isEnabledForTable(tableConfig)) {
+      updateThrottling(false, 0.0, 0.0, 0.0, nowMs);
+      return false;
+    }
+
+    ServerIngestionOomProtectionConfig tableConfigOverride = 
getConfig(tableConfig);
+    double heapUsageThreshold =
+        tableConfigOverride != null && 
tableConfigOverride.getHeapUsageThreshold() != null
+            ? tableConfigOverride.getHeapUsageThreshold() : 
_serverHeapUsageThreshold;
+    double heapRecoveryThreshold =
+        tableConfigOverride != null && 
tableConfigOverride.getHeapRecoveryThreshold() != null
+            ? tableConfigOverride.getHeapRecoveryThreshold() : 
_serverHeapRecoveryThreshold;
+    if (!isValidThresholdConfig(heapUsageThreshold, heapRecoveryThreshold)) {
+      if (!_loggedInvalidThresholds) {
+        LOGGER.warn("Disabling server ingestion OOM protection for table: {} 
because thresholds are invalid. "
+                + "heapUsageThreshold: {}, heapRecoveryThreshold: {}", 
_tableNameWithType, heapUsageThreshold,
+            heapRecoveryThreshold);
+        _loggedInvalidThresholds = true;
+      }
+      updateThrottling(false, 0.0, heapUsageThreshold, heapRecoveryThreshold, 
nowMs);
+      return false;
+    }
+
+    long maxHeapBytes = _maxHeapSupplier.getAsLong();
+    if (maxHeapBytes <= 0) {
+      if (!_loggedInvalidHeapSize) {
+        LOGGER.warn("Disabling server ingestion OOM protection for table: {} 
because max heap size is not "
+            + "available: {}", _tableNameWithType, maxHeapBytes);
+        _loggedInvalidHeapSize = true;
+      }
+      updateThrottling(false, 0.0, heapUsageThreshold, heapRecoveryThreshold, 
nowMs);
+      return false;
+    }
+
+    double heapUsageRatio = Math.max(0.0, (double) 
_usedHeapSupplier.getAsLong() / maxHeapBytes);
+    boolean shouldThrottle = _throttling ? heapUsageRatio > 
heapRecoveryThreshold
+        : heapUsageRatio >= heapUsageThreshold;
+    updateThrottling(shouldThrottle, heapUsageRatio, heapUsageThreshold, 
heapRecoveryThreshold, nowMs);
+    return shouldThrottle;
+  }
+
+  void resetMetrics() {
+    updateThrottling(false, 0.0, 0.0, 0.0, _currentTimeMsSupplier.getAsLong());
+  }
+
+  private boolean isEnabledForTable(@Nullable TableConfig tableConfig) {
+    if (!_serverEnabled || tableConfig == null
+        || 
!TableNameBuilder.isRealtimeTableResource(tableConfig.getTableName())) {
+      return false;
+    }
+    ServerIngestionOomProtectionConfig config = getConfig(tableConfig);
+    Enablement mode = config != null ? config.getMode() : Enablement.DEFAULT;
+    return mode.isEnabled(() -> _tableSelectionMode == 
TableSelectionMode.ALL_REALTIME
+        || (_tableSelectionMode == TableSelectionMode.UPSERT_ONLY && 
_isUpsertEnabledSupplier.getAsBoolean()));
+  }
+
+  @Nullable
+  private static ServerIngestionOomProtectionConfig getConfig(@Nullable 
TableConfig tableConfig) {
+    if (tableConfig == null) {
+      return null;
+    }
+    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+    if (ingestionConfig == null) {
+      return null;
+    }
+    StreamIngestionConfig streamIngestionConfig = 
ingestionConfig.getStreamIngestionConfig();
+    return streamIngestionConfig != null ? 
streamIngestionConfig.getServerIngestionOomProtectionConfig() : null;
+  }
+
+  private void updateThrottling(boolean shouldThrottle, double heapUsageRatio, 
double heapUsageThreshold,
+      double heapRecoveryThreshold, long nowMs) {
+    boolean wasThrottling = _throttling;
+    _throttling = shouldThrottle;
+    _serverMetrics.setValueOfTableGauge(_tableNameWithType,
+        ServerGauge.REALTIME_INGESTION_OOM_PROTECTION_HEAP_USAGE_PERCENT, 
Math.round(heapUsageRatio * 100));
+    _serverMetrics.setValueOfTableGauge(_tableNameWithType, 
ServerGauge.REALTIME_INGESTION_OOM_PROTECTION_ACTIVE,
+        shouldThrottle ? 1L : 0L);
+    if (shouldThrottle && !wasThrottling) {
+      _lastActiveLogTimeMs = nowMs;
+      LOGGER.warn("Server ingestion OOM protection activated for table: {}. 
Heap usage: {}%, threshold: {}%, "
+              + "recovery threshold: {}%", _tableNameWithType, 
Math.round(heapUsageRatio * 100),
+          Math.round(heapUsageThreshold * 100), 
Math.round(heapRecoveryThreshold * 100));
+    } else if (shouldThrottle && nowMs - _lastActiveLogTimeMs >= 
ACTIVE_LOG_INTERVAL_MS) {
+      _lastActiveLogTimeMs = nowMs;
+      LOGGER.warn("Server ingestion OOM protection remains active for table: 
{}. Heap usage: {}%, recovery "
+              + "threshold: {}%", _tableNameWithType, 
Math.round(heapUsageRatio * 100),
+          Math.round(heapRecoveryThreshold * 100));
+    } else if (!shouldThrottle && wasThrottling) {
+      LOGGER.info("Server ingestion OOM protection released for table: {}. 
Heap usage: {}%, recovery threshold: {}%",
+          _tableNameWithType, Math.round(heapUsageRatio * 100), 
Math.round(heapRecoveryThreshold * 100));
+    }

Review Comment:
   When OOM protection is disabled for a table (or `resetMetrics()` is called), 
the code calls `updateThrottling(false, 0.0, 0.0, 0.0, ...)`. If the manager 
was throttling at that point, this transitions from throttling to not 
throttling and emits a "released" log line that reports a recovery threshold 
of `0%` (alongside a heap usage of `0%`), which is misleading: these are 
placeholder arguments, not a real configured threshold or a measured value. 
Consider omitting the recovery-threshold value when it’s not meaningful 
(<= 0) to avoid confusing operators.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to