This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new 66808eb9 [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling
66808eb9 is described below

commit 66808eb9f2d7d26dcd755fec7fdab5ccaf75f8bc
Author: Rui Fan <1996fan...@gmail.com>
AuthorDate: Wed Jan 17 12:01:49 2024 +0800

    [FLINK-34104][autoscaler] Improve the ScalingReport format of autoscaling
---
 .../autoscaler/event/AutoScalerEventHandler.java   |   2 +-
 .../autoscaler/event/AutoscalerEventUtils.java     |  69 +++++++++++++
 .../autoscaler/event/VertexScalingReport.java      |  44 ++++++++
 .../event/AutoScalerEventHandlerTest.java          | 113 +++++++++++++++++++++
 4 files changed, 227 insertions(+), 1 deletion(-)

diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
index ed818a89..5695d109 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoScalerEventHandler.java
@@ -40,7 +40,7 @@ import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_
 @Experimental
 public interface AutoScalerEventHandler<KEY, Context extends JobAutoScalerContext<KEY>> {
     String SCALING_SUMMARY_ENTRY =
-            " Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f";
+            "{ Vertex ID %s | Parallelism %d -> %d | Processing capacity %.2f -> %.2f | Target data rate %.2f}";
     String SCALING_EXECUTION_DISABLED_REASON = "%s:%s, recommended parallelism change:";
     String SCALING_SUMMARY_HEADER_SCALING_EXECUTION_DISABLED =
             "Scaling execution disabled by config ";
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java
new file mode 100644
index 00000000..1628ebfe
--- /dev/null
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/AutoscalerEventUtils.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.annotation.Experimental;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/** Utilities for {@link AutoScalerEventHandler}. */
+@Experimental
+public class AutoscalerEventUtils {
+
+    private static final Pattern SCALING_REPORT_SEPARATOR = Pattern.compile("\\{(.+?)\\}");
+    private static final Pattern VERTEX_SCALING_REPORT_PATTERN =
+            Pattern.compile(
+                    "Vertex ID (.*?) \\| Parallelism (.*?) -> (.*?) \\| 
Processing capacity (.*?) -> (.*?) \\| Target data rate (.*)");
+
+    /** Parses the per-vertex scaling reports from the original scaling report event. */
+    public static List<VertexScalingReport> parseVertexScalingReports(String scalingReport) {
+        final List<String> originalVertexScalingReports =
+                extractOriginalVertexScalingReports(scalingReport);
+        return originalVertexScalingReports.stream()
+                .map(AutoscalerEventUtils::extractVertexScalingReport)
+                .collect(Collectors.toList());
+    }
+
+    private static List<String> extractOriginalVertexScalingReports(String scalingReport) {
+        var result = new ArrayList<String>();
+        var m = SCALING_REPORT_SEPARATOR.matcher(scalingReport);
+
+        while (m.find()) {
+            result.add(m.group(1));
+        }
+        return result;
+    }
+
+    private static VertexScalingReport extractVertexScalingReport(String vertexScalingReportStr) {
+        final var vertexScalingReport = new VertexScalingReport();
+        var m = VERTEX_SCALING_REPORT_PATTERN.matcher(vertexScalingReportStr);
+
+        if (m.find()) {
+            vertexScalingReport.setVertexId(m.group(1));
+            vertexScalingReport.setCurrentParallelism(Integer.parseInt(m.group(2)));
+            vertexScalingReport.setNewParallelism(Integer.parseInt(m.group(3)));
+            vertexScalingReport.setCurrentProcessCapacity(Double.parseDouble(m.group(4)));
+            vertexScalingReport.setExpectedProcessCapacity(Double.parseDouble(m.group(5)));
+            vertexScalingReport.setTargetDataRate(Double.parseDouble(m.group(6)));
+        }
+        return vertexScalingReport;
+    }
+}
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/VertexScalingReport.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/VertexScalingReport.java
new file mode 100644
index 00000000..e397c643
--- /dev/null
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/event/VertexScalingReport.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.annotation.Experimental;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/** The scaling report of a single vertex. */
+@Experimental
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class VertexScalingReport {
+
+    private String vertexId;
+
+    private int currentParallelism;
+
+    private int newParallelism;
+
+    private double currentProcessCapacity;
+
+    private double expectedProcessCapacity;
+
+    private double targetDataRate;
+}
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java
new file mode 100644
index 00000000..8a52925c
--- /dev/null
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/event/AutoScalerEventHandlerTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.event;
+
+import org.apache.flink.autoscaler.ScalingSummary;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TARGET_DATA_RATE;
+import static org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link AutoScalerEventHandler}. */
+class AutoScalerEventHandlerTest {
+
+    @Test
+    void testScalingReport() {
+        var expectedJson =
+                "Scaling execution enabled, begin scaling vertices:"
+                        + "{ Vertex ID ea632d67b7d595e5b851708ae9ad79d6 | 
Parallelism 3 -> 1 | "
+                        + "Processing capacity 424.68 -> 123.40 | Target data 
rate 403.67}"
+                        + "{ Vertex ID bc764cd8ddf7a0cff126f51c16239658 | 
Parallelism 4 -> 2 | "
+                        + "Processing capacity Infinity -> Infinity | Target 
data rate 812.58}"
+                        + "{ Vertex ID 0a448493b4782967b150582570326227 | 
Parallelism 5 -> 8 | "
+                        + "Processing capacity 404.73 -> 645.00 | Target data 
rate 404.27}";
+
+        final var scalingSummaries = buildScalingSummaries();
+        assertThat(
+                        AutoScalerEventHandler.scalingReport(
+                                scalingSummaries,
+                                AutoScalerEventHandler
+                                        .SCALING_SUMMARY_HEADER_SCALING_EXECUTION_ENABLED))
+                .isEqualTo(expectedJson);
+
+        assertThat(AutoscalerEventUtils.parseVertexScalingReports(expectedJson))
+                .containsExactlyInAnyOrderElementsOf(buildExpectedScalingResults());
+    }
+
+    private HashMap<JobVertexID, ScalingSummary> buildScalingSummaries() {
+        final var jobVertex1 = JobVertexID.fromHexString("0a448493b4782967b150582570326227");
+        final var jobVertex2 = JobVertexID.fromHexString("bc764cd8ddf7a0cff126f51c16239658");
+        final var jobVertex3 = JobVertexID.fromHexString("ea632d67b7d595e5b851708ae9ad79d6");
+
+        final var scalingSummaries = new HashMap<JobVertexID, ScalingSummary>();
+        scalingSummaries.put(
+                jobVertex2,
+                generateScalingSummary(
+                        4, 2, Double.POSITIVE_INFINITY, Double.POSITIVE_INFINITY, 812.583));
+        scalingSummaries.put(jobVertex3, generateScalingSummary(3, 1, 424.678, 123.4, 403.673));
+        scalingSummaries.put(jobVertex1, generateScalingSummary(5, 8, 404.727, 645.0, 404.268));
+
+        return scalingSummaries;
+    }
+
+    private List<VertexScalingReport> buildExpectedScalingResults() {
+        return List.of(
+                new VertexScalingReport(
+                        "0a448493b4782967b150582570326227", 5, 8, 404.73, 
645.0, 404.27),
+                new VertexScalingReport(
+                        "bc764cd8ddf7a0cff126f51c16239658",
+                        4,
+                        2,
+                        Double.POSITIVE_INFINITY,
+                        Double.POSITIVE_INFINITY,
+                        812.58),
+                new VertexScalingReport(
+                        "ea632d67b7d595e5b851708ae9ad79d6", 3, 1, 424.68, 
123.4, 403.67));
+    }
+
+    private ScalingSummary generateScalingSummary(
+            int currentParallelism,
+            int newParallelism,
+            double currentProcessCapacity,
+            double expectedProcessCapacity,
+            double targetDataRate) {
+        final var scalingSummary = new ScalingSummary();
+        scalingSummary.setCurrentParallelism(currentParallelism);
+        scalingSummary.setNewParallelism(newParallelism);
+
+        final var metrics =
+                Map.of(
+                        TRUE_PROCESSING_RATE,
+                        new EvaluatedScalingMetric(400, currentProcessCapacity),
+                        EXPECTED_PROCESSING_RATE,
+                        new EvaluatedScalingMetric(expectedProcessCapacity, 400),
+                        TARGET_DATA_RATE,
+                        new EvaluatedScalingMetric(400, targetDataRate));
+        scalingSummary.setMetrics(metrics);
+        return scalingSummary;
+    }
+}
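
For reference, a short usage sketch (not part of this commit) of the new parsing utility; the report string below follows the format introduced above and reuses a vertex ID from the test data:

    // Parsing a scaling report event string back into structured per-vertex data.
    String report =
            "Scaling execution enabled, begin scaling vertices:"
                    + "{ Vertex ID ea632d67b7d595e5b851708ae9ad79d6 | Parallelism 3 -> 1 | "
                    + "Processing capacity 424.68 -> 123.40 | Target data rate 403.67}";
    List<VertexScalingReport> reports = AutoscalerEventUtils.parseVertexScalingReports(report);
    // Lombok's @Data generates the getters used below:
    // reports.get(0).getVertexId()           -> "ea632d67b7d595e5b851708ae9ad79d6"
    // reports.get(0).getCurrentParallelism() -> 3
    // reports.get(0).getNewParallelism()     -> 1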
