Jackie-Jiang commented on code in PR #14943:
URL: https://github.com/apache/pinot/pull/14943#discussion_r1938058304
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPreprocessThrottler.java:
##########
@@ -18,208 +18,40 @@
*/
package org.apache.pinot.segment.local.utils;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.pinot.common.concurrency.AdjustableSemaphore;
-import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
-import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Used to throttle the total concurrent index rebuilds that can happen on a
given Pinot server.
+ * Contains all the segment preprocess throttlers used to control the total
index rebuilds that can happen on a given
+ * Pinot server. For now this class supports index rebuild throttling at the
following levels:
+ * - All index throttling
+ * - StarTree index throttling
* Code paths that do no need to rebuild the index or which don't happen on
the server need not utilize this throttler.
*/
-public class SegmentPreprocessThrottler implements
PinotClusterConfigChangeListener {
+public class SegmentPreprocessThrottler {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentPreprocessThrottler.class);
- /**
- * _maxPreprocessConcurrency and
_maxConcurrentPreprocessesBeforeServingQueries must be > 0. To effectively
disable
- * throttling, this can be set to a very high value
- */
- private int _maxPreprocessConcurrency;
- private int _maxPreprocessConcurrencyBeforeServingQueries;
- private boolean _isServingQueries;
- private final AdjustableSemaphore _semaphore;
-
- /**
- * @param maxPreprocessConcurrency configured preprocessing concurrency
- * @param maxPreprocessConcurrencyBeforeServingQueries configured
preprocessing concurrency before serving queries
- * @param isServingQueries whether the server is ready to serve queries or
not
- */
- public SegmentPreprocessThrottler(int maxPreprocessConcurrency, int
maxPreprocessConcurrencyBeforeServingQueries,
- boolean isServingQueries) {
- LOGGER.info("Initializing SegmentPreprocessThrottler,
maxPreprocessConcurrency: {}, "
- + "maxPreprocessConcurrencyBeforeServingQueries: {},
isServingQueries: {}",
- maxPreprocessConcurrency,
maxPreprocessConcurrencyBeforeServingQueries, isServingQueries);
- Preconditions.checkArgument(maxPreprocessConcurrency > 0,
- "Max preprocess parallelism must be > 0, but found to be: " +
maxPreprocessConcurrency);
- Preconditions.checkArgument(maxPreprocessConcurrencyBeforeServingQueries >
0,
- "Max preprocess parallelism before serving queries must be > 0, but
found to be: "
- + maxPreprocessConcurrencyBeforeServingQueries);
-
- _maxPreprocessConcurrency = maxPreprocessConcurrency;
- _maxPreprocessConcurrencyBeforeServingQueries =
maxPreprocessConcurrencyBeforeServingQueries;
- _isServingQueries = isServingQueries;
-
- // maxConcurrentPreprocessesBeforeServingQueries is only used prior to
serving queries and once the server is
- // ready to serve queries this is not used again. This too is configurable
via ZK CLUSTER config updates while the
- // server is starting up.
- int preprocessConcurrency = _maxPreprocessConcurrency;
- if (!isServingQueries) {
- preprocessConcurrency = _maxPreprocessConcurrencyBeforeServingQueries;
- LOGGER.info("Serving queries is disabled, setting preprocess concurrency
to: {}", preprocessConcurrency);
- }
- _semaphore = new AdjustableSemaphore(preprocessConcurrency, true);
- LOGGER.info("Created semaphore with total permits: {}, available permits:
{}", totalPermits(),
- availablePermits());
- }
-
- public synchronized void startServingQueries() {
- LOGGER.info("Serving queries is to be enabled, reset throttling threshold
for segment preprocess concurrency, "
- + "total permits: {}, available permits: {}", totalPermits(),
availablePermits());
- _isServingQueries = true;
- _semaphore.setPermits(_maxPreprocessConcurrency);
- LOGGER.info("Reset throttling completed, new concurrency: {}, total
permits: {}, available permits: {}",
- _maxPreprocessConcurrency, totalPermits(), availablePermits());
- }
-
- @Override
- public synchronized void onChange(Set<String> changedConfigs, Map<String,
String> clusterConfigs) {
- if (CollectionUtils.isEmpty(changedConfigs)) {
- LOGGER.info("Skip updating SegmentPreprocessThrottler configs with
unchanged clusterConfigs");
- return;
- }
-
- LOGGER.info("Updating SegmentPreprocessThrottler configs with latest
clusterConfigs");
- handleMaxPreprocessConcurrencyChange(changedConfigs, clusterConfigs);
- handleMaxPreprocessConcurrencyBeforeServingQueriesChange(changedConfigs,
clusterConfigs);
- LOGGER.info("Updated SegmentPreprocessThrottler configs with latest
clusterConfigs");
- }
-
- private void handleMaxPreprocessConcurrencyChange(Set<String>
changedConfigs, Map<String, String> clusterConfigs) {
- if
(!changedConfigs.contains(CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM))
{
- LOGGER.info("changedConfigs list indicates maxPreprocessConcurrency was
not updated, skipping updates");
- return;
- }
-
- String configName =
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM;
- String defaultConfigValue =
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM;
- String maxParallelSegmentPreprocessesStr =
- clusterConfigs == null ? defaultConfigValue :
clusterConfigs.getOrDefault(configName, defaultConfigValue);
-
- int maxPreprocessConcurrency;
- try {
- maxPreprocessConcurrency =
Integer.parseInt(maxParallelSegmentPreprocessesStr);
- } catch (Exception e) {
- LOGGER.warn("Invalid maxPreprocessConcurrency set: {}, not making
change, fix config and try again",
- maxParallelSegmentPreprocessesStr);
- return;
- }
-
- if (maxPreprocessConcurrency <= 0) {
- LOGGER.warn("maxPreprocessConcurrency: {} must be > 0, not making
change, fix config and try again",
- maxPreprocessConcurrency);
- return;
- }
-
- if (maxPreprocessConcurrency == _maxPreprocessConcurrency) {
- LOGGER.info("No ZK update for maxPreprocessConcurrency {}, total
permits: {}", _maxPreprocessConcurrency,
- totalPermits());
- return;
- }
-
- LOGGER.info("Updated maxPreprocessConcurrency from: {} to: {}",
_maxPreprocessConcurrency,
- maxPreprocessConcurrency);
- _maxPreprocessConcurrency = maxPreprocessConcurrency;
-
- if (!_isServingQueries) {
- LOGGER.info("Serving queries hasn't been enabled yet, not updating the
permits with maxPreprocessConcurrency");
- return;
- }
- _semaphore.setPermits(_maxPreprocessConcurrency);
- LOGGER.info("Updated total permits: {}", totalPermits());
- }
-
- private void
handleMaxPreprocessConcurrencyBeforeServingQueriesChange(Set<String>
changedConfigs,
- Map<String, String> clusterConfigs) {
- if (!changedConfigs.contains(
-
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES))
{
- LOGGER.info("changedConfigs list indicates
maxPreprocessConcurrencyBeforeServingQueries was not updated, "
- + "skipping updates");
- return;
- }
-
- String configName =
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES;
- String defaultConfigValue =
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES;
- String maxParallelSegmentPreprocessesBeforeServingQueriesStr =
- clusterConfigs == null ? defaultConfigValue :
clusterConfigs.getOrDefault(configName, defaultConfigValue);
-
- int maxPreprocessConcurrencyBeforeServingQueries;
- try {
- maxPreprocessConcurrencyBeforeServingQueries =
-
Integer.parseInt(maxParallelSegmentPreprocessesBeforeServingQueriesStr);
- } catch (Exception e) {
- LOGGER.warn("Invalid maxPreprocessConcurrencyBeforeServingQueries set:
{}, not making change, fix config and "
- + "try again",
maxParallelSegmentPreprocessesBeforeServingQueriesStr);
- return;
- }
-
- if (maxPreprocessConcurrencyBeforeServingQueries <= 0) {
- LOGGER.warn("maxPreprocessConcurrencyBeforeServingQueries: {} must be >
0, not making change, fix config "
- + "and try again", maxPreprocessConcurrencyBeforeServingQueries);
- return;
- }
-
- if (maxPreprocessConcurrencyBeforeServingQueries ==
_maxPreprocessConcurrencyBeforeServingQueries) {
- LOGGER.info("No ZK update for
maxPreprocessConcurrencyBeforeServingQueries {}, total permits: {}",
- _maxPreprocessConcurrencyBeforeServingQueries, totalPermits());
- return;
- }
-
- LOGGER.info("Updated maxPreprocessConcurrencyBeforeServingQueries from: {}
to: {}",
- _maxPreprocessConcurrencyBeforeServingQueries,
maxPreprocessConcurrencyBeforeServingQueries);
- _maxPreprocessConcurrencyBeforeServingQueries =
maxPreprocessConcurrencyBeforeServingQueries;
- if (!_isServingQueries) {
- LOGGER.info("maxPreprocessConcurrencyBeforeServingQueries was updated
before serving queries was enabled, "
- + "updating the permits");
- _semaphore.setPermits(_maxPreprocessConcurrencyBeforeServingQueries);
- LOGGER.info("Updated total permits: {}", totalPermits());
- }
- }
-
- /**
- * Block trying to acquire the semaphore to perform the segment index
rebuild steps unless interrupted.
- * <p>
- * {@link #release()} should be called after the segment preprocess
completes. It is the responsibility of the caller
- * to ensure that {@link #release()} is called exactly once for each call to
this method.
- *
- * @throws InterruptedException if the current thread is interrupted
- */
- public void acquire()
- throws InterruptedException {
- _semaphore.acquire();
- }
+ SegmentAllIndexPreprocessThrottler _segmentAllIndexPreprocessThrottler;
+ SegmentStarTreePreprocessThrottler _segmentStarTreePreprocessThrottler;
/**
- * Should be called after the segment index build completes. It is the
responsibility of the caller to ensure that
- * this method is called exactly once for each call to {@link #acquire()}.
+ * Constructor for SegmentPreprocessThrottler
+ * @param segmentAllIndexPreprocessThrottler segment preprocess throttler to
use for all indexes
+ * @param segmentStarTreePreprocessThrottler segment preprocess throttler to
use for StarTree index
*/
- public void release() {
- _semaphore.release();
+ public SegmentPreprocessThrottler(SegmentAllIndexPreprocessThrottler
segmentAllIndexPreprocessThrottler,
+ SegmentStarTreePreprocessThrottler segmentStarTreePreprocessThrottler) {
Review Comment:
Are they nullable? If so, we also need to add a null check on the caller side.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPreprocessThrottler.java:
##########
@@ -18,208 +18,40 @@
*/
package org.apache.pinot.segment.local.utils;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.pinot.common.concurrency.AdjustableSemaphore;
-import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
-import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Used to throttle the total concurrent index rebuilds that can happen on a
given Pinot server.
+ * Contains all the segment preprocess throttlers used to control the total
index rebuilds that can happen on a given
+ * Pinot server. For now this class supports index rebuild throttling at the
following levels:
+ * - All index throttling
+ * - StarTree index throttling
* Code paths that do no need to rebuild the index or which don't happen on
the server need not utilize this throttler.
*/
-public class SegmentPreprocessThrottler implements
PinotClusterConfigChangeListener {
+public class SegmentPreprocessThrottler {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentPreprocessThrottler.class);
- /**
- * _maxPreprocessConcurrency and
_maxConcurrentPreprocessesBeforeServingQueries must be > 0. To effectively
disable
- * throttling, this can be set to a very high value
- */
- private int _maxPreprocessConcurrency;
- private int _maxPreprocessConcurrencyBeforeServingQueries;
- private boolean _isServingQueries;
- private final AdjustableSemaphore _semaphore;
-
- /**
- * @param maxPreprocessConcurrency configured preprocessing concurrency
- * @param maxPreprocessConcurrencyBeforeServingQueries configured
preprocessing concurrency before serving queries
- * @param isServingQueries whether the server is ready to serve queries or
not
- */
- public SegmentPreprocessThrottler(int maxPreprocessConcurrency, int
maxPreprocessConcurrencyBeforeServingQueries,
- boolean isServingQueries) {
- LOGGER.info("Initializing SegmentPreprocessThrottler,
maxPreprocessConcurrency: {}, "
- + "maxPreprocessConcurrencyBeforeServingQueries: {},
isServingQueries: {}",
- maxPreprocessConcurrency,
maxPreprocessConcurrencyBeforeServingQueries, isServingQueries);
- Preconditions.checkArgument(maxPreprocessConcurrency > 0,
- "Max preprocess parallelism must be > 0, but found to be: " +
maxPreprocessConcurrency);
- Preconditions.checkArgument(maxPreprocessConcurrencyBeforeServingQueries >
0,
- "Max preprocess parallelism before serving queries must be > 0, but
found to be: "
- + maxPreprocessConcurrencyBeforeServingQueries);
-
- _maxPreprocessConcurrency = maxPreprocessConcurrency;
- _maxPreprocessConcurrencyBeforeServingQueries =
maxPreprocessConcurrencyBeforeServingQueries;
- _isServingQueries = isServingQueries;
-
- // maxConcurrentPreprocessesBeforeServingQueries is only used prior to
serving queries and once the server is
- // ready to serve queries this is not used again. This too is configurable
via ZK CLUSTER config updates while the
- // server is starting up.
- int preprocessConcurrency = _maxPreprocessConcurrency;
- if (!isServingQueries) {
- preprocessConcurrency = _maxPreprocessConcurrencyBeforeServingQueries;
- LOGGER.info("Serving queries is disabled, setting preprocess concurrency
to: {}", preprocessConcurrency);
- }
- _semaphore = new AdjustableSemaphore(preprocessConcurrency, true);
- LOGGER.info("Created semaphore with total permits: {}, available permits:
{}", totalPermits(),
- availablePermits());
- }
-
- public synchronized void startServingQueries() {
- LOGGER.info("Serving queries is to be enabled, reset throttling threshold
for segment preprocess concurrency, "
- + "total permits: {}, available permits: {}", totalPermits(),
availablePermits());
- _isServingQueries = true;
- _semaphore.setPermits(_maxPreprocessConcurrency);
- LOGGER.info("Reset throttling completed, new concurrency: {}, total
permits: {}, available permits: {}",
- _maxPreprocessConcurrency, totalPermits(), availablePermits());
- }
-
- @Override
- public synchronized void onChange(Set<String> changedConfigs, Map<String,
String> clusterConfigs) {
- if (CollectionUtils.isEmpty(changedConfigs)) {
- LOGGER.info("Skip updating SegmentPreprocessThrottler configs with
unchanged clusterConfigs");
- return;
- }
-
- LOGGER.info("Updating SegmentPreprocessThrottler configs with latest
clusterConfigs");
- handleMaxPreprocessConcurrencyChange(changedConfigs, clusterConfigs);
- handleMaxPreprocessConcurrencyBeforeServingQueriesChange(changedConfigs,
clusterConfigs);
- LOGGER.info("Updated SegmentPreprocessThrottler configs with latest
clusterConfigs");
- }
-
- private void handleMaxPreprocessConcurrencyChange(Set<String>
changedConfigs, Map<String, String> clusterConfigs) {
- if
(!changedConfigs.contains(CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM))
{
- LOGGER.info("changedConfigs list indicates maxPreprocessConcurrency was
not updated, skipping updates");
- return;
- }
-
- String configName =
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM;
- String defaultConfigValue =
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM;
- String maxParallelSegmentPreprocessesStr =
- clusterConfigs == null ? defaultConfigValue :
clusterConfigs.getOrDefault(configName, defaultConfigValue);
-
- int maxPreprocessConcurrency;
- try {
- maxPreprocessConcurrency =
Integer.parseInt(maxParallelSegmentPreprocessesStr);
- } catch (Exception e) {
- LOGGER.warn("Invalid maxPreprocessConcurrency set: {}, not making
change, fix config and try again",
- maxParallelSegmentPreprocessesStr);
- return;
- }
-
- if (maxPreprocessConcurrency <= 0) {
- LOGGER.warn("maxPreprocessConcurrency: {} must be > 0, not making
change, fix config and try again",
- maxPreprocessConcurrency);
- return;
- }
-
- if (maxPreprocessConcurrency == _maxPreprocessConcurrency) {
- LOGGER.info("No ZK update for maxPreprocessConcurrency {}, total
permits: {}", _maxPreprocessConcurrency,
- totalPermits());
- return;
- }
-
- LOGGER.info("Updated maxPreprocessConcurrency from: {} to: {}",
_maxPreprocessConcurrency,
- maxPreprocessConcurrency);
- _maxPreprocessConcurrency = maxPreprocessConcurrency;
-
- if (!_isServingQueries) {
- LOGGER.info("Serving queries hasn't been enabled yet, not updating the
permits with maxPreprocessConcurrency");
- return;
- }
- _semaphore.setPermits(_maxPreprocessConcurrency);
- LOGGER.info("Updated total permits: {}", totalPermits());
- }
-
- private void
handleMaxPreprocessConcurrencyBeforeServingQueriesChange(Set<String>
changedConfigs,
- Map<String, String> clusterConfigs) {
- if (!changedConfigs.contains(
-
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES))
{
- LOGGER.info("changedConfigs list indicates
maxPreprocessConcurrencyBeforeServingQueries was not updated, "
- + "skipping updates");
- return;
- }
-
- String configName =
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES;
- String defaultConfigValue =
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES;
- String maxParallelSegmentPreprocessesBeforeServingQueriesStr =
- clusterConfigs == null ? defaultConfigValue :
clusterConfigs.getOrDefault(configName, defaultConfigValue);
-
- int maxPreprocessConcurrencyBeforeServingQueries;
- try {
- maxPreprocessConcurrencyBeforeServingQueries =
-
Integer.parseInt(maxParallelSegmentPreprocessesBeforeServingQueriesStr);
- } catch (Exception e) {
- LOGGER.warn("Invalid maxPreprocessConcurrencyBeforeServingQueries set:
{}, not making change, fix config and "
- + "try again",
maxParallelSegmentPreprocessesBeforeServingQueriesStr);
- return;
- }
-
- if (maxPreprocessConcurrencyBeforeServingQueries <= 0) {
- LOGGER.warn("maxPreprocessConcurrencyBeforeServingQueries: {} must be >
0, not making change, fix config "
- + "and try again", maxPreprocessConcurrencyBeforeServingQueries);
- return;
- }
-
- if (maxPreprocessConcurrencyBeforeServingQueries ==
_maxPreprocessConcurrencyBeforeServingQueries) {
- LOGGER.info("No ZK update for
maxPreprocessConcurrencyBeforeServingQueries {}, total permits: {}",
- _maxPreprocessConcurrencyBeforeServingQueries, totalPermits());
- return;
- }
-
- LOGGER.info("Updated maxPreprocessConcurrencyBeforeServingQueries from: {}
to: {}",
- _maxPreprocessConcurrencyBeforeServingQueries,
maxPreprocessConcurrencyBeforeServingQueries);
- _maxPreprocessConcurrencyBeforeServingQueries =
maxPreprocessConcurrencyBeforeServingQueries;
- if (!_isServingQueries) {
- LOGGER.info("maxPreprocessConcurrencyBeforeServingQueries was updated
before serving queries was enabled, "
- + "updating the permits");
- _semaphore.setPermits(_maxPreprocessConcurrencyBeforeServingQueries);
- LOGGER.info("Updated total permits: {}", totalPermits());
- }
- }
-
- /**
- * Block trying to acquire the semaphore to perform the segment index
rebuild steps unless interrupted.
- * <p>
- * {@link #release()} should be called after the segment preprocess
completes. It is the responsibility of the caller
- * to ensure that {@link #release()} is called exactly once for each call to
this method.
- *
- * @throws InterruptedException if the current thread is interrupted
- */
- public void acquire()
- throws InterruptedException {
- _semaphore.acquire();
- }
+ SegmentAllIndexPreprocessThrottler _segmentAllIndexPreprocessThrottler;
+ SegmentStarTreePreprocessThrottler _segmentStarTreePreprocessThrottler;
Review Comment:
```suggestion
private final SegmentAllIndexPreprocessThrottler
_segmentAllIndexPreprocessThrottler;
private final SegmentStarTreePreprocessThrottler
_segmentStarTreePreprocessThrottler;
```
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPreprocessThrottler.java:
##########
@@ -18,208 +18,40 @@
*/
package org.apache.pinot.segment.local.utils;
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
-import java.util.Map;
-import java.util.Set;
-import org.apache.commons.collections4.CollectionUtils;
-import org.apache.pinot.common.concurrency.AdjustableSemaphore;
-import org.apache.pinot.spi.config.provider.PinotClusterConfigChangeListener;
-import org.apache.pinot.spi.utils.CommonConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Used to throttle the total concurrent index rebuilds that can happen on a
given Pinot server.
+ * Contains all the segment preprocess throttlers used to control the total
index rebuilds that can happen on a given
+ * Pinot server. For now this class supports index rebuild throttling at the
following levels:
+ * - All index throttling
+ * - StarTree index throttling
* Code paths that do no need to rebuild the index or which don't happen on
the server need not utilize this throttler.
*/
-public class SegmentPreprocessThrottler implements
PinotClusterConfigChangeListener {
+public class SegmentPreprocessThrottler {
private static final Logger LOGGER =
LoggerFactory.getLogger(SegmentPreprocessThrottler.class);
- /**
- * _maxPreprocessConcurrency and
_maxConcurrentPreprocessesBeforeServingQueries must be > 0. To effectively
disable
- * throttling, this can be set to a very high value
- */
- private int _maxPreprocessConcurrency;
- private int _maxPreprocessConcurrencyBeforeServingQueries;
- private boolean _isServingQueries;
- private final AdjustableSemaphore _semaphore;
-
- /**
- * @param maxPreprocessConcurrency configured preprocessing concurrency
- * @param maxPreprocessConcurrencyBeforeServingQueries configured
preprocessing concurrency before serving queries
- * @param isServingQueries whether the server is ready to serve queries or
not
- */
- public SegmentPreprocessThrottler(int maxPreprocessConcurrency, int
maxPreprocessConcurrencyBeforeServingQueries,
- boolean isServingQueries) {
- LOGGER.info("Initializing SegmentPreprocessThrottler,
maxPreprocessConcurrency: {}, "
- + "maxPreprocessConcurrencyBeforeServingQueries: {},
isServingQueries: {}",
- maxPreprocessConcurrency,
maxPreprocessConcurrencyBeforeServingQueries, isServingQueries);
- Preconditions.checkArgument(maxPreprocessConcurrency > 0,
- "Max preprocess parallelism must be > 0, but found to be: " +
maxPreprocessConcurrency);
- Preconditions.checkArgument(maxPreprocessConcurrencyBeforeServingQueries >
0,
- "Max preprocess parallelism before serving queries must be > 0, but
found to be: "
- + maxPreprocessConcurrencyBeforeServingQueries);
-
- _maxPreprocessConcurrency = maxPreprocessConcurrency;
- _maxPreprocessConcurrencyBeforeServingQueries =
maxPreprocessConcurrencyBeforeServingQueries;
- _isServingQueries = isServingQueries;
-
- // maxConcurrentPreprocessesBeforeServingQueries is only used prior to
serving queries and once the server is
- // ready to serve queries this is not used again. This too is configurable
via ZK CLUSTER config updates while the
- // server is starting up.
- int preprocessConcurrency = _maxPreprocessConcurrency;
- if (!isServingQueries) {
- preprocessConcurrency = _maxPreprocessConcurrencyBeforeServingQueries;
- LOGGER.info("Serving queries is disabled, setting preprocess concurrency
to: {}", preprocessConcurrency);
- }
- _semaphore = new AdjustableSemaphore(preprocessConcurrency, true);
- LOGGER.info("Created semaphore with total permits: {}, available permits:
{}", totalPermits(),
- availablePermits());
- }
-
- public synchronized void startServingQueries() {
- LOGGER.info("Serving queries is to be enabled, reset throttling threshold
for segment preprocess concurrency, "
- + "total permits: {}, available permits: {}", totalPermits(),
availablePermits());
- _isServingQueries = true;
- _semaphore.setPermits(_maxPreprocessConcurrency);
- LOGGER.info("Reset throttling completed, new concurrency: {}, total
permits: {}, available permits: {}",
- _maxPreprocessConcurrency, totalPermits(), availablePermits());
- }
-
- @Override
- public synchronized void onChange(Set<String> changedConfigs, Map<String,
String> clusterConfigs) {
- if (CollectionUtils.isEmpty(changedConfigs)) {
- LOGGER.info("Skip updating SegmentPreprocessThrottler configs with
unchanged clusterConfigs");
- return;
- }
-
- LOGGER.info("Updating SegmentPreprocessThrottler configs with latest
clusterConfigs");
- handleMaxPreprocessConcurrencyChange(changedConfigs, clusterConfigs);
- handleMaxPreprocessConcurrencyBeforeServingQueriesChange(changedConfigs,
clusterConfigs);
- LOGGER.info("Updated SegmentPreprocessThrottler configs with latest
clusterConfigs");
- }
-
- private void handleMaxPreprocessConcurrencyChange(Set<String>
changedConfigs, Map<String, String> clusterConfigs) {
- if
(!changedConfigs.contains(CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM))
{
- LOGGER.info("changedConfigs list indicates maxPreprocessConcurrency was
not updated, skipping updates");
- return;
- }
-
- String configName =
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM;
- String defaultConfigValue =
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM;
- String maxParallelSegmentPreprocessesStr =
- clusterConfigs == null ? defaultConfigValue :
clusterConfigs.getOrDefault(configName, defaultConfigValue);
-
- int maxPreprocessConcurrency;
- try {
- maxPreprocessConcurrency =
Integer.parseInt(maxParallelSegmentPreprocessesStr);
- } catch (Exception e) {
- LOGGER.warn("Invalid maxPreprocessConcurrency set: {}, not making
change, fix config and try again",
- maxParallelSegmentPreprocessesStr);
- return;
- }
-
- if (maxPreprocessConcurrency <= 0) {
- LOGGER.warn("maxPreprocessConcurrency: {} must be > 0, not making
change, fix config and try again",
- maxPreprocessConcurrency);
- return;
- }
-
- if (maxPreprocessConcurrency == _maxPreprocessConcurrency) {
- LOGGER.info("No ZK update for maxPreprocessConcurrency {}, total
permits: {}", _maxPreprocessConcurrency,
- totalPermits());
- return;
- }
-
- LOGGER.info("Updated maxPreprocessConcurrency from: {} to: {}",
_maxPreprocessConcurrency,
- maxPreprocessConcurrency);
- _maxPreprocessConcurrency = maxPreprocessConcurrency;
-
- if (!_isServingQueries) {
- LOGGER.info("Serving queries hasn't been enabled yet, not updating the
permits with maxPreprocessConcurrency");
- return;
- }
- _semaphore.setPermits(_maxPreprocessConcurrency);
- LOGGER.info("Updated total permits: {}", totalPermits());
- }
-
- private void
handleMaxPreprocessConcurrencyBeforeServingQueriesChange(Set<String>
changedConfigs,
- Map<String, String> clusterConfigs) {
- if (!changedConfigs.contains(
-
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES))
{
- LOGGER.info("changedConfigs list indicates
maxPreprocessConcurrencyBeforeServingQueries was not updated, "
- + "skipping updates");
- return;
- }
-
- String configName =
CommonConstants.Helix.CONFIG_OF_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES;
- String defaultConfigValue =
CommonConstants.Helix.DEFAULT_MAX_SEGMENT_PREPROCESS_PARALLELISM_BEFORE_SERVING_QUERIES;
- String maxParallelSegmentPreprocessesBeforeServingQueriesStr =
- clusterConfigs == null ? defaultConfigValue :
clusterConfigs.getOrDefault(configName, defaultConfigValue);
-
- int maxPreprocessConcurrencyBeforeServingQueries;
- try {
- maxPreprocessConcurrencyBeforeServingQueries =
-
Integer.parseInt(maxParallelSegmentPreprocessesBeforeServingQueriesStr);
- } catch (Exception e) {
- LOGGER.warn("Invalid maxPreprocessConcurrencyBeforeServingQueries set:
{}, not making change, fix config and "
- + "try again",
maxParallelSegmentPreprocessesBeforeServingQueriesStr);
- return;
- }
-
- if (maxPreprocessConcurrencyBeforeServingQueries <= 0) {
- LOGGER.warn("maxPreprocessConcurrencyBeforeServingQueries: {} must be >
0, not making change, fix config "
- + "and try again", maxPreprocessConcurrencyBeforeServingQueries);
- return;
- }
-
- if (maxPreprocessConcurrencyBeforeServingQueries ==
_maxPreprocessConcurrencyBeforeServingQueries) {
- LOGGER.info("No ZK update for
maxPreprocessConcurrencyBeforeServingQueries {}, total permits: {}",
- _maxPreprocessConcurrencyBeforeServingQueries, totalPermits());
- return;
- }
-
- LOGGER.info("Updated maxPreprocessConcurrencyBeforeServingQueries from: {}
to: {}",
- _maxPreprocessConcurrencyBeforeServingQueries,
maxPreprocessConcurrencyBeforeServingQueries);
- _maxPreprocessConcurrencyBeforeServingQueries =
maxPreprocessConcurrencyBeforeServingQueries;
- if (!_isServingQueries) {
- LOGGER.info("maxPreprocessConcurrencyBeforeServingQueries was updated
before serving queries was enabled, "
- + "updating the permits");
- _semaphore.setPermits(_maxPreprocessConcurrencyBeforeServingQueries);
- LOGGER.info("Updated total permits: {}", totalPermits());
- }
- }
-
- /**
- * Block trying to acquire the semaphore to perform the segment index
rebuild steps unless interrupted.
- * <p>
- * {@link #release()} should be called after the segment preprocess
completes. It is the responsibility of the caller
- * to ensure that {@link #release()} is called exactly once for each call to
this method.
- *
- * @throws InterruptedException if the current thread is interrupted
- */
- public void acquire()
- throws InterruptedException {
- _semaphore.acquire();
- }
+ SegmentAllIndexPreprocessThrottler _segmentAllIndexPreprocessThrottler;
+ SegmentStarTreePreprocessThrottler _segmentStarTreePreprocessThrottler;
/**
- * Should be called after the segment index build completes. It is the
responsibility of the caller to ensure that
- * this method is called exactly once for each call to {@link #acquire()}.
+ * Constructor for SegmentPreprocessThrottler
+ * @param segmentAllIndexPreprocessThrottler segment preprocess throttler to
use for all indexes
+ * @param segmentStarTreePreprocessThrottler segment preprocess throttler to
use for StarTree index
*/
- public void release() {
- _semaphore.release();
+ public SegmentPreprocessThrottler(SegmentAllIndexPreprocessThrottler
segmentAllIndexPreprocessThrottler,
+ SegmentStarTreePreprocessThrottler segmentStarTreePreprocessThrottler) {
+ LOGGER.info("Initializing SegmentPreprocessThrottler");
Review Comment:
This info is not very useful. I assume we are logging details within the
individual throttlers?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]