This is an automated email from the ASF dual-hosted git repository. kfaraz pushed a commit to branch 30.0.0 in repository https://gitbox.apache.org/repos/asf/druid.git
The following commit(s) were added to refs/heads/30.0.0 by this push: new d6858771d1a Add config lagAggregate to LagBasedAutoScalerConfig (#16334) (#16377) d6858771d1a is described below commit d6858771d1acb63c5fb39a3a6bcd62b03461dc15 Author: Adithya Chakilam <35785271+adithyachaki...@users.noreply.github.com> AuthorDate: Fri May 3 10:58:47 2024 -0500 Add config lagAggregate to LagBasedAutoScalerConfig (#16334) (#16377) Changes: - Add new config `lagAggregate` to `LagBasedAutoScalerConfig` - Add field `aggregateForScaling` to `LagStats` - Use the new field/config to determine which aggregate to use to compute lag - Remove method `Supervisor.computeLagForAutoScaler()` --- docs/ingestion/supervisor.md | 1 + .../kinesis/supervisor/KinesisSupervisor.java | 7 ----- .../supervisor/autoscaler/LagBasedAutoScaler.java | 15 +++++++++-- .../autoscaler/LagBasedAutoScalerConfig.java | 14 +++++++++- .../indexing/overlord/supervisor/Supervisor.java | 9 ------- .../{LagStats.java => AggregateFunction.java} | 30 +++------------------- .../overlord/supervisor/autoscaler/LagStats.java | 29 +++++++++++++++++++++ .../{SupervisorTest.java => LagStatsTest.java} | 15 +++++------ 8 files changed, 67 insertions(+), 53 deletions(-) diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md index 76dd1cc4a7c..9320c39a02a 100644 --- a/docs/ingestion/supervisor.md +++ b/docs/ingestion/supervisor.md @@ -96,6 +96,7 @@ The following table outlines the configuration properties related to the `lagBas |`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale action is triggered.|No|60000| |`scaleInStep`|The number of tasks to reduce at once when scaling down.|No|1| |`scaleOutStep`|The number of tasks to add at once when scaling out.|No|2| +|`lagAggregate`|The aggregate function used to compute the lag metric for scaling decisions. Possible values are `MAX`, `SUM` and `AVERAGE`. |No|`SUM`| The following example shows a supervisor spec with `lagBased` autoscaler: diff --git a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java index 365a9135e3c..a142f414762 100644 --- a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java +++ b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java @@ -427,13 +427,6 @@ public class KinesisSupervisor extends SeekableStreamSupervisor<String, String, ); } - @Override - public long computeLagForAutoScaler() - { - LagStats lagStats = computeLagStats(); - return lagStats == null ? 0L : lagStats.getMaxLag(); - } - private SeekableStreamDataSourceMetadata<String, String> createDataSourceMetadataWithClosedOrExpiredPartitions( SeekableStreamDataSourceMetadata<String, String> currentMetadata, Set<String> terminatedPartitionIds, diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java index f8618b06f74..ec81c5f9f99 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java @@ -21,6 +21,8 @@ package org.apache.druid.indexing.seekablestream.supervisor.autoscaler; import org.apache.commons.collections4.queue.CircularFifoQueue; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.java.util.common.StringUtils; @@ -154,8 +156,17 @@ public class LagBasedAutoScaler implements SupervisorTaskAutoScaler LOCK.lock(); try { if (!spec.isSuspended()) { - long lag = supervisor.computeLagForAutoScaler(); - lagMetricsQueue.offer(lag > 0 ? lag : 0L); + LagStats lagStats = supervisor.computeLagStats(); + + if (lagStats != null) { + AggregateFunction aggregate = lagBasedAutoScalerConfig.getLagAggregate() == null ? + lagStats.getAggregateForScaling() : + lagBasedAutoScalerConfig.getLagAggregate(); + long lag = lagStats.getMetric(aggregate); + lagMetricsQueue.offer(lag > 0 ? lag : 0L); + } else { + lagMetricsQueue.offer(0L); + } log.debug("Current lags for dataSource[%s] are [%s].", dataSource, lagMetricsQueue); } else { log.warn("[%s] supervisor is suspended, skipping lag collection", dataSource); diff --git a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java index e03242de279..068e7cc4f87 100644 --- a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java +++ b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java @@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonProperty; import org.apache.druid.indexing.overlord.supervisor.Supervisor; import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction; import org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler; import org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor; import org.apache.druid.java.util.emitter.service.ServiceEmitter; @@ -45,6 +46,7 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig private final int scaleOutStep; private final boolean enableTaskAutoScaler; private final long minTriggerScaleActionFrequencyMillis; + private final AggregateFunction lagAggregate; @JsonCreator public LagBasedAutoScalerConfig( @@ -61,7 +63,8 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig @Nullable @JsonProperty("scaleInStep") Integer scaleInStep, @Nullable @JsonProperty("scaleOutStep") Integer scaleOutStep, @Nullable @JsonProperty("enableTaskAutoScaler") Boolean enableTaskAutoScaler, - @Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long minTriggerScaleActionFrequencyMillis + @Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long minTriggerScaleActionFrequencyMillis, + @Nullable @JsonProperty("lagAggregate") AggregateFunction lagAggregate ) { this.enableTaskAutoScaler = enableTaskAutoScaler != null ? enableTaskAutoScaler : false; @@ -73,6 +76,7 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig this.scaleInThreshold = scaleInThreshold != null ? scaleInThreshold : 1000000; this.triggerScaleOutFractionThreshold = triggerScaleOutFractionThreshold != null ? triggerScaleOutFractionThreshold : 0.3; this.triggerScaleInFractionThreshold = triggerScaleInFractionThreshold != null ? triggerScaleInFractionThreshold : 0.9; + this.lagAggregate = lagAggregate; // Only do taskCountMax and taskCountMin check when autoscaler is enabled. So that users left autoConfig empty{} will not throw any exception and autoscaler is disabled. // If autoscaler is disabled, no matter what configs are set, they are not used. @@ -186,6 +190,13 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig return minTriggerScaleActionFrequencyMillis; } + @JsonProperty + @Nullable + public AggregateFunction getLagAggregate() + { + return lagAggregate; + } + @Override public String toString() { @@ -204,6 +215,7 @@ public class LagBasedAutoScalerConfig implements AutoScalerConfig ", scaleActionPeriodMillis=" + scaleActionPeriodMillis + ", scaleInStep=" + scaleInStep + ", scaleOutStep=" + scaleOutStep + + ", lagAggregate=" + lagAggregate + '}'; } } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java index b1fb439184d..bcfc5ebe819 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java @@ -92,14 +92,5 @@ public interface Supervisor */ LagStats computeLagStats(); - /** - * Used by AutoScaler to make scaling decisions. - */ - default long computeLagForAutoScaler() - { - LagStats lagStats = computeLagStats(); - return lagStats == null ? 0L : lagStats.getTotalLag(); - } - int getActiveTaskGroupsCount(); } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/AggregateFunction.java similarity index 67% copy from server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java copy to server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/AggregateFunction.java index 7b6e5fd0bab..247c05180be 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/AggregateFunction.java @@ -19,31 +19,9 @@ package org.apache.druid.indexing.overlord.supervisor.autoscaler; -public class LagStats +public enum AggregateFunction { - private final long maxLag; - private final long totalLag; - private final long avgLag; - - public LagStats(long maxLag, long totalLag, long avgLag) - { - this.maxLag = maxLag; - this.totalLag = totalLag; - this.avgLag = avgLag; - } - - public long getMaxLag() - { - return maxLag; - } - - public long getTotalLag() - { - return totalLag; - } - - public long getAvgLag() - { - return avgLag; - } + MAX, + SUM, + AVERAGE } diff --git a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java index 7b6e5fd0bab..2240585680a 100644 --- a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java +++ b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java @@ -24,12 +24,19 @@ public class LagStats private final long maxLag; private final long totalLag; private final long avgLag; + private final AggregateFunction aggregateForScaling; public LagStats(long maxLag, long totalLag, long avgLag) + { + this(maxLag, totalLag, avgLag, AggregateFunction.SUM); + } + + public LagStats(long maxLag, long totalLag, long avgLag, AggregateFunction aggregateForScaling) { this.maxLag = maxLag; this.totalLag = totalLag; this.avgLag = avgLag; + this.aggregateForScaling = aggregateForScaling == null ? AggregateFunction.SUM : aggregateForScaling; } public long getMaxLag() @@ -46,4 +53,26 @@ public class LagStats { return avgLag; } + + /** + * The preferred scaling metric that supervisor may specify to be used. + * This could be overrided by the autscaler. + */ + public AggregateFunction getAggregateForScaling() + { + return aggregateForScaling; + } + + public long getMetric(AggregateFunction metric) + { + switch (metric) { + case MAX: + return getMaxLag(); + case SUM: + return getTotalLag(); + case AVERAGE: + return getAvgLag(); + } + throw new IllegalStateException("Unknown scale metric"); + } } diff --git a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java similarity index 69% rename from server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java rename to server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java index 79811079d34..5799d8c5e8e 100644 --- a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java +++ b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java @@ -19,22 +19,21 @@ package org.apache.druid.indexing.overlord.supervisor; +import org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction; import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats; import org.junit.Assert; import org.junit.Test; -import org.mockito.Mockito; -public class SupervisorTest +public class LagStatsTest { @Test public void testAutoScalerLagComputation() { - Supervisor supervisor = Mockito.spy(Supervisor.class); + LagStats lagStats = new LagStats(1, 2, 3); - Mockito.when(supervisor.computeLagStats()).thenReturn(new LagStats(1, 2, 3)); - Assert.assertEquals(2, supervisor.computeLagForAutoScaler()); - - Mockito.when(supervisor.computeLagStats()).thenReturn(null); - Assert.assertEquals(0, supervisor.computeLagForAutoScaler()); + Assert.assertEquals(1, lagStats.getMetric(AggregateFunction.MAX)); + Assert.assertEquals(2, lagStats.getMetric(AggregateFunction.SUM)); + Assert.assertEquals(3, lagStats.getMetric(AggregateFunction.AVERAGE)); + Assert.assertEquals(AggregateFunction.SUM, lagStats.getAggregateForScaling()); } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org For additional commands, e-mail: commits-h...@druid.apache.org