This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 763c732d4f Make SegmentOperationsThrottler more extensible and modify
interfaces for upsert and dedup to take this as an argument (#15973)
763c732d4f is described below
commit 763c732d4fed14c5429ef99785ee47e0b0815319
Author: Sonam Mandal <[email protected]>
AuthorDate: Tue Jun 10 10:50:36 2025 -0700
Make SegmentOperationsThrottler more extensible and modify interfaces for
upsert and dedup to take this as an argument (#15973)
---
.../manager/realtime/RealtimeTableDataManager.java | 4 +-
.../local/dedup/BaseTableDedupMetadataManager.java | 6 +-
.../local/dedup/TableDedupMetadataManager.java | 4 +-
.../dedup/TableDedupMetadataManagerFactory.java | 7 +-
.../upsert/BaseTableUpsertMetadataManager.java | 6 +-
.../local/upsert/TableUpsertMetadataManager.java | 4 +-
.../upsert/TableUpsertMetadataManagerFactory.java | 5 +-
.../utils/BaseSegmentOperationsThrottler.java | 47 ++++++++-----
.../utils/SegmentAllIndexPreprocessThrottler.java | 13 +++-
.../local/utils/SegmentDownloadThrottler.java | 17 ++---
.../utils/SegmentStarTreePreprocessThrottler.java | 11 ++-
.../TableDedupMetadataManagerFactoryTest.java | 2 +-
.../mutable/MutableSegmentDedupTest.java | 3 +-
.../MutableSegmentImplUpsertComparisonColTest.java | 3 +-
.../mutable/MutableSegmentImplUpsertTest.java | 2 +-
.../TableUpsertMetadataManagerFactoryTest.java | 14 ++--
.../server/starter/helix/BaseServerStarter.java | 79 +++++++++++++---------
.../apache/pinot/spi/utils/CommonConstants.java | 3 +
18 files changed, 149 insertions(+), 81 deletions(-)
diff --git
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
index 15ebdc56eb..004755f47c 100644
---
a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
+++
b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeTableDataManager.java
@@ -207,14 +207,14 @@ public class RealtimeTableDataManager extends
BaseTableDataManager {
if (tableConfig.isDedupEnabled()) {
_tableDedupMetadataManager =
TableDedupMetadataManagerFactory.create(_instanceDataManagerConfig.getDedupConfig(),
tableConfig, schema,
- this);
+ this, _segmentOperationsThrottler);
}
if (tableConfig.isUpsertEnabled()) {
Preconditions.checkState(_tableDedupMetadataManager == null,
"Dedup and upsert cannot be both enabled for table: %s",
_tableNameWithType);
_tableUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(_instanceDataManagerConfig.getUpsertConfig(),
tableConfig, schema,
- this);
+ this, _segmentOperationsThrottler);
}
_enforceConsumptionInOrder = isEnforceConsumptionInOrder();
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
index e2c8f986e2..61988fd4ad 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/BaseTableDedupMetadataManager.java
@@ -23,8 +23,10 @@ import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import javax.annotation.Nullable;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
@@ -40,11 +42,13 @@ public abstract class BaseTableDedupMetadataManager
implements TableDedupMetadat
protected final Map<Integer, PartitionDedupMetadataManager>
_partitionMetadataManagerMap = new ConcurrentHashMap<>();
protected String _tableNameWithType;
protected DedupContext _context;
+ protected SegmentOperationsThrottler _segmentOperationsThrottler;
@Override
public void init(PinotConfiguration instanceDedupConfig, TableConfig
tableConfig, Schema schema,
- TableDataManager tableDataManager) {
+ TableDataManager tableDataManager, @Nullable SegmentOperationsThrottler
segmentOperationsThrottler) {
_tableNameWithType = tableConfig.getTableName();
+ _segmentOperationsThrottler = segmentOperationsThrottler;
Preconditions.checkArgument(tableConfig.isDedupEnabled(), "Dedup must be
enabled for table: %s",
_tableNameWithType);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
index edc971bea2..03007c9034 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManager.java
@@ -19,7 +19,9 @@
package org.apache.pinot.segment.local.dedup;
import java.io.Closeable;
+import javax.annotation.Nullable;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.spi.env.PinotConfiguration;
@@ -30,7 +32,7 @@ public interface TableDedupMetadataManager extends Closeable {
* Initialize TableDedupMetadataManager.
*/
void init(PinotConfiguration instanceUpsertConfig, TableConfig tableConfig,
Schema schema,
- TableDataManager tableDataManager);
+ TableDataManager tableDataManager, @Nullable SegmentOperationsThrottler
segmentOperationsThrottler);
/**
* Create a new PartitionDedupMetadataManager if not present already,
otherwise return existing one.
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java
index 473da5e392..ae34d71464 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactory.java
@@ -19,8 +19,10 @@
package org.apache.pinot.segment.local.dedup;
import com.google.common.base.Preconditions;
+import javax.annotation.Nullable;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
import org.apache.pinot.spi.config.table.DedupConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.data.Schema;
@@ -37,7 +39,8 @@ public class TableDedupMetadataManagerFactory {
private static final Logger LOGGER =
LoggerFactory.getLogger(TableDedupMetadataManagerFactory.class);
public static TableDedupMetadataManager create(PinotConfiguration
instanceDedupConfig, TableConfig tableConfig,
- Schema schema, TableDataManager tableDataManager) {
+ Schema schema, TableDataManager tableDataManager,
+ @Nullable SegmentOperationsThrottler segmentOperationsThrottler) {
String tableNameWithType = tableConfig.getTableName();
Preconditions.checkArgument(tableConfig.isDedupEnabled(), "Dedup must be
enabled for table: %s", tableNameWithType);
DedupConfig dedupConfig = tableConfig.getDedupConfig();
@@ -63,7 +66,7 @@ public class TableDedupMetadataManagerFactory {
LOGGER.info("Creating ConcurrentMapTableDedupMetadataManager for table:
{}", tableNameWithType);
metadataManager = new ConcurrentMapTableDedupMetadataManager();
}
- metadataManager.init(instanceDedupConfig, tableConfig, schema,
tableDataManager);
+ metadataManager.init(instanceDedupConfig, tableConfig, schema,
tableDataManager, segmentOperationsThrottler);
return metadataManager;
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
index 8d2318148e..539ae1a95c 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/BaseTableUpsertMetadataManager.java
@@ -20,9 +20,11 @@ package org.apache.pinot.segment.local.upsert;
import com.google.common.base.Preconditions;
import java.util.List;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
@@ -38,11 +40,13 @@ public abstract class BaseTableUpsertMetadataManager
implements TableUpsertMetad
protected String _tableNameWithType;
protected UpsertContext _context;
+ protected SegmentOperationsThrottler _segmentOperationsThrottler;
@Override
public void init(PinotConfiguration instanceUpsertConfig, TableConfig
tableConfig, Schema schema,
- TableDataManager tableDataManager) {
+ TableDataManager tableDataManager, @Nullable SegmentOperationsThrottler
segmentOperationsThrottler) {
_tableNameWithType = tableConfig.getTableName();
+ _segmentOperationsThrottler = segmentOperationsThrottler;
Preconditions.checkArgument(tableConfig.isUpsertEnabled(),
"Upsert must be enabled for table: %s", _tableNameWithType);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
index e4071fa08c..3a8012c539 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManager.java
@@ -23,8 +23,10 @@ import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import javax.annotation.Nullable;
import javax.annotation.concurrent.ThreadSafe;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
import org.apache.pinot.segment.spi.SegmentContext;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
@@ -39,7 +41,7 @@ import org.apache.pinot.spi.env.PinotConfiguration;
public interface TableUpsertMetadataManager extends Closeable {
void init(PinotConfiguration instanceUpsertConfig, TableConfig tableConfig,
Schema schema,
- TableDataManager tableDataManager);
+ TableDataManager tableDataManager, @Nullable SegmentOperationsThrottler
segmentOperationsThrottler);
PartitionUpsertMetadataManager getOrCreatePartitionManager(int partitionId);
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
index 8d81b92588..1151220098 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactory.java
@@ -21,6 +21,7 @@ package org.apache.pinot.segment.local.upsert;
import com.google.common.base.Preconditions;
import org.apache.commons.lang3.StringUtils;
import org.apache.pinot.segment.local.data.manager.TableDataManager;
+import org.apache.pinot.segment.local.utils.SegmentOperationsThrottler;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.UpsertConfig;
import org.apache.pinot.spi.data.Schema;
@@ -37,7 +38,7 @@ public class TableUpsertMetadataManagerFactory {
private static final Logger LOGGER =
LoggerFactory.getLogger(TableUpsertMetadataManagerFactory.class);
public static TableUpsertMetadataManager create(PinotConfiguration
instanceUpsertConfig, TableConfig tableConfig,
- Schema schema, TableDataManager tableDataManager) {
+ Schema schema, TableDataManager tableDataManager,
SegmentOperationsThrottler segmentOperationsThrottler) {
String tableNameWithType = tableConfig.getTableName();
Preconditions.checkArgument(tableConfig.isUpsertEnabled(), "Upsert must be
enabled for table: %s",
tableNameWithType);
@@ -64,7 +65,7 @@ public class TableUpsertMetadataManagerFactory {
LOGGER.info("Creating ConcurrentMapTableUpsertMetadataManager for table:
{}", tableNameWithType);
metadataManager = new ConcurrentMapTableUpsertMetadataManager();
}
- metadataManager.init(instanceUpsertConfig, tableConfig, schema,
tableDataManager);
+ metadataManager.init(instanceUpsertConfig, tableConfig, schema,
tableDataManager, segmentOperationsThrottler);
return metadataManager;
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/BaseSegmentOperationsThrottler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/BaseSegmentOperationsThrottler.java
index 4e8de0cdf8..717e19aab8 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/BaseSegmentOperationsThrottler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/BaseSegmentOperationsThrottler.java
@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.pinot.common.concurrency.AdjustableSemaphore;
-import org.apache.pinot.common.metrics.ServerGauge;
import org.apache.pinot.common.metrics.ServerMetrics;
import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
import org.slf4j.Logger;
@@ -45,8 +44,6 @@ public abstract class BaseSegmentOperationsThrottler
implements PinotClusterConf
protected int _maxConcurrency;
protected int _maxConcurrencyBeforeServingQueries;
protected boolean _isServingQueries;
- protected ServerGauge _thresholdGauge;
- protected ServerGauge _countGauge;
private AtomicInteger _numSegmentsAcquiredSemaphore;
private final Logger _logger;
@@ -55,12 +52,10 @@ public abstract class BaseSegmentOperationsThrottler
implements PinotClusterConf
* @param maxConcurrency configured concurrency
* @param maxConcurrencyBeforeServingQueries configured concurrency before
serving queries
* @param isServingQueries whether the server is ready to serve queries or
not
- * @param thresholdGauge gauge metric to track the throttle thresholds
- * @param countGauge gauge metric to track the number of segments undergoing
the given operation
* @param logger logger to use
*/
public BaseSegmentOperationsThrottler(int maxConcurrency, int
maxConcurrencyBeforeServingQueries,
- boolean isServingQueries, ServerGauge thresholdGauge, ServerGauge
countGauge, Logger logger) {
+ boolean isServingQueries, Logger logger) {
_logger = logger;
_logger.info("Initializing SegmentOperationsThrottler, maxConcurrency: {},
maxConcurrencyBeforeServingQueries: {}, "
+ "isServingQueries: {}",
@@ -72,8 +67,6 @@ public abstract class BaseSegmentOperationsThrottler
implements PinotClusterConf
_maxConcurrency = maxConcurrency;
_maxConcurrencyBeforeServingQueries = maxConcurrencyBeforeServingQueries;
_isServingQueries = isServingQueries;
- _thresholdGauge = thresholdGauge;
- _countGauge = countGauge;
// maxConcurrencyBeforeServingQueries is only used prior to serving
queries and once the server is
// ready to serve queries this is not used again. This too is configurable
via ZK CLUSTER config updates while the
@@ -90,6 +83,18 @@ public abstract class BaseSegmentOperationsThrottler
implements PinotClusterConf
availablePermits());
}
+ /**
+ * Updates the throttle threshold metric
+ * @param value value to update the metric to
+ */
+ public abstract void updateThresholdMetric(int value);
+
+ /**
+ * Updates the throttle count metric
+ * @param value value to update the metric to
+ */
+ public abstract void updateCountMetric(int value);
+
/**
* The ServerMetrics may be created after these throttle objects are
created. In that case, the initialization that
* happens in the constructor may have occurred on the NOOP metrics. This
should be called after the server metrics
@@ -99,8 +104,8 @@ public abstract class BaseSegmentOperationsThrottler
implements PinotClusterConf
*/
public void initializeMetrics() {
_serverMetrics = ServerMetrics.get();
- _serverMetrics.setValueOfGlobalGauge(_thresholdGauge,
_semaphore.getTotalPermits());
- _serverMetrics.setValueOfGlobalGauge(_countGauge, 0);
+ updateThresholdMetric(_semaphore.getTotalPermits());
+ updateCountMetric(0);
}
public synchronized void startServingQueries() {
@@ -108,7 +113,7 @@ public abstract class BaseSegmentOperationsThrottler
implements PinotClusterConf
+ "total permits: {}, available permits: {}", totalPermits(),
availablePermits());
_isServingQueries = true;
_semaphore.setPermits(_maxConcurrency);
- _serverMetrics.setValueOfGlobalGauge(_thresholdGauge, _maxConcurrency);
+ updateThresholdMetric(_maxConcurrency);
_logger.info("Reset throttling completed, new concurrency: {}, total
permits: {}, available permits: {}",
_maxConcurrency, totalPermits(), availablePermits());
}
@@ -152,7 +157,7 @@ public abstract class BaseSegmentOperationsThrottler
implements PinotClusterConf
return;
}
_semaphore.setPermits(_maxConcurrency);
- _serverMetrics.setValueOfGlobalGauge(_thresholdGauge, _maxConcurrency);
+ updateThresholdMetric(_maxConcurrency);
_logger.info("Updated total permits: {}", totalPermits());
}
@@ -193,7 +198,7 @@ public abstract class BaseSegmentOperationsThrottler
implements PinotClusterConf
if (!_isServingQueries) {
_logger.info("config: {} was updated before serving queries was enabled,
updating the permits", configName);
_semaphore.setPermits(_maxConcurrencyBeforeServingQueries);
- _serverMetrics.setValueOfGlobalGauge(_thresholdGauge,
_maxConcurrencyBeforeServingQueries);
+ updateThresholdMetric(_maxConcurrencyBeforeServingQueries);
_logger.info("Updated total permits: {}", totalPermits());
}
}
@@ -209,7 +214,7 @@ public abstract class BaseSegmentOperationsThrottler
implements PinotClusterConf
public void acquire()
throws InterruptedException {
_semaphore.acquire();
- _serverMetrics.setValueOfGlobalGauge(_countGauge,
_numSegmentsAcquiredSemaphore.incrementAndGet());
+ updateCountMetric(_numSegmentsAcquiredSemaphore.incrementAndGet());
}
/**
@@ -218,7 +223,15 @@ public abstract class BaseSegmentOperationsThrottler
implements PinotClusterConf
*/
public void release() {
_semaphore.release();
- _serverMetrics.setValueOfGlobalGauge(_countGauge,
_numSegmentsAcquiredSemaphore.decrementAndGet());
+ updateCountMetric(_numSegmentsAcquiredSemaphore.decrementAndGet());
+ }
+
+ /**
+ * Get the estimated number of threads waiting for the semaphore
+ * @return the estimated queue length
+ */
+ public int getQueueLength() {
+ return _semaphore.getQueueLength();
}
/**
@@ -226,7 +239,7 @@ public abstract class BaseSegmentOperationsThrottler
implements PinotClusterConf
* @return number of available permits
*/
@VisibleForTesting
- protected int availablePermits() {
+ public int availablePermits() {
return _semaphore.availablePermits();
}
@@ -235,7 +248,7 @@ public abstract class BaseSegmentOperationsThrottler
implements PinotClusterConf
* @return total number of permits
*/
@VisibleForTesting
- protected int totalPermits() {
+ public int totalPermits() {
return _semaphore.getTotalPermits();
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentAllIndexPreprocessThrottler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentAllIndexPreprocessThrottler.java
index 8a9a5251eb..6243ba5d44 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentAllIndexPreprocessThrottler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentAllIndexPreprocessThrottler.java
@@ -40,8 +40,7 @@ public class SegmentAllIndexPreprocessThrottler extends
BaseSegmentOperationsThr
*/
public SegmentAllIndexPreprocessThrottler(int maxPreprocessConcurrency,
int maxPreprocessConcurrencyBeforeServingQueries, boolean
isServingQueries) {
- super(maxPreprocessConcurrency,
maxPreprocessConcurrencyBeforeServingQueries, isServingQueries,
- ServerGauge.SEGMENT_ALL_PREPROCESS_THROTTLE_THRESHOLD,
ServerGauge.SEGMENT_ALL_PREPROCESS_COUNT, LOGGER);
+ super(maxPreprocessConcurrency,
maxPreprocessConcurrencyBeforeServingQueries, isServingQueries, LOGGER);
}
@Override
@@ -60,4 +59,14 @@ public class SegmentAllIndexPreprocessThrottler extends
BaseSegmentOperationsThr
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES);
LOGGER.info("Updated SegmentAllIndexPreprocessThrottler configs with
latest clusterConfigs");
}
+
+ @Override
+ public void updateThresholdMetric(int value) {
+
_serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_ALL_PREPROCESS_THROTTLE_THRESHOLD,
value);
+ }
+
+ @Override
+ public void updateCountMetric(int value) {
+
_serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_ALL_PREPROCESS_COUNT,
value);
+ }
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentDownloadThrottler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentDownloadThrottler.java
index 1aec9fbaf7..906c58d013 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentDownloadThrottler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentDownloadThrottler.java
@@ -49,8 +49,7 @@ public class SegmentDownloadThrottler extends
BaseSegmentOperationsThrottler {
*/
public SegmentDownloadThrottler(int maxDownloadConcurrency, int
maxDownloadConcurrencyBeforeServingQueries,
boolean isServingQueries) {
- super(maxDownloadConcurrency, maxDownloadConcurrencyBeforeServingQueries,
isServingQueries,
- ServerGauge.SEGMENT_DOWNLOAD_THROTTLE_THRESHOLD,
ServerGauge.SEGMENT_DOWNLOAD_COUNT, LOGGER);
+ super(maxDownloadConcurrency, maxDownloadConcurrencyBeforeServingQueries,
isServingQueries, LOGGER);
}
@Override
@@ -70,11 +69,13 @@ public class SegmentDownloadThrottler extends
BaseSegmentOperationsThrottler {
LOGGER.info("Updated SegmentDownloadThrottler configs with latest
clusterConfigs");
}
- /**
- * Get the estimated number of threads waiting for the semaphore
- * @return the estimated queue length
- */
- public int getQueueLength() {
- return _semaphore.getQueueLength();
+ @Override
+ public void updateThresholdMetric(int value) {
+
_serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_DOWNLOAD_THROTTLE_THRESHOLD,
value);
+ }
+
+ @Override
+ public void updateCountMetric(int value) {
+ _serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_DOWNLOAD_COUNT,
value);
}
}
diff --git
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentStarTreePreprocessThrottler.java
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentStarTreePreprocessThrottler.java
index 531e90b44f..cf3e6216d3 100644
---
a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentStarTreePreprocessThrottler.java
+++
b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentStarTreePreprocessThrottler.java
@@ -42,7 +42,6 @@ public class SegmentStarTreePreprocessThrottler extends
BaseSegmentOperationsThr
public SegmentStarTreePreprocessThrottler(int
maxStarTreePreprocessConcurrency,
int maxStarTreePreprocessConcurrencyBeforeServingQueries, boolean
isServingQueries) {
super(maxStarTreePreprocessConcurrency,
maxStarTreePreprocessConcurrencyBeforeServingQueries, isServingQueries,
- ServerGauge.SEGMENT_STARTREE_PREPROCESS_THROTTLE_THRESHOLD,
ServerGauge.SEGMENT_STARTREE_PREPROCESS_COUNT,
LOGGER);
}
@@ -62,4 +61,14 @@ public class SegmentStarTreePreprocessThrottler extends
BaseSegmentOperationsThr
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES);
LOGGER.info("Updated SegmentStarTreePreprocessThrottler configs with
latest clusterConfigs");
}
+
+ @Override
+ public void updateThresholdMetric(int value) {
+
_serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_STARTREE_PREPROCESS_THROTTLE_THRESHOLD,
value);
+ }
+
+ @Override
+ public void updateCountMetric(int value) {
+
_serverMetrics.setValueOfGlobalGauge(ServerGauge.SEGMENT_STARTREE_PREPROCESS_COUNT,
value);
+ }
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java
index 7c2f626ff7..02e82df143 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/dedup/TableDedupMetadataManagerFactoryTest.java
@@ -86,7 +86,7 @@ public class TableDedupMetadataManagerFactoryTest {
TableDataManager tableDataManager, boolean expected)
throws IOException {
try (TableDedupMetadataManager tableDedupMetadataManager =
TableDedupMetadataManagerFactory.create(
- instanceDedupConfig, tableConfig, schema, tableDataManager)) {
+ instanceDedupConfig, tableConfig, schema, tableDataManager, null)) {
assertEquals(tableDedupMetadataManager.getContext().isPreloadEnabled(),
expected);
}
}
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupTest.java
index 34d49ec1f2..a3ca692e0a 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentDedupTest.java
@@ -119,7 +119,8 @@ public class MutableSegmentDedupTest implements
PinotBuffersAfterMethodCheckRule
.build();
TableDataManager tableDataManager = mock(TableDataManager.class);
when(tableDataManager.getTableDataDir()).thenReturn(TEMP_DIR);
- return TableDedupMetadataManagerFactory.create(new PinotConfiguration(),
tableConfig, schema, tableDataManager);
+ return TableDedupMetadataManagerFactory.create(new PinotConfiguration(),
tableConfig, schema, tableDataManager,
+ null);
}
public List<Map<String, String>> loadJsonFile(String filePath)
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
index 378afbb6cd..1588de8f91 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertComparisonColTest.java
@@ -87,7 +87,8 @@ public class MutableSegmentImplUpsertComparisonColTest
implements PinotBuffersAf
_recordTransformer =
CompositeTransformer.getDefaultTransformer(_tableConfig, _schema);
File jsonFile = new File(dataResourceUrl.getFile());
TableUpsertMetadataManager tableUpsertMetadataManager =
- TableUpsertMetadataManagerFactory.create(new PinotConfiguration(),
_tableConfig, _schema, _tableDataManager);
+ TableUpsertMetadataManagerFactory.create(new PinotConfiguration(),
_tableConfig, _schema, _tableDataManager,
+ null);
_partitionUpsertMetadataManager =
tableUpsertMetadataManager.getOrCreatePartitionManager(0);
_mutableSegmentImpl =
MutableSegmentImplTestUtils.createMutableSegmentImpl(_schema, true,
"secondsSinceEpoch",
_partitionUpsertMetadataManager, null);
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
index a2ac9d6c09..b72ef4679b 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/indexsegment/mutable/MutableSegmentImplUpsertTest.java
@@ -82,7 +82,7 @@ public class MutableSegmentImplUpsertTest {
File jsonFile = new File(dataResourceUrl.getFile());
TableUpsertMetadataManager tableUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(new PinotConfiguration(),
tableConfig, schema,
- mock(TableDataManager.class));
+ mock(TableDataManager.class), null);
_partitionUpsertMetadataManager =
tableUpsertMetadataManager.getOrCreatePartitionManager(0);
_mutableSegmentImpl =
MutableSegmentImplTestUtils.createMutableSegmentImpl(schema, true,
TIME_COLUMN, _partitionUpsertMetadataManager,
diff --git
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java
index eb7c77ade9..4dfb248502 100644
---
a/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java
+++
b/pinot-segment-local/src/test/java/org/apache/pinot/segment/local/upsert/TableUpsertMetadataManagerFactoryTest.java
@@ -65,11 +65,11 @@ public class TableUpsertMetadataManagerFactoryTest {
when(tableDataManager.getTableDataDir()).thenReturn(new
File(RAW_TABLE_NAME));
TableUpsertMetadataManager tableUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(new PinotConfiguration(),
createTableConfig(upsertConfig), SCHEMA,
- tableDataManager);
+ tableDataManager, null);
assertNotNull(tableUpsertMetadataManager);
assertTrue(tableUpsertMetadataManager instanceof
ConcurrentMapTableUpsertMetadataManager);
- assertTrue(tableUpsertMetadataManager.getOrCreatePartitionManager(
- 0) instanceof ConcurrentMapPartitionUpsertMetadataManager);
+ assertTrue(tableUpsertMetadataManager.getOrCreatePartitionManager(0)
+ instanceof ConcurrentMapPartitionUpsertMetadataManager);
}
@Test
@@ -82,11 +82,11 @@ public class TableUpsertMetadataManagerFactoryTest {
when(tableDataManager.getTableDataDir()).thenReturn(new
File(RAW_TABLE_NAME));
TableUpsertMetadataManager tableUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(new PinotConfiguration(),
createTableConfig(upsertConfig), SCHEMA,
- tableDataManager);
+ tableDataManager, null);
assertNotNull(tableUpsertMetadataManager);
assertTrue(tableUpsertMetadataManager instanceof
ConcurrentMapTableUpsertMetadataManager);
- assertTrue(tableUpsertMetadataManager.getOrCreatePartitionManager(
- 0) instanceof
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes);
+ assertTrue(tableUpsertMetadataManager.getOrCreatePartitionManager(0)
+ instanceof
ConcurrentMapPartitionUpsertMetadataManagerForConsistentDeletes);
}
@SuppressWarnings("deprecation")
@@ -139,7 +139,7 @@ public class TableUpsertMetadataManagerFactoryTest {
TableDataManager tableDataManager, boolean expected)
throws IOException {
try (TableUpsertMetadataManager tableUpsertMetadataManager =
TableUpsertMetadataManagerFactory.create(
- instanceUpsertConfig, tableConfig, SCHEMA, tableDataManager)) {
+ instanceUpsertConfig, tableConfig, SCHEMA, tableDataManager, null)) {
assertEquals(tableUpsertMetadataManager.getContext().isPreloadEnabled(),
expected);
}
}
diff --git
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
index a40414d25c..76d4a3a39b 100644
---
a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
+++
b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java
@@ -644,38 +644,17 @@ public abstract class BaseServerStarter implements
ServiceStartable {
ServerSegmentCompletionProtocolHandler.init(
_serverConf.subset(SegmentCompletionProtocol.PREFIX_OF_CONFIG_OF_SEGMENT_UPLOADER));
- int maxPreprocessConcurrency = Integer.parseInt(
-
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM,
- Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM));
- int maxPreprocessConcurrencyBeforeServingQueries = Integer.parseInt(
-
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES,
-
Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES));
- // Relax throttling until the server is ready to serve queries
- SegmentAllIndexPreprocessThrottler segmentAllIndexPreprocessThrottler =
- new SegmentAllIndexPreprocessThrottler(maxPreprocessConcurrency,
maxPreprocessConcurrencyBeforeServingQueries,
- false);
- int maxStarTreePreprocessConcurrency = Integer.parseInt(
-
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM,
- Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM));
- int maxStarTreePreprocessConcurrencyBeforeServingQueries =
Integer.parseInt(
-
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES,
-
Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES));
- // Relax throttling until the server is ready to serve queries
- SegmentStarTreePreprocessThrottler segmentStarTreePreprocessThrottler =
- new
SegmentStarTreePreprocessThrottler(maxStarTreePreprocessConcurrency,
- maxStarTreePreprocessConcurrencyBeforeServingQueries, false);
- int maxDownloadConcurrency = Integer.parseInt(
-
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM,
- Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM));
- int maxDownloadConcurrencyBeforeServingQueries = Integer.parseInt(
-
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES,
-
Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES));
- // Relax throttling until the server is ready to serve queries
- SegmentDownloadThrottler segmentDownloadThrottler =
- new SegmentDownloadThrottler(maxDownloadConcurrency,
maxDownloadConcurrencyBeforeServingQueries, false);
- _segmentOperationsThrottler =
- new SegmentOperationsThrottler(segmentAllIndexPreprocessThrottler,
segmentStarTreePreprocessThrottler,
- segmentDownloadThrottler);
+ if (_segmentOperationsThrottler == null) {
+ // Only create segment operation throttlers if null
+ SegmentAllIndexPreprocessThrottler segmentAllIndexPreprocessThrottler =
+ createSegmentAllIndexPreprocessThrottler();
+ SegmentStarTreePreprocessThrottler segmentStarTreePreprocessThrottler =
+ createSegmentStarTreePreprocessThrottler();
+ SegmentDownloadThrottler segmentDownloadThrottler =
createSegmentDownloadThrottler();
+ _segmentOperationsThrottler =
+ new SegmentOperationsThrottler(segmentAllIndexPreprocessThrottler,
segmentStarTreePreprocessThrottler,
+ segmentDownloadThrottler);
+ }
SendStatsPredicate sendStatsPredicate =
SendStatsPredicate.create(_serverConf, _helixManager);
ServerConf serverConf = new ServerConf(_serverConf);
@@ -829,6 +808,42 @@ public abstract class BaseServerStarter implements
ServiceStartable {
}
}
+ protected SegmentAllIndexPreprocessThrottler
createSegmentAllIndexPreprocessThrottler() {
+ int maxPreprocessConcurrency = Integer.parseInt(
+
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM,
+ Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM));
+ int maxPreprocessConcurrencyBeforeServingQueries = Integer.parseInt(
+
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES,
+
Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES));
+ // Relax throttling until the server is ready to serve queries
+ return new SegmentAllIndexPreprocessThrottler(maxPreprocessConcurrency,
+ maxPreprocessConcurrencyBeforeServingQueries, false);
+ }
+
+ protected SegmentStarTreePreprocessThrottler
createSegmentStarTreePreprocessThrottler() {
+ int maxStarTreePreprocessConcurrency = Integer.parseInt(
+
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM,
+ Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM));
+ int maxStarTreePreprocessConcurrencyBeforeServingQueries =
Integer.parseInt(
+
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES,
+
Helix.DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES));
+ // Relax throttling until the server is ready to serve queries
+ return new
SegmentStarTreePreprocessThrottler(maxStarTreePreprocessConcurrency,
+ maxStarTreePreprocessConcurrencyBeforeServingQueries, false);
+ }
+
+ protected SegmentDownloadThrottler createSegmentDownloadThrottler() {
+ int maxDownloadConcurrency = Integer.parseInt(
+
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM,
+ Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM));
+ int maxDownloadConcurrencyBeforeServingQueries = Integer.parseInt(
+
_serverConf.getProperty(Helix.CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES,
+
Helix.DEFAULT_MAX_SEGMENT_DOWNLOAD_PARALLELISM_BEFORE_SERVING_QUERIES));
+ // Relax throttling until the server is ready to serve queries
+ return new SegmentDownloadThrottler(maxDownloadConcurrency,
maxDownloadConcurrencyBeforeServingQueries,
+ false);
+ }
+
/**
* Can be overridden to perform operations before server starts serving
queries.
*/
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 86b8200dfc..137eaa9149 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -268,6 +268,7 @@ public class CommonConstants {
// Setting the before serving queries to Integer.MAX_VALUE to effectively
disable throttling by default
public static final String
DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES =
String.valueOf(Integer.MAX_VALUE);
+
// Preprocess throttle config specifically for StarTree index rebuild
public static final String
CONFIG_OF_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM =
"pinot.server.max.segment.startree.preprocess.parallelism";
@@ -278,6 +279,8 @@ public class CommonConstants {
// Setting the before serving queries to Integer.MAX_VALUE to effectively
disable throttling by default
public static final String
DEFAULT_MAX_SEGMENT_STARTREE_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES =
String.valueOf(Integer.MAX_VALUE);
+
+ // Download throttle config
public static final String CONFIG_OF_MAX_SEGMENT_DOWNLOAD_PARALLELISM =
"pinot.server.max.segment.download.parallelism";
// Setting to Integer.MAX_VALUE to effectively disable throttling by
default
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]