This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit 53d22d8a3e33f3971df43a29735471b7a8ccefaf Author: Matyas Orhidi <matyas_orh...@apple.com> AuthorDate: Fri Jan 20 16:48:29 2023 -0800 [FLINK-30653] Trigger resource Events on autoscaler actions --- .../operator/autoscaler/JobAutoScaler.java | 6 +- .../operator/autoscaler/JobVertexScaler.java | 31 ++++- .../operator/autoscaler/ScalingExecutor.java | 69 ++++++++--- .../kubernetes/operator/listener/AuditUtils.java | 4 +- .../AbstractFlinkResourceReconciler.java | 2 +- .../kubernetes/operator/utils/EventRecorder.java | 4 +- .../autoscaler/BacklogBasedScalingTest.java | 7 +- .../operator/autoscaler/JobVertexScalerTest.java | 133 +++++++++++++++++---- .../MetricsCollectionAndEvaluationTest.java | 7 +- .../operator/autoscaler/ScalingExecutorTest.java | 47 +++++++- .../kubernetes/operator/utils/EventCollector.java | 6 +- 11 files changed, 260 insertions(+), 56 deletions(-) diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java index d4d7f285..4f74a528 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobAutoScaler.java @@ -23,6 +23,7 @@ import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingM import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.controller.FlinkResourceContext; import org.apache.flink.kubernetes.operator.metrics.KubernetesResourceMetricGroup; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; import io.fabric8.kubernetes.client.KubernetesClient; @@ -172,11 +173,12 @@ public class JobAutoScaler implements Cleanup { }); } - public static JobAutoScaler create(KubernetesClient kubernetesClient) { + public static JobAutoScaler create( + KubernetesClient kubernetesClient, EventRecorder eventRecorder) { return new JobAutoScaler( kubernetesClient, new RestApiMetricsCollector(), new ScalingMetricEvaluator(), - new ScalingExecutor(kubernetesClient)); + new ScalingExecutor(kubernetesClient, eventRecorder)); } } diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java index ea019080..31a0ed8d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScaler.java @@ -19,10 +19,12 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; import org.apache.flink.kubernetes.operator.autoscaler.utils.AutoScalerUtils; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.Preconditions; @@ -50,9 +52,20 @@ public class JobVertexScaler { private static final Logger LOG = LoggerFactory.getLogger(JobVertexScaler.class); + @VisibleForTesting + public static final String INNEFFECTIVE_MESSAGE_FORMAT = + "Skipping further scale up after ineffective previous scale up for %s"; + private Clock clock = Clock.system(ZoneId.systemDefault()); + private EventRecorder eventRecorder; + + public JobVertexScaler(EventRecorder eventRecorder) { + this.eventRecorder = eventRecorder; + } + public int computeScaleTargetParallelism( + AbstractFlinkResource<?, ?> resource, Configuration conf, JobVertexID vertex, Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics, @@ -99,6 +112,7 @@ public class JobVertexScaler { if (newParallelism == currentParallelism || blockScalingBasedOnPastActions( + resource, vertex, conf, evaluatedMetrics, @@ -115,6 +129,7 @@ public class JobVertexScaler { } private boolean blockScalingBasedOnPastActions( + AbstractFlinkResource<?, ?> resource, JobVertexID vertex, Configuration conf, Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics, @@ -133,7 +148,8 @@ public class JobVertexScaler { if (currentParallelism == lastSummary.getNewParallelism() && lastSummary.isScaledUp()) { if (scaledUp) { - return detectIneffectiveScaleUp(vertex, conf, evaluatedMetrics, lastSummary); + return detectIneffectiveScaleUp( + resource, vertex, conf, evaluatedMetrics, lastSummary); } else { return detectImmediateScaleDownAfterScaleUp(vertex, conf, lastScalingTs); } @@ -156,6 +172,7 @@ public class JobVertexScaler { } private boolean detectIneffectiveScaleUp( + AbstractFlinkResource<?, ?> resource, JobVertexID vertex, Configuration conf, Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics, @@ -180,11 +197,17 @@ public class JobVertexScaler { return false; } - // TODO: Trigger kube event + var message = String.format(INNEFFECTIVE_MESSAGE_FORMAT, vertex); + + eventRecorder.triggerEvent( + resource, + EventRecorder.Type.Normal, + EventRecorder.Reason.IneffectiveScaling, + EventRecorder.Component.Operator, + message); if (conf.get(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED)) { - LOG.info( - "Skipping further scale up after ineffective previous scale up for {}", vertex); + LOG.info(message); return true; } else { return false; diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java index 98af3cd1..bda9dfe9 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutor.java @@ -25,6 +25,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.kubernetes.operator.utils.KubernetesClientUtils; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.util.Preconditions; @@ -60,20 +61,29 @@ public class ScalingExecutor implements Cleanup { "A parallelism override map (jobVertexId -> parallelism) which will be used to update" + " the parallelism of the corresponding job vertices of submitted JobGraphs."); + public static final String SCALING_SUMMARY_ENTRY = + " Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f"; + public static final String SCALING_SUMMARY_HEADER_SCALING_DISABLED = + "Recommended parallelism change:"; + public static final String SCALING_SUMMARY_HEADER_SCALING_ENABLED = "Scaling vertices:"; private static final Logger LOG = LoggerFactory.getLogger(ScalingExecutor.class); private final KubernetesClient kubernetesClient; private final JobVertexScaler jobVertexScaler; - + private final EventRecorder eventRecorder; private Clock clock = Clock.system(ZoneId.systemDefault()); - public ScalingExecutor(KubernetesClient kubernetesClient) { - this(kubernetesClient, new JobVertexScaler()); + public ScalingExecutor(KubernetesClient kubernetesClient, EventRecorder eventRecorder) { + this(kubernetesClient, new JobVertexScaler(eventRecorder), eventRecorder); } - public ScalingExecutor(KubernetesClient kubernetesClient, JobVertexScaler jobVertexScaler) { + public ScalingExecutor( + KubernetesClient kubernetesClient, + JobVertexScaler jobVertexScaler, + EventRecorder eventRecorder) { this.kubernetesClient = kubernetesClient; this.jobVertexScaler = jobVertexScaler; + this.eventRecorder = eventRecorder; } public boolean scaleResource( @@ -87,7 +97,9 @@ public class ScalingExecutor implements Cleanup { } var scalingHistory = scalingInformation.getScalingHistory(); - var scalingSummaries = computeScalingSummary(conf, evaluatedMetrics, scalingHistory); + var scalingSummaries = + computeScalingSummary(resource, conf, evaluatedMetrics, scalingHistory); + if (scalingSummaries.isEmpty()) { LOG.info("All job vertices are currently running at their target parallelism."); return false; @@ -97,22 +109,20 @@ public class ScalingExecutor implements Cleanup { return false; } - if (!conf.get(SCALING_ENABLED)) { + var scalingEnabled = conf.get(SCALING_ENABLED); + + var scalingReport = scalingReport(scalingSummaries, scalingEnabled); + eventRecorder.triggerEvent( + resource, + EventRecorder.Type.Normal, + EventRecorder.Reason.ScalingReport, + EventRecorder.Component.Operator, + scalingReport); + + if (!scalingEnabled) { return false; } - LOG.info("Scaling vertices:"); - scalingSummaries.forEach( - (v, s) -> - LOG.info( - "{} | Parallelism {} -> {} | Processing capacity {} -> {} | Target data rate {}", - v, - s.getCurrentParallelism(), - s.getNewParallelism(), - s.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(), - s.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(), - s.getMetrics().get(TARGET_DATA_RATE).getAverage())); - setVertexParallelismOverrides(resource, evaluatedMetrics, scalingSummaries); KubernetesClientUtils.applyToStoredCr( kubernetesClient, @@ -126,6 +136,27 @@ public class ScalingExecutor implements Cleanup { return true; } + private static String scalingReport( + Map<JobVertexID, ScalingSummary> scalingSummaries, boolean scalingEnabled) { + StringBuilder sb = + new StringBuilder( + scalingEnabled + ? SCALING_SUMMARY_HEADER_SCALING_ENABLED + : SCALING_SUMMARY_HEADER_SCALING_DISABLED); + scalingSummaries.forEach( + (v, s) -> + sb.append( + String.format( + SCALING_SUMMARY_ENTRY, + v, + s.getCurrentParallelism(), + s.getNewParallelism(), + s.getMetrics().get(TRUE_PROCESSING_RATE).getAverage(), + s.getMetrics().get(EXPECTED_PROCESSING_RATE).getCurrent(), + s.getMetrics().get(TARGET_DATA_RATE).getAverage()))); + return sb.toString(); + } + private boolean stabilizationPeriodPassed( AbstractFlinkResource<?, ?> resource, Configuration conf) { var jobStatus = resource.getStatus().getJobStatus(); @@ -185,6 +216,7 @@ public class ScalingExecutor implements Cleanup { } private Map<JobVertexID, ScalingSummary> computeScalingSummary( + AbstractFlinkResource<?, ?> resource, Configuration conf, Map<JobVertexID, Map<ScalingMetric, EvaluatedScalingMetric>> evaluatedMetrics, Map<JobVertexID, SortedMap<Instant, ScalingSummary>> scalingHistory) { @@ -196,6 +228,7 @@ public class ScalingExecutor implements Cleanup { (int) metrics.get(ScalingMetric.PARALLELISM).getCurrent(); var newParallelism = jobVertexScaler.computeScaleTargetParallelism( + resource, conf, v, metrics, diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java index a0a85254..e590580e 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/listener/AuditUtils.java @@ -17,6 +17,7 @@ package org.apache.flink.kubernetes.operator.listener; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; import org.apache.flink.kubernetes.operator.api.listener.FlinkResourceListener; import org.apache.flink.kubernetes.operator.api.status.CommonStatus; @@ -52,7 +53,8 @@ public class AuditUtils { : status.getError()); } - private static String format(@NonNull Event event) { + @VisibleForTesting + public static String format(@NonNull Event event) { return String.format( ">>> Event | %-7s | %-15s | %s", event.getType().equals("Normal") ? "Info" : event.getType(), diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java index 249d2f80..79e5e72d 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/reconciler/deployment/AbstractFlinkResourceReconciler.java @@ -90,7 +90,7 @@ public abstract class AbstractFlinkResourceReconciler< this.kubernetesClient = kubernetesClient; this.eventRecorder = eventRecorder; this.statusRecorder = statusRecorder; - this.resourceScaler = JobAutoScaler.create(kubernetesClient); + this.resourceScaler = JobAutoScaler.create(kubernetesClient, eventRecorder); } @Override diff --git a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java index 81baf9b6..d3a3a51c 100644 --- a/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java +++ b/flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/utils/EventRecorder.java @@ -128,6 +128,8 @@ public class EventRecorder { Missing, ValidationError, RecoverDeployment, - RestartUnhealthyJob + RestartUnhealthyJob, + ScalingReport, + IneffectiveScaling } } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java index af8662b6..b0e48dd3 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/BacklogBasedScalingTest.java @@ -29,6 +29,8 @@ import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.kubernetes.operator.utils.EventCollector; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; @@ -69,7 +71,10 @@ public class BacklogBasedScalingTest extends OperatorTestBase { @BeforeEach public void setup() { evaluator = new ScalingMetricEvaluator(); - scalingExecutor = new ScalingExecutor(kubernetesClient); + scalingExecutor = + new ScalingExecutor( + kubernetesClient, + new EventRecorder(kubernetesClient, new EventCollector())); app = TestUtils.buildApplicationCluster(); app.getMetadata().setGeneration(1L); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java index 34c2b8cc..6e933d1c 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/JobVertexScalerTest.java @@ -18,11 +18,17 @@ package org.apache.flink.kubernetes.operator.autoscaler; import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.operator.TestUtils; +import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.kubernetes.operator.utils.EventCollector; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; +import io.fabric8.kubernetes.client.KubernetesClient; +import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import org.junit.Assert; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -35,19 +41,29 @@ import java.util.HashMap; import java.util.Map; import java.util.TreeMap; +import static org.apache.flink.kubernetes.operator.autoscaler.JobVertexScaler.INNEFFECTIVE_MESSAGE_FORMAT; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; /** Test for vertex parallelism scaler logic. */ +@EnableKubernetesMockClient(crud = true) public class JobVertexScalerTest { private JobVertexScaler vertexScaler; private Configuration conf; + private KubernetesClient kubernetesClient; + private EventCollector eventCollector; + + private FlinkDeployment flinkDep; + @BeforeEach public void setup() { - vertexScaler = new JobVertexScaler(); + flinkDep = TestUtils.buildApplicationCluster(); + kubernetesClient.resource(flinkDep).createOrReplace(); + eventCollector = new EventCollector(); + vertexScaler = new JobVertexScaler(new EventRecorder(kubernetesClient, eventCollector)); conf = new Configuration(); conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 1.); conf.set(AutoScalerOptions.CATCH_UP_DURATION, Duration.ZERO); @@ -60,55 +76,55 @@ public class JobVertexScalerTest { assertEquals( 5, vertexScaler.computeScaleTargetParallelism( - conf, op, evaluated(10, 50, 100), Collections.emptySortedMap())); + flinkDep, conf, op, evaluated(10, 50, 100), Collections.emptySortedMap())); conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); assertEquals( 8, vertexScaler.computeScaleTargetParallelism( - conf, op, evaluated(10, 50, 100), Collections.emptySortedMap())); + flinkDep, conf, op, evaluated(10, 50, 100), Collections.emptySortedMap())); conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); assertEquals( 10, vertexScaler.computeScaleTargetParallelism( - conf, op, evaluated(10, 80, 100), Collections.emptySortedMap())); + flinkDep, conf, op, evaluated(10, 80, 100), Collections.emptySortedMap())); conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8); assertEquals( 8, vertexScaler.computeScaleTargetParallelism( - conf, op, evaluated(10, 60, 100), Collections.emptySortedMap())); + flinkDep, conf, op, evaluated(10, 60, 100), Collections.emptySortedMap())); assertEquals( 8, vertexScaler.computeScaleTargetParallelism( - conf, op, evaluated(10, 59, 100), Collections.emptySortedMap())); + flinkDep, conf, op, evaluated(10, 59, 100), Collections.emptySortedMap())); conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.5); assertEquals( 10, vertexScaler.computeScaleTargetParallelism( - conf, op, evaluated(2, 100, 40), Collections.emptySortedMap())); + flinkDep, conf, op, evaluated(2, 100, 40), Collections.emptySortedMap())); conf.set(AutoScalerOptions.TARGET_UTILIZATION, 0.6); assertEquals( 4, vertexScaler.computeScaleTargetParallelism( - conf, op, evaluated(2, 100, 100), Collections.emptySortedMap())); + flinkDep, conf, op, evaluated(2, 100, 100), Collections.emptySortedMap())); conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.); conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.5); assertEquals( 5, vertexScaler.computeScaleTargetParallelism( - conf, op, evaluated(10, 10, 100), Collections.emptySortedMap())); + flinkDep, conf, op, evaluated(10, 10, 100), Collections.emptySortedMap())); conf.set(AutoScalerOptions.MAX_SCALE_DOWN_FACTOR, 0.6); assertEquals( 4, vertexScaler.computeScaleTargetParallelism( - conf, op, evaluated(10, 10, 100), Collections.emptySortedMap())); + flinkDep, conf, op, evaluated(10, 10, 100), Collections.emptySortedMap())); } @Test @@ -152,6 +168,7 @@ public class JobVertexScalerTest { assertEquals( 5, vertexScaler.computeScaleTargetParallelism( + flinkDep, conf, new JobVertexID(), evaluated(10, 100, 500), @@ -165,6 +182,7 @@ public class JobVertexScalerTest { assertEquals( 10, vertexScaler.computeScaleTargetParallelism( + flinkDep, conf, new JobVertexID(), evaluated(10, 500, 100), @@ -181,24 +199,32 @@ public class JobVertexScalerTest { var evaluated = evaluated(5, 100, 50); var history = new TreeMap<Instant, ScalingSummary>(); - assertEquals(10, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history)); + assertEquals( + 10, + vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); history.put(clock.instant(), new ScalingSummary(5, 10, evaluated)); // Should not allow scale back down immediately evaluated = evaluated(10, 50, 100); - assertEquals(10, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history)); + assertEquals( + 10, + vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); // Pass some time... clock = Clock.offset(Clock.systemDefaultZone(), Duration.ofSeconds(61)); vertexScaler.setClock(clock); - assertEquals(5, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history)); + assertEquals( + 5, + vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); history.put(clock.instant(), new ScalingSummary(10, 5, evaluated)); // Allow immediate scale up evaluated = evaluated(5, 100, 50); - assertEquals(10, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history)); + assertEquals( + 10, + vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); history.put(clock.instant(), new ScalingSummary(5, 10, evaluated)); } @@ -210,58 +236,115 @@ public class JobVertexScalerTest { var evaluated = evaluated(5, 100, 50); var history = new TreeMap<Instant, ScalingSummary>(); - assertEquals(10, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history)); + assertEquals( + 10, + vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); assertEquals(100, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent()); history.put(Instant.now(), new ScalingSummary(5, 10, evaluated)); // Allow to scale higher if scaling was effective (80%) evaluated = evaluated(10, 180, 90); - assertEquals(20, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history)); + assertEquals( + 20, + vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); assertEquals(180, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent()); history.put(Instant.now(), new ScalingSummary(10, 20, evaluated)); // Detect ineffective scaling, less than 5% of target increase (instead of 90 -> 180, only // 90 -> 94. Do not try to scale above 20 evaluated = evaluated(20, 180, 94); - assertEquals(20, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history)); + assertEquals( + 20, + vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); // Still considered ineffective (less than <10%) evaluated = evaluated(20, 180, 98); - assertEquals(20, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history)); + assertEquals( + 20, + vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); // Allow scale up if current parallelism doesnt match last (user rescaled manually) evaluated = evaluated(10, 180, 90); - assertEquals(20, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history)); + assertEquals( + 20, + vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); // Over 10%, effective evaluated = evaluated(20, 180, 100); - assertEquals(36, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history)); + assertEquals( + 36, + vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); // Ineffective but detection is turned off conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, false); evaluated = evaluated(20, 180, 90); - assertEquals(40, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history)); + assertEquals( + 40, + vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, true); // Allow scale down even if ineffective evaluated = evaluated(20, 45, 90); - assertEquals(10, vertexScaler.computeScaleTargetParallelism(conf, op, evaluated, history)); + assertEquals( + 10, + vertexScaler.computeScaleTargetParallelism(flinkDep, conf, op, evaluated, history)); assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); } + @Test + public void testSendingIneffectiveScalingEvents() { + var jobVertexID = new JobVertexID(); + conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0); + conf.set(AutoScalerOptions.SCALE_UP_GRACE_PERIOD, Duration.ZERO); + + var evaluated = evaluated(5, 100, 50); + var history = new TreeMap<Instant, ScalingSummary>(); + assertEquals( + 10, + vertexScaler.computeScaleTargetParallelism( + flinkDep, conf, jobVertexID, evaluated, history)); + assertEquals(100, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent()); + history.put(Instant.now(), new ScalingSummary(5, 10, evaluated)); + + // Effective scale, no events triggered + evaluated = evaluated(10, 180, 90); + assertEquals( + 20, + vertexScaler.computeScaleTargetParallelism( + flinkDep, conf, jobVertexID, evaluated, history)); + assertEquals(180, evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent()); + history.put(Instant.now(), new ScalingSummary(10, 20, evaluated)); + assertEquals(0, eventCollector.events.size()); + + // Ineffective scale, an event is triggered + evaluated = evaluated(20, 180, 95); + assertEquals( + 20, + vertexScaler.computeScaleTargetParallelism( + flinkDep, conf, jobVertexID, evaluated, history)); + assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE)); + assertEquals(1, eventCollector.events.size()); + var event = eventCollector.events.poll(); + assertEquals(String.format(INNEFFECTIVE_MESSAGE_FORMAT, jobVertexID), event.getMessage()); + assertEquals(EventRecorder.Reason.IneffectiveScaling.name(), event.getReason()); + } + private Map<ScalingMetric, EvaluatedScalingMetric> evaluated( - int parallelism, double target, double procRate) { + int parallelism, double targetDataRate, double trueProcessingRate) { var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>(); metrics.put(ScalingMetric.PARALLELISM, EvaluatedScalingMetric.of(parallelism)); metrics.put(ScalingMetric.MAX_PARALLELISM, EvaluatedScalingMetric.of(720)); - metrics.put(ScalingMetric.TARGET_DATA_RATE, new EvaluatedScalingMetric(target, target)); + metrics.put( + ScalingMetric.TARGET_DATA_RATE, + new EvaluatedScalingMetric(targetDataRate, targetDataRate)); metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, EvaluatedScalingMetric.of(0.)); metrics.put( - ScalingMetric.TRUE_PROCESSING_RATE, new EvaluatedScalingMetric(procRate, procRate)); + ScalingMetric.TRUE_PROCESSING_RATE, + new EvaluatedScalingMetric(trueProcessingRate, trueProcessingRate)); ScalingMetricEvaluator.computeProcessingRateThresholds(metrics, conf); return metrics; } diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java index d12e6bc3..b42137dd 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/MetricsCollectionAndEvaluationTest.java @@ -29,6 +29,8 @@ import org.apache.flink.kubernetes.operator.autoscaler.topology.JobTopology; import org.apache.flink.kubernetes.operator.autoscaler.topology.VertexInfo; import org.apache.flink.kubernetes.operator.config.FlinkConfigManager; import org.apache.flink.kubernetes.operator.reconciler.ReconciliationUtils; +import org.apache.flink.kubernetes.operator.utils.EventCollector; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.rest.messages.job.JobDetailsInfo; import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; @@ -81,7 +83,10 @@ public class MetricsCollectionAndEvaluationTest { @BeforeEach public void setup() { evaluator = new ScalingMetricEvaluator(); - scalingExecutor = new ScalingExecutor(kubernetesClient); + scalingExecutor = + new ScalingExecutor( + kubernetesClient, + new EventRecorder(kubernetesClient, new EventCollector())); service = new TestingFlinkService(); app = TestUtils.buildApplicationCluster(); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java index 7f9fd458..447c262d 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/autoscaler/ScalingExecutorTest.java @@ -25,12 +25,16 @@ import org.apache.flink.kubernetes.operator.api.FlinkDeployment; import org.apache.flink.kubernetes.operator.autoscaler.config.AutoScalerOptions; import org.apache.flink.kubernetes.operator.autoscaler.metrics.EvaluatedScalingMetric; import org.apache.flink.kubernetes.operator.autoscaler.metrics.ScalingMetric; +import org.apache.flink.kubernetes.operator.utils.EventCollector; +import org.apache.flink.kubernetes.operator.utils.EventRecorder; import org.apache.flink.runtime.jobgraph.JobVertexID; import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.ValueSource; import java.time.Clock; import java.time.Duration; @@ -40,6 +44,10 @@ import java.util.HashMap; import java.util.Map; import static org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor.PARALLELISM_OVERRIDES; +import static org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor.SCALING_SUMMARY_ENTRY; +import static org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor.SCALING_SUMMARY_HEADER_SCALING_DISABLED; +import static org.apache.flink.kubernetes.operator.autoscaler.ScalingExecutor.SCALING_SUMMARY_HEADER_SCALING_ENABLED; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -49,13 +57,18 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class ScalingExecutorTest { private ScalingExecutor scalingDecisionExecutor; + + private EventCollector eventCollector; private Configuration conf; private KubernetesClient kubernetesClient; private FlinkDeployment flinkDep; @BeforeEach public void setup() { - scalingDecisionExecutor = new ScalingExecutor(kubernetesClient); + eventCollector = new EventCollector(); + scalingDecisionExecutor = + new ScalingExecutor( + kubernetesClient, new EventRecorder(kubernetesClient, eventCollector)); conf = new Configuration(); conf.set(AutoScalerOptions.STABILIZATION_INTERVAL, Duration.ZERO); conf.set(AutoScalerOptions.SCALING_ENABLED, true); @@ -170,6 +183,38 @@ public class ScalingExecutorTest { assertFalse(ScalingExecutor.allVerticesWithinUtilizationTarget(evaluated, scalingSummary)); } + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void testScalingEvents(boolean scalingEnabled) { + var jobVertexID = new JobVertexID(); + conf.set(AutoScalerOptions.SCALING_ENABLED, scalingEnabled); + var metrics = Map.of(jobVertexID, evaluated(1, 110, 100)); + var scalingInfo = new AutoScalerInfo(new HashMap<>()); + assertEquals( + scalingEnabled, + scalingDecisionExecutor.scaleResource(flinkDep, scalingInfo, conf, metrics)); + assertEquals(1, eventCollector.events.size()); + var event = eventCollector.events.poll(); + assertTrue( + event.getMessage() + .contains( + String.format( + SCALING_SUMMARY_ENTRY, + jobVertexID, + 1, + 2, + 100.0, + 157.0, + 110.0))); + assertTrue( + event.getMessage() + .contains( + scalingEnabled + ? SCALING_SUMMARY_HEADER_SCALING_ENABLED + : SCALING_SUMMARY_HEADER_SCALING_DISABLED)); + assertEquals(EventRecorder.Reason.ScalingReport.name(), event.getReason()); + } + private Map<ScalingMetric, EvaluatedScalingMetric> evaluated( int parallelism, double target, double procRate, double catchupRate) { var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>(); diff --git a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventCollector.java b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventCollector.java index 76791dcf..762b0376 100644 --- a/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventCollector.java +++ b/flink-kubernetes-operator/src/test/java/org/apache/flink/kubernetes/operator/utils/EventCollector.java @@ -19,8 +19,11 @@ package org.apache.flink.kubernetes.operator.utils; import org.apache.flink.kubernetes.operator.api.AbstractFlinkResource; +import org.apache.flink.kubernetes.operator.listener.AuditUtils; import io.fabric8.kubernetes.api.model.Event; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.LinkedList; import java.util.Queue; @@ -28,11 +31,12 @@ import java.util.function.BiConsumer; /** Simple consumer that collects triggered events for tests. */ public class EventCollector implements BiConsumer<AbstractFlinkResource<?, ?>, Event> { - + private static final Logger LOG = LoggerFactory.getLogger(EventCollector.class); public final Queue<Event> events = new LinkedList<>(); @Override public void accept(AbstractFlinkResource<?, ?> abstractFlinkResource, Event event) { + LOG.info(AuditUtils.format(event)); events.add(event); } }