This is an automated email from the ASF dual-hosted git repository.

kfaraz pushed a commit to branch 30.0.0
in repository https://gitbox.apache.org/repos/asf/druid.git


The following commit(s) were added to refs/heads/30.0.0 by this push:
     new d6858771d1a Add config lagAggregate to LagBasedAutoScalerConfig  
(#16334) (#16377)
d6858771d1a is described below

commit d6858771d1acb63c5fb39a3a6bcd62b03461dc15
Author: Adithya Chakilam <35785271+adithyachaki...@users.noreply.github.com>
AuthorDate: Fri May 3 10:58:47 2024 -0500

    Add config lagAggregate to LagBasedAutoScalerConfig  (#16334) (#16377)
    
    Changes:
    - Add new config `lagAggregate` to `LagBasedAutoScalerConfig`
    - Add field `aggregateForScaling` to `LagStats`
    - Use the new field/config to determine which aggregate to use to compute 
lag
    - Remove method `Supervisor.computeLagForAutoScaler()`
---
 docs/ingestion/supervisor.md                       |  1 +
 .../kinesis/supervisor/KinesisSupervisor.java      |  7 -----
 .../supervisor/autoscaler/LagBasedAutoScaler.java  | 15 +++++++++--
 .../autoscaler/LagBasedAutoScalerConfig.java       | 14 +++++++++-
 .../indexing/overlord/supervisor/Supervisor.java   |  9 -------
 .../{LagStats.java => AggregateFunction.java}      | 30 +++-------------------
 .../overlord/supervisor/autoscaler/LagStats.java   | 29 +++++++++++++++++++++
 .../{SupervisorTest.java => LagStatsTest.java}     | 15 +++++------
 8 files changed, 67 insertions(+), 53 deletions(-)

diff --git a/docs/ingestion/supervisor.md b/docs/ingestion/supervisor.md
index 76dd1cc4a7c..9320c39a02a 100644
--- a/docs/ingestion/supervisor.md
+++ b/docs/ingestion/supervisor.md
@@ -96,6 +96,7 @@ The following table outlines the configuration properties 
related to the `lagBas
 |`scaleActionPeriodMillis`|The frequency in milliseconds to check if a scale 
action is triggered.|No|60000|
 |`scaleInStep`|The number of tasks to reduce at once when scaling down.|No|1|
 |`scaleOutStep`|The number of tasks to add at once when scaling out.|No|2|
+|`lagAggregate`|The aggregate function used to compute the lag metric for 
scaling decisions. Possible values are `MAX`, `SUM` and `AVERAGE`. |No|`SUM`|
 
 The following example shows a supervisor spec with `lagBased` autoscaler:
 
diff --git 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
index 365a9135e3c..a142f414762 100644
--- 
a/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
+++ 
b/extensions-core/kinesis-indexing-service/src/main/java/org/apache/druid/indexing/kinesis/supervisor/KinesisSupervisor.java
@@ -427,13 +427,6 @@ public class KinesisSupervisor extends 
SeekableStreamSupervisor<String, String,
     );
   }
 
-  @Override
-  public long computeLagForAutoScaler()
-  {
-    LagStats lagStats = computeLagStats();
-    return lagStats == null ? 0L : lagStats.getMaxLag();
-  }
-
   private SeekableStreamDataSourceMetadata<String, String> 
createDataSourceMetadataWithClosedOrExpiredPartitions(
       SeekableStreamDataSourceMetadata<String, String> currentMetadata,
       Set<String> terminatedPartitionIds,
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
index f8618b06f74..ec81c5f9f99 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScaler.java
@@ -21,6 +21,8 @@ package 
org.apache.druid.indexing.seekablestream.supervisor.autoscaler;
 
 import org.apache.commons.collections4.queue.CircularFifoQueue;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction;
+import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import org.apache.druid.java.util.common.StringUtils;
@@ -154,8 +156,17 @@ public class LagBasedAutoScaler implements 
SupervisorTaskAutoScaler
       LOCK.lock();
       try {
         if (!spec.isSuspended()) {
-          long lag = supervisor.computeLagForAutoScaler();
-          lagMetricsQueue.offer(lag > 0 ? lag : 0L);
+          LagStats lagStats = supervisor.computeLagStats();
+
+          if (lagStats != null) {
+            AggregateFunction aggregate = 
lagBasedAutoScalerConfig.getLagAggregate() == null ?
+                                          lagStats.getAggregateForScaling() :
+                                          
lagBasedAutoScalerConfig.getLagAggregate();
+            long lag = lagStats.getMetric(aggregate);
+            lagMetricsQueue.offer(lag > 0 ? lag : 0L);
+          } else {
+            lagMetricsQueue.offer(0L);
+          }
           log.debug("Current lags for dataSource[%s] are [%s].", dataSource, 
lagMetricsQueue);
         } else {
           log.warn("[%s] supervisor is suspended, skipping lag collection", 
dataSource);
diff --git 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
index e03242de279..068e7cc4f87 100644
--- 
a/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
+++ 
b/indexing-service/src/main/java/org/apache/druid/indexing/seekablestream/supervisor/autoscaler/LagBasedAutoScalerConfig.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import org.apache.druid.indexing.overlord.supervisor.Supervisor;
 import org.apache.druid.indexing.overlord.supervisor.SupervisorSpec;
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction;
 import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.SupervisorTaskAutoScaler;
 import 
org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor;
 import org.apache.druid.java.util.emitter.service.ServiceEmitter;
@@ -45,6 +46,7 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
   private final int scaleOutStep;
   private final boolean enableTaskAutoScaler;
   private final long minTriggerScaleActionFrequencyMillis;
+  private final AggregateFunction lagAggregate;
 
   @JsonCreator
   public LagBasedAutoScalerConfig(
@@ -61,7 +63,8 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
           @Nullable @JsonProperty("scaleInStep") Integer scaleInStep,
           @Nullable @JsonProperty("scaleOutStep") Integer scaleOutStep,
           @Nullable @JsonProperty("enableTaskAutoScaler") Boolean 
enableTaskAutoScaler,
-          @Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long 
minTriggerScaleActionFrequencyMillis
+          @Nullable @JsonProperty("minTriggerScaleActionFrequencyMillis") Long 
minTriggerScaleActionFrequencyMillis,
+          @Nullable @JsonProperty("lagAggregate") AggregateFunction 
lagAggregate
   )
   {
     this.enableTaskAutoScaler = enableTaskAutoScaler != null ? 
enableTaskAutoScaler : false;
@@ -73,6 +76,7 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
     this.scaleInThreshold = scaleInThreshold != null ? scaleInThreshold : 
1000000;
     this.triggerScaleOutFractionThreshold = triggerScaleOutFractionThreshold 
!= null ? triggerScaleOutFractionThreshold : 0.3;
     this.triggerScaleInFractionThreshold = triggerScaleInFractionThreshold != 
null ? triggerScaleInFractionThreshold : 0.9;
+    this.lagAggregate = lagAggregate;
 
     // Only do taskCountMax and taskCountMin check when autoscaler is enabled. 
So that users left autoConfig empty{} will not throw any exception and 
autoscaler is disabled.
     // If autoscaler is disabled, no matter what configs are set, they are not 
used.
@@ -186,6 +190,13 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
     return minTriggerScaleActionFrequencyMillis;
   }
 
+  @JsonProperty
+  @Nullable
+  public AggregateFunction getLagAggregate()
+  {
+    return lagAggregate;
+  }
+
   @Override
   public String toString()
   {
@@ -204,6 +215,7 @@ public class LagBasedAutoScalerConfig implements 
AutoScalerConfig
             ", scaleActionPeriodMillis=" + scaleActionPeriodMillis +
             ", scaleInStep=" + scaleInStep +
             ", scaleOutStep=" + scaleOutStep +
+            ", lagAggregate=" + lagAggregate +
             '}';
   }
 }
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
index b1fb439184d..bcfc5ebe819 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/Supervisor.java
@@ -92,14 +92,5 @@ public interface Supervisor
    */
   LagStats computeLagStats();
 
-  /**
-   * Used by AutoScaler to make scaling decisions.
-   */
-  default long computeLagForAutoScaler()
-  {
-    LagStats lagStats = computeLagStats();
-    return lagStats == null ? 0L : lagStats.getTotalLag();
-  }
-
   int getActiveTaskGroupsCount();
 }
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/AggregateFunction.java
similarity index 67%
copy from 
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
copy to 
server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/AggregateFunction.java
index 7b6e5fd0bab..247c05180be 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/AggregateFunction.java
@@ -19,31 +19,9 @@
 
 package org.apache.druid.indexing.overlord.supervisor.autoscaler;
 
-public class LagStats
+public enum AggregateFunction
 {
-  private final long maxLag;
-  private final long totalLag;
-  private final long avgLag;
-
-  public LagStats(long maxLag, long totalLag, long avgLag)
-  {
-    this.maxLag = maxLag;
-    this.totalLag = totalLag;
-    this.avgLag = avgLag;
-  }
-
-  public long getMaxLag()
-  {
-    return maxLag;
-  }
-
-  public long getTotalLag()
-  {
-    return totalLag;
-  }
-
-  public long getAvgLag()
-  {
-    return avgLag;
-  }
+  MAX,
+  SUM,
+  AVERAGE
 }
diff --git 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
index 7b6e5fd0bab..2240585680a 100644
--- 
a/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
+++ 
b/server/src/main/java/org/apache/druid/indexing/overlord/supervisor/autoscaler/LagStats.java
@@ -24,12 +24,19 @@ public class LagStats
   private final long maxLag;
   private final long totalLag;
   private final long avgLag;
+  private final AggregateFunction aggregateForScaling;
 
   public LagStats(long maxLag, long totalLag, long avgLag)
+  {
+    this(maxLag, totalLag, avgLag, AggregateFunction.SUM);
+  }
+
+  public LagStats(long maxLag, long totalLag, long avgLag, AggregateFunction 
aggregateForScaling)
   {
     this.maxLag = maxLag;
     this.totalLag = totalLag;
     this.avgLag = avgLag;
+    this.aggregateForScaling = aggregateForScaling == null ? 
AggregateFunction.SUM : aggregateForScaling;
   }
 
   public long getMaxLag()
@@ -46,4 +53,26 @@ public class LagStats
   {
     return avgLag;
   }
+
+  /**
+   * The preferred scaling metric that supervisor may specify to be used.
+   * This could be overrided by the autscaler.
+   */
+  public AggregateFunction getAggregateForScaling()
+  {
+    return aggregateForScaling;
+  }
+
+  public long getMetric(AggregateFunction metric)
+  {
+    switch (metric) {
+      case MAX:
+        return getMaxLag();
+      case SUM:
+        return getTotalLag();
+      case AVERAGE:
+        return getAvgLag();
+    }
+    throw new IllegalStateException("Unknown scale metric");
+  }
 }
diff --git 
a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java
 
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java
similarity index 69%
rename from 
server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java
rename to 
server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java
index 79811079d34..5799d8c5e8e 100644
--- 
a/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/SupervisorTest.java
+++ 
b/server/src/test/java/org/apache/druid/indexing/overlord/supervisor/LagStatsTest.java
@@ -19,22 +19,21 @@
 
 package org.apache.druid.indexing.overlord.supervisor;
 
+import 
org.apache.druid.indexing.overlord.supervisor.autoscaler.AggregateFunction;
 import org.apache.druid.indexing.overlord.supervisor.autoscaler.LagStats;
 import org.junit.Assert;
 import org.junit.Test;
-import org.mockito.Mockito;
 
-public class SupervisorTest
+public class LagStatsTest
 {
   @Test
   public void testAutoScalerLagComputation()
   {
-    Supervisor supervisor = Mockito.spy(Supervisor.class);
+    LagStats lagStats = new LagStats(1, 2, 3);
 
-    Mockito.when(supervisor.computeLagStats()).thenReturn(new LagStats(1, 2, 
3));
-    Assert.assertEquals(2, supervisor.computeLagForAutoScaler());
-
-    Mockito.when(supervisor.computeLagStats()).thenReturn(null);
-    Assert.assertEquals(0, supervisor.computeLagForAutoScaler());
+    Assert.assertEquals(1, lagStats.getMetric(AggregateFunction.MAX));
+    Assert.assertEquals(2, lagStats.getMetric(AggregateFunction.SUM));
+    Assert.assertEquals(3, lagStats.getMetric(AggregateFunction.AVERAGE));
+    Assert.assertEquals(AggregateFunction.SUM, 
lagStats.getAggregateForScaling());
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@druid.apache.org
For additional commands, e-mail: commits-h...@druid.apache.org

Reply via email to