This is an automated email from the ASF dual-hosted git repository. fanrui pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
The following commit(s) were added to refs/heads/main by this push: new 66808eb9 [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling 66808eb9 is described below commit 66808eb9f2d7d26dcd755fec7fdab5ccaf75f8bc Author: Rui Fan <1996fan...@gmail.com> AuthorDate: Wed Jan 17 12:01:49 2024 +0800 [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling --- .../autoscaler/event/AutoScalerEventHandler.java | 2 +- .../autoscaler/event/AutoscalerEventUtils.java | 69 +++++++++++++ .../autoscaler/event/VertexScalingReport.java | 44 ++++++++ .../event/AutoScalerEventHandlerTest.java | 113 +++++++++++++++++++++ 4 files changed, 227 insertions(+), 1 deletion(-) diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java index ed818a89..5695d109 100644 --- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java @@ -40,7 +40,7 @@ import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_ @Experimental public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> { String SCALING_SUMMARY_ENTRY = - " Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f"; + "{ Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f}"; String SCALING_EXECUTION_DISABLED_REASON = "%s:%s, recommended parallelism change:"; String SCALING_SUMMARY_HEADER_SCALING_EXECUTION_DISABLED = "Scaling execution disabled by config "; diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java new file mode 100644 index 00000000..1628ebfe --- 
/dev/null +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.annotation.Experimental; + +import java.util.ArrayList; +import java.util.List; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + +/** The utils of {@link AutoScalerEventHandler}. */ +@Experimental +public class AutoscalerEventUtils { + + private static final Pattern SCALING_REPORT_SEPARATOR = Pattern.compile("\\{(.+?)\\}"); + private static final Pattern VERTEX_SCALING_REPORT_PATTERN = + Pattern.compile( + "Vertex ID (.*?) \\| Parallelism (.*?) -> (.*?) \\| Processing capacity (.*?) -> (.*?) \\| Target data rate (.*)"); + + /** Parse the scaling report from original scaling report event. 
*/ + public static List<VertexScalingReport> parseVertexScalingReports(String scalingReport) { + final List<String> originalVertexScalingReports = + extractOriginalVertexScalingReports(scalingReport); + return originalVertexScalingReports.stream() + .map(AutoscalerEventUtils::extractVertexScalingReport) + .collect(Collectors.toList()); + } + + private static List<String> extractOriginalVertexScalingReports(String scalingReport) { + var result = new ArrayList<String>(); + var m = SCALING_REPORT_SEPARATOR.matcher(scalingReport); + + while (m.find()) { + result.add(m.group(1)); + } + return result; + } + + private static VertexScalingReport extractVertexScalingReport(String vertexScalingReportStr) { + final var vertexScalingReport = new VertexScalingReport(); + var m = VERTEX_SCALING_REPORT_PATTERN.matcher(vertexScalingReportStr); + + if (m.find()) { + vertexScalingReport.setVertexId(m.group(1)); + vertexScalingReport.setCurrentParallelism(Integer.parseInt(m.group(2))); + vertexScalingReport.setNewParallelism(Integer.parseInt(m.group(3))); + vertexScalingReport.setCurrentProcessCapacity(Double.parseDouble(m.group(4))); + vertexScalingReport.setExpectedProcessCapacity(Double.parseDouble(m.group(5))); + vertexScalingReport.setTargetDataRate(Double.parseDouble(m.group(6))); + } + return vertexScalingReport; + } +} diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/VertexScalingReport.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/VertexScalingReport.java new file mode 100644 index 00000000..e397c643 --- /dev/null +++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/VertexScalingReport.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.annotation.Experimental; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +/** The scaling report of a single vertex, as parsed from one entry of a scaling report event by {@link AutoscalerEventUtils#parseVertexScalingReports}. Note: the declaration order of the fields below is the parameter order of the generated all-args constructor. */ +@Experimental +@Data +@NoArgsConstructor +@AllArgsConstructor +public class VertexScalingReport { + + /** The vertex ID printed after {@code Vertex ID}. */ private String vertexId; + + /** Current parallelism: left side of {@code Parallelism a -> b}. */ private int currentParallelism; + + /** New parallelism: right side of {@code Parallelism a -> b}. */ private int newParallelism; + + /** Current processing capacity: left side of {@code Processing capacity a -> b}. */ private double currentProcessCapacity; + + /** Expected processing capacity: right side of {@code Processing capacity a -> b}. */ private double expectedProcessCapacity; + + /** The value printed after {@code Target data rate}. */ private double targetDataRate; +} diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java new file mode 100644 index 00000000..8a52925c --- /dev/null +++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.autoscaler.event; + +import org.apache.flink.autoscaler.ScalingSummary; +import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE; +import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE; +import static org.assertj.core.api.Assertions.assertThat; + +/** Test for {@link AutoScalerEventHandler}. Renders a scaling report with {@code scalingReport} and checks that {@link AutoscalerEventUtils#parseVertexScalingReports} parses the rendered text back into the expected vertex reports.
*/ +class AutoScalerEventHandlerTest { + + /** Compares a rendered three-vertex report with the expected text, then parses that text back. */ @Test + void testScalingReport() { + /* Expected plain-text report; it is not JSON despite the variable name. */ var expectedJson = + "Scaling execution enabled, begin scaling vertices:" + + "{ Vertex ID ea632d67b7d595e5b851708ae9ad79d6 | Parallelism 3 -> 1 | " + + "Processing capacity 424.68 -> 123.40 | Target data rate 403.67}" + + "{ Vertex ID bc764cd8ddf7a0cff126f51c16239658 | Parallelism 4 -> 2 | " + + "Processing capacity Infinity -> Infinity | Target data rate 812.58}" + + "{ Vertex ID 0a448493b4782967b150582570326227 | Parallelism 5 -> 8 | " + + "Processing capacity 404.73 -> 645.00 | Target data rate 404.27}"; + + /* NOTE(review): the expected entry order (ea63..., bc76..., 0a44...) differs from the insertion order in buildScalingSummaries, so it presumably follows the HashMap iteration order used by scalingReport; brittle if that order changes - confirm. */ final var scalingSummaries = buildScalingSummaries(); + assertThat( + AutoScalerEventHandler.scalingReport( + scalingSummaries, + AutoScalerEventHandler + .SCALING_SUMMARY_HEADER_SCALING_EXECUTION_ENABLED)) + .isEqualTo(expectedJson); + + /* The parsed values are the two-decimal values printed in the report (e.g. 424.678 is read back as 424.68). */ assertThat(AutoscalerEventUtils.parseVertexScalingReports(expectedJson)) + .containsExactlyInAnyOrderElementsOf(buildExpectedScalingResults()); + } + + /** Builds summaries for three vertices; the second one has infinite processing capacities to cover the {@code Infinity} output. */ private HashMap<JobVertexID, ScalingSummary> buildScalingSummaries() { + final var jobVertex1 = JobVertexID.fromHexString("0a448493b4782967b150582570326227"); + final var jobVertex2 = JobVertexID.fromHexString("bc764cd8ddf7a0cff126f51c16239658"); + final var jobVertex3 = JobVertexID.fromHexString("ea632d67b7d595e5b851708ae9ad79d6"); + + final var scalingSummaries = new HashMap<JobVertexID, ScalingSummary>(); + scalingSummaries.put( + jobVertex2, + generateScalingSummary( + 4, 2, Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, 812.583)); + scalingSummaries.put(jobVertex3, generateScalingSummary(3, 1, 424.678, 123.4, 403.673)); + scalingSummaries.put(jobVertex1, generateScalingSummary(5, 8, 404.727, 645.0, 404.268)); + + return scalingSummaries; + } + + /** Expected parse result: the values from buildScalingSummaries rounded to the two decimals printed in the report. */ private List<VertexScalingReport> buildExpectedScalingResults() { + return List.of( + new VertexScalingReport( + "0a448493b4782967b150582570326227", 5, 8, 404.73, 645.0, 404.27), + new VertexScalingReport( + "bc764cd8ddf7a0cff126f51c16239658",
+ 4, + 2, + Double.POSITIVE_INFINITY, + Double.POSITIVE_INFINITY, + 812.58), + new VertexScalingReport( + "ea632d67b7d595e5b851708ae9ad79d6", 3, 1, 424.68, 123.4, 403.67)); + } + + /** Creates a summary with the given parallelisms whose TRUE_PROCESSING_RATE, EXPECTED_PROCESSING_RATE and TARGET_DATA_RATE metrics hold the given values; the constant 400 fills the other {@link EvaluatedScalingMetric} argument, presumably one the report does not print - confirm against its constructor. */ private ScalingSummary generateScalingSummary( + int currentParallelism, + int newParallelism, + double currentProcessCapacity, + double expectedProcessCapacity, + double targetDataRate) { + final var scalingSummary = new ScalingSummary(); + scalingSummary.setCurrentParallelism(currentParallelism); + scalingSummary.setNewParallelism(newParallelism); + + final var metrics = + Map.of( + TRUE_PROCESSING_RATE, + new EvaluatedScalingMetric(400, currentProcessCapacity), + EXPECTED_PROCESSING_RATE, + new EvaluatedScalingMetric(expectedProcessCapacity, 400), + TARGET_DATA_RATE, + new EvaluatedScalingMetric(400, targetDataRate)); + scalingSummary.setMetrics(metrics); + return scalingSummary; + } +}