This is an automated email from the ASF dual-hosted git repository.

fanrui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git


The following commit(s) were added to refs/heads/main by this push:
     new f4b04ec5 [FLINK-34504][autoscaler] Avoid the parallelism adjustment when the upstream shuffle type doesn't have keyBy (#783)
f4b04ec5 is described below

commit f4b04ec5f47358829ee279806d2f74b1f1641ee2
Author: Rui Fan <1996fan...@gmail.com>
AuthorDate: Mon Feb 26 21:51:54 2024 +0800

    [FLINK-34504][autoscaler] Avoid the parallelism adjustment when the upstream shuffle type doesn't have keyBy (#783)
---
 .../apache/flink/autoscaler/JobVertexScaler.java   |  65 ++++---
 .../apache/flink/autoscaler/ScalingExecutor.java   |   8 +-
 .../flink/autoscaler/topology/JobTopology.java     |   6 +-
 .../flink/autoscaler/topology/ShipStrategy.java    |  55 ++++++
 .../flink/autoscaler/topology/VertexInfo.java      |  13 +-
 .../flink/autoscaler/tuning/MemoryTuning.java      |  21 ++-
 .../flink/autoscaler/BacklogBasedScalingTest.java  |   9 +-
 .../flink/autoscaler/JobVertexScalerTest.java      | 204 +++++++++++++++++----
 .../MetricsCollectionAndEvaluationTest.java        |  11 +-
 .../autoscaler/RecommendedParallelismTest.java     |   5 +-
 .../flink/autoscaler/ScalingExecutorTest.java      | 130 +++++++++++--
 .../autoscaler/ScalingMetricCollectorTest.java     |  19 +-
 .../autoscaler/ScalingMetricEvaluatorTest.java     |  16 +-
 .../autoscaler/metrics/ScalingMetricsTest.java     |   5 +-
 .../flink/autoscaler/topology/JobTopologyTest.java |  16 +-
 .../flink/autoscaler/tuning/MemoryTuningTest.java  |  27 ++-
 16 files changed, 459 insertions(+), 151 deletions(-)

diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
index 01f6d940..d0acd1be 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/JobVertexScaler.java
@@ -22,6 +22,7 @@ import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.AutoScalerEventHandler;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.ShipStrategy;
 import org.apache.flink.autoscaler.utils.AutoScalerUtils;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -34,6 +35,7 @@ import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
 import java.time.ZoneId;
+import java.util.Collection;
 import java.util.Map;
 import java.util.SortedMap;
 
@@ -48,6 +50,7 @@ import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.EXPECTED_PROCESS
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.MAX_PARALLELISM;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.TRUE_PROCESSING_RATE;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
 
 /** Component responsible for computing vertex parallelism based on the 
scaling metrics. */
 public class JobVertexScaler<KEY, Context extends JobAutoScalerContext<KEY>> {
@@ -71,6 +74,7 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
     public int computeScaleTargetParallelism(
             Context context,
             JobVertexID vertex,
+            Collection<ShipStrategy> inputShipStrategies,
             Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics,
             SortedMap<Instant, ScalingSummary> history,
             Duration restartTime) {
@@ -121,6 +125,7 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
         int newParallelism =
                 scale(
                         currentParallelism,
+                        inputShipStrategies,
                         (int) 
evaluatedMetrics.get(MAX_PARALLELISM).getCurrent(),
                         scaleFactor,
                         Math.min(currentParallelism, 
conf.getInteger(VERTEX_MIN_PARALLELISM)),
@@ -245,50 +250,68 @@ public class JobVertexScaler<KEY, Context extends 
JobAutoScalerContext<KEY>> {
         }
     }
 
+    /**
+     * Computing the newParallelism. In general, newParallelism = currentParallelism * scaleFactor.
+     * But we limit newParallelism between parallelismLowerLimit and min(parallelismUpperLimit,
+     * maxParallelism).
+     *
+     * <p>Also, in order to ensure the data is evenly spread across subtasks, we try to adjust the
+     * parallelism for source and keyed vertex such that it divides the maxParallelism without a
+     * remainder.
+     */
     @VisibleForTesting
     protected static int scale(
-            int parallelism,
-            int numKeyGroups,
+            int currentParallelism,
+            Collection<ShipStrategy> inputShipStrategies,
+            int maxParallelism,
             double scaleFactor,
-            int minParallelism,
-            int maxParallelism) {
+            int parallelismLowerLimit,
+            int parallelismUpperLimit) {
         Preconditions.checkArgument(
-                minParallelism <= maxParallelism,
-                "The minimum parallelism must not be greater than the maximum 
parallelism.");
-        if (minParallelism > numKeyGroups) {
+                parallelismLowerLimit <= parallelismUpperLimit,
+                "The parallelism lower limitation must not be greater than the 
parallelism upper limitation.");
+        if (parallelismLowerLimit > maxParallelism) {
             LOG.warn(
                     "Specified autoscaler minimum parallelism {} is greater 
than the operator max parallelism {}. The min parallelism will be set to the 
operator max parallelism.",
-                    minParallelism,
-                    numKeyGroups);
+                    parallelismLowerLimit,
+                    maxParallelism);
         }
-        if (numKeyGroups < maxParallelism && maxParallelism != 
Integer.MAX_VALUE) {
+        if (maxParallelism < parallelismUpperLimit && parallelismUpperLimit != 
Integer.MAX_VALUE) {
             LOG.debug(
                     "Specified autoscaler maximum parallelism {} is greater 
than the operator max parallelism {}. This means the operator max parallelism 
can never be reached.",
-                    maxParallelism,
-                    numKeyGroups);
+                    parallelismUpperLimit,
+                    maxParallelism);
         }
 
         int newParallelism =
                 // Prevent integer overflow when converting from double to 
integer.
                 // We do not have to detect underflow because doubles cannot
                 // underflow.
-                (int) Math.min(Math.ceil(scaleFactor * parallelism), 
Integer.MAX_VALUE);
+                (int) Math.min(Math.ceil(scaleFactor * currentParallelism), 
Integer.MAX_VALUE);
 
-        // Cap parallelism at either number of key groups or parallelism limit
-        final int upperBound = Math.min(numKeyGroups, maxParallelism);
+        // Cap parallelism at either maxParallelism(number of key groups or 
source partitions) or
+        // parallelism upper limit
+        final int upperBound = Math.min(maxParallelism, parallelismUpperLimit);
 
         // Apply min/max parallelism
-        newParallelism = Math.min(Math.max(minParallelism, newParallelism), 
upperBound);
+        newParallelism = Math.min(Math.max(parallelismLowerLimit, 
newParallelism), upperBound);
+
+        var adjustByMaxParallelism =
+                inputShipStrategies.isEmpty() || 
inputShipStrategies.contains(HASH);
+        if (!adjustByMaxParallelism) {
+            return newParallelism;
+        }
 
-        // Try to adjust the parallelism such that it divides the number of 
key groups without a
-        // remainder => state is evenly spread across subtasks
-        for (int p = newParallelism; p <= numKeyGroups / 2 && p <= upperBound; 
p++) {
-            if (numKeyGroups % p == 0) {
+        // When the shuffle type of vertex inputs contains keyBy or vertex is a source, we try to
+        // adjust the parallelism such that it divides the maxParallelism without a remainder
+        // => data is evenly spread across subtasks
+        for (int p = newParallelism; p <= maxParallelism / 2 && p <= 
upperBound; p++) {
+            if (maxParallelism % p == 0) {
                 return p;
             }
         }
 
-        // If key group adjustment fails, use originally computed parallelism
+        // If parallelism adjustment fails, use originally computed parallelism
         return newParallelism;
     }
 
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
index ef401ffe..0e714507 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/ScalingExecutor.java
@@ -102,7 +102,8 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
         var restartTime = scalingTracking.getMaxRestartTimeOrDefault(conf);
 
         var scalingSummaries =
-                computeScalingSummary(context, evaluatedMetrics, 
scalingHistory, restartTime);
+                computeScalingSummary(
+                        context, evaluatedMetrics, scalingHistory, 
restartTime, jobTopology);
 
         if (scalingSummaries.isEmpty()) {
             LOG.info("All job vertices are currently running at their target 
parallelism.");
@@ -203,7 +204,8 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
             Context context,
             EvaluatedMetrics evaluatedMetrics,
             Map<JobVertexID, SortedMap<Instant, ScalingSummary>> 
scalingHistory,
-            Duration restartTime) {
+            Duration restartTime,
+            JobTopology jobTopology) {
         LOG.debug("Restart time used in scaling summary computation: {}", 
restartTime);
 
         if (isJobUnderMemoryPressure(context, 
evaluatedMetrics.getGlobalMetrics())) {
@@ -225,10 +227,12 @@ public class ScalingExecutor<KEY, Context extends 
JobAutoScalerContext<KEY>> {
                             } else {
                                 var currentParallelism =
                                         (int) 
metrics.get(ScalingMetric.PARALLELISM).getCurrent();
+
                                 var newParallelism =
                                         
jobVertexScaler.computeScaleTargetParallelism(
                                                 context,
                                                 v,
+                                                
jobTopology.get(v).getInputs().values(),
                                                 metrics,
                                                 scalingHistory.getOrDefault(
                                                         v, 
Collections.emptySortedMap()),
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
index c140c423..8945882c 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/JobTopology.java
@@ -61,7 +61,7 @@ public class JobTopology {
 
     public JobTopology(Set<VertexInfo> vertexInfo) {
 
-        Map<JobVertexID, Map<JobVertexID, String>> vertexOutputs = new 
HashMap<>();
+        Map<JobVertexID, Map<JobVertexID, ShipStrategy>> vertexOutputs = new 
HashMap<>();
         vertexInfos =
                 ImmutableMap.copyOf(
                         
vertexInfo.stream().collect(Collectors.toMap(VertexInfo::getId, v -> v)));
@@ -145,7 +145,7 @@ public class JobTopology {
 
         for (JsonNode node : nodes) {
             var vertexId = JobVertexID.fromHexString(node.get("id").asText());
-            var inputs = new HashMap<JobVertexID, String>();
+            var inputs = new HashMap<JobVertexID, ShipStrategy>();
             var ioMetrics = metrics.get(vertexId);
             var finished = finishedVertices.contains(vertexId);
             vertexInfo.add(
@@ -160,7 +160,7 @@ public class JobTopology {
                 for (JsonNode input : node.get("inputs")) {
                     inputs.put(
                             
JobVertexID.fromHexString(input.get("id").asText()),
-                            input.get("ship_strategy").asText());
+                            
ShipStrategy.of(input.get("ship_strategy").asText()));
                 }
             }
         }
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/ShipStrategy.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/ShipStrategy.java
new file mode 100644
index 00000000..2f1e64b6
--- /dev/null
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/ShipStrategy.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.topology;
+
+import javax.annotation.Nonnull;
+
+/** The ship strategy between 2 JobVertices. */
+public enum ShipStrategy {
+    HASH,
+
+    REBALANCE,
+
+    RESCALE,
+
+    FORWARD,
+
+    CUSTOM,
+
+    BROADCAST,
+
+    GLOBAL,
+
+    SHUFFLE,
+
+    UNKNOWN;
+
+    /**
+     * Generates a ShipStrategy from a string, or returns {@link #UNKNOWN} if the value cannot match
+     * any ShipStrategy.
+     */
+    @Nonnull
+    public static ShipStrategy of(String value) {
+        for (ShipStrategy shipStrategy : ShipStrategy.values()) {
+            if (shipStrategy.toString().equalsIgnoreCase(value)) {
+                return shipStrategy;
+            }
+        }
+        return UNKNOWN;
+    }
+}
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java
index 69e669a3..2428cdeb 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/topology/VertexInfo.java
@@ -31,10 +31,10 @@ public class VertexInfo {
     private final JobVertexID id;
 
     // All input vertices and the ship_strategy
-    private final Map<JobVertexID, String> inputs;
+    private final Map<JobVertexID, ShipStrategy> inputs;
 
     // All output vertices and the ship_strategy
-    private Map<JobVertexID, String> outputs;
+    private Map<JobVertexID, ShipStrategy> outputs;
 
     private final int parallelism;
 
@@ -48,7 +48,7 @@ public class VertexInfo {
 
     public VertexInfo(
             JobVertexID id,
-            Map<JobVertexID, String> inputs,
+            Map<JobVertexID, ShipStrategy> inputs,
             int parallelism,
             int maxParallelism,
             boolean finished,
@@ -65,7 +65,7 @@ public class VertexInfo {
     @VisibleForTesting
     public VertexInfo(
             JobVertexID id,
-            Map<JobVertexID, String> inputs,
+            Map<JobVertexID, ShipStrategy> inputs,
             int parallelism,
             int maxParallelism,
             IOMetrics ioMetrics) {
@@ -74,7 +74,10 @@ public class VertexInfo {
 
     @VisibleForTesting
     public VertexInfo(
-            JobVertexID id, Map<JobVertexID, String> inputs, int parallelism, 
int maxParallelism) {
+            JobVertexID id,
+            Map<JobVertexID, ShipStrategy> inputs,
+            int parallelism,
+            int maxParallelism) {
         this(id, inputs, parallelism, maxParallelism, null);
     }
 
diff --git a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
index 428f7a66..dd7eb759 100644
--- a/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
+++ b/flink-autoscaler/src/main/java/org/apache/flink/autoscaler/tuning/MemoryTuning.java
@@ -26,6 +26,7 @@ import org.apache.flink.autoscaler.metrics.EvaluatedMetrics;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
 import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.ShipStrategy;
 import org.apache.flink.autoscaler.topology.VertexInfo;
 import org.apache.flink.autoscaler.utils.ResourceCheckUtils;
 import org.apache.flink.configuration.Configuration;
@@ -53,6 +54,8 @@ import java.util.Map;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.HEAP_MEMORY_USED;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.MANAGED_MEMORY_USED;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.METASPACE_MEMORY_USED;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.FORWARD;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.RESCALE;
 
 /** Tunes the TaskManager memory. */
 public class MemoryTuning {
@@ -254,9 +257,9 @@ public class MemoryTuning {
         long maxNetworkMemory = 0;
         for (VertexInfo vertexInfo : jobTopology.getVertexInfos().values()) {
             // Add max amount of memory for each input gate
-            for (Map.Entry<JobVertexID, String> inputEntry : 
vertexInfo.getInputs().entrySet()) {
-                final JobVertexID inputVertexId = inputEntry.getKey();
-                final String shipStrategy = inputEntry.getValue();
+            for (var inputEntry : vertexInfo.getInputs().entrySet()) {
+                var inputVertexId = inputEntry.getKey();
+                var shipStrategy = inputEntry.getValue();
                 maxNetworkMemory +=
                         calculateNetworkSegmentNumber(
                                         
updatedParallelisms.get(vertexInfo.getId()),
@@ -268,9 +271,9 @@ public class MemoryTuning {
             }
             // Add max amount of memory for each output gate
             // Usually, there is just one output per task
-            for (Map.Entry<JobVertexID, String> outputEntry : 
vertexInfo.getOutputs().entrySet()) {
-                final JobVertexID outputVertexId = outputEntry.getKey();
-                final String shipStrategy = outputEntry.getValue();
+            for (var outputEntry : vertexInfo.getOutputs().entrySet()) {
+                var outputVertexId = outputEntry.getKey();
+                var shipStrategy = outputEntry.getValue();
                 maxNetworkMemory +=
                         calculateNetworkSegmentNumber(
                                         
updatedParallelisms.get(vertexInfo.getId()),
@@ -300,15 +303,15 @@ public class MemoryTuning {
     static int calculateNetworkSegmentNumber(
             int currentVertexParallelism,
             int connectedVertexParallelism,
-            String shipStrategy,
+            ShipStrategy shipStrategy,
             int buffersPerChannel,
             int floatingBuffers) {
         // TODO When the parallelism is changed via the rescale api, the 
FORWARD may be changed to
         // RESCALE. This logic may needs to be updated after FLINK-33123.
         if (currentVertexParallelism == connectedVertexParallelism
-                && "FORWARD".equals(shipStrategy)) {
+                && FORWARD.equals(shipStrategy)) {
             return buffersPerChannel + floatingBuffers;
-        } else if ("FORWARD".equals(shipStrategy) || 
"RESCALE".equals(shipStrategy)) {
+        } else if (FORWARD.equals(shipStrategy) || 
RESCALE.equals(shipStrategy)) {
             final int channelCount =
                     (int) Math.ceil(connectedVertexParallelism / (double) 
currentVertexParallelism);
             return channelCount * buffersPerChannel + floatingBuffers;
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
index 10490e7b..3fde1640 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/BacklogBasedScalingTest.java
@@ -45,6 +45,7 @@ import java.util.SortedMap;
 
 import static org.apache.flink.autoscaler.JobAutoScalerImpl.AUTOSCALER_ERROR;
 import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -81,7 +82,7 @@ public class BacklogBasedScalingTest {
                                 new VertexInfo(source1, Map.of(), 1, 720, new 
IOMetrics(0, 0, 0)),
                                 new VertexInfo(
                                         sink,
-                                        Map.of(source1, "REBALANCE"),
+                                        Map.of(source1, REBALANCE),
                                         1,
                                         720,
                                         new IOMetrics(0, 0, 0))));
@@ -157,7 +158,7 @@ public class BacklogBasedScalingTest {
         metricsCollector.setJobTopology(
                 new JobTopology(
                         new VertexInfo(source1, Map.of(), 4, 24),
-                        new VertexInfo(sink, Map.of(source1, "REBALANCE"), 4, 
720)));
+                        new VertexInfo(sink, Map.of(source1, REBALANCE), 4, 
720)));
 
         metricsCollector.updateMetrics(
                 source1,
@@ -239,7 +240,7 @@ public class BacklogBasedScalingTest {
         metricsCollector.setJobTopology(
                 new JobTopology(
                         new VertexInfo(source1, Map.of(), 2, 24),
-                        new VertexInfo(sink, Map.of(source1, "REBALANCE"), 2, 
720)));
+                        new VertexInfo(sink, Map.of(source1, REBALANCE), 2, 
720)));
 
         /* Test stability while processing backlog. */
 
@@ -361,7 +362,7 @@ public class BacklogBasedScalingTest {
         metricsCollector.setJobTopology(
                 new JobTopology(
                         new VertexInfo(source1, Map.of(), 4, 720),
-                        new VertexInfo(sink, Map.of(source1, "REBALANCE"), 4, 
720)));
+                        new VertexInfo(sink, Map.of(source1, REBALANCE), 4, 
720)));
 
         var expectedEndTime = Instant.ofEpochMilli(10);
         metricsCollector.setJobUpdateTs(expectedEndTime);
diff --git a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
index 456105d2..e08ab9db 100644
--- a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
+++ b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/JobVertexScalerTest.java
@@ -23,18 +23,23 @@ import org.apache.flink.autoscaler.config.AutoScalerOptions;
 import org.apache.flink.autoscaler.event.TestingEventCollector;
 import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
 import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.ShipStrategy;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.time.Clock;
 import java.time.Duration;
 import java.time.Instant;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
@@ -49,12 +54,22 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /** Test for vertex parallelism scaler logic. */
 public class JobVertexScalerTest {
 
+    private static final Collection<ShipStrategy> NOT_ADJUST_INPUTS =
+            List.of(ShipStrategy.REBALANCE, ShipStrategy.RESCALE);
+
     private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> 
eventCollector;
     private JobVertexScaler<JobID, JobAutoScalerContext<JobID>> vertexScaler;
     private JobAutoScalerContext<JobID> context;
     private Configuration conf;
     private Duration restartTime;
 
+    private static List<Collection<ShipStrategy>> adjustmentInputsProvider() {
+        return List.of(
+                List.of(),
+                List.of(ShipStrategy.HASH),
+                List.of(ShipStrategy.REBALANCE, ShipStrategy.HASH, 
ShipStrategy.RESCALE));
+    }
+
     @BeforeEach
     public void setup() {
         eventCollector = new TestingEventCollector<>();
@@ -75,8 +90,9 @@ public class JobVertexScalerTest {
         restartTime = conf.get(AutoScalerOptions.RESTART_TIME);
     }
 
-    @Test
-    public void testParallelismScaling() {
+    @ParameterizedTest
+    @MethodSource("adjustmentInputsProvider")
+    public void testParallelismScaling(Collection<ShipStrategy> 
inputShipStrategies) {
         var op = new JobVertexID();
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.);
 
@@ -85,6 +101,7 @@ public class JobVertexScalerTest {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        inputShipStrategies,
                         evaluated(10, 50, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -95,6 +112,7 @@ public class JobVertexScalerTest {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        inputShipStrategies,
                         evaluated(10, 50, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -105,6 +123,7 @@ public class JobVertexScalerTest {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        inputShipStrategies,
                         evaluated(10, 80, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -115,6 +134,7 @@ public class JobVertexScalerTest {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        inputShipStrategies,
                         evaluated(10, 60, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -124,6 +144,7 @@ public class JobVertexScalerTest {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        inputShipStrategies,
                         evaluated(10, 59, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -134,6 +155,7 @@ public class JobVertexScalerTest {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        inputShipStrategies,
                         evaluated(2, 100, 40),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -144,6 +166,7 @@ public class JobVertexScalerTest {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        inputShipStrategies,
                         evaluated(2, 100, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -155,6 +178,7 @@ public class JobVertexScalerTest {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        inputShipStrategies,
                         evaluated(10, 10, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -165,6 +189,7 @@ public class JobVertexScalerTest {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        inputShipStrategies,
                         evaluated(10, 10, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -176,6 +201,7 @@ public class JobVertexScalerTest {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        inputShipStrategies,
                         evaluated(10, 200, 10),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -186,6 +212,7 @@ public class JobVertexScalerTest {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         op,
+                        inputShipStrategies,
                         evaluated(10, 200, 10),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -195,26 +222,82 @@ public class JobVertexScalerTest {
     public void testParallelismComputation() {
         final int minParallelism = 1;
         final int maxParallelism = Integer.MAX_VALUE;
-        assertEquals(1, JobVertexScaler.scale(1, 720, 0.0001, minParallelism, 
maxParallelism));
-        assertEquals(1, JobVertexScaler.scale(2, 720, 0.1, minParallelism, 
maxParallelism));
-        assertEquals(5, JobVertexScaler.scale(6, 720, 0.8, minParallelism, 
maxParallelism));
-        assertEquals(32, JobVertexScaler.scale(16, 128, 1.5, minParallelism, 
maxParallelism));
-        assertEquals(400, JobVertexScaler.scale(200, 720, 2, minParallelism, 
maxParallelism));
+        assertEquals(
+                1,
+                JobVertexScaler.scale(
+                        1, NOT_ADJUST_INPUTS, 720, 0.0001, minParallelism, 
maxParallelism));
+        assertEquals(
+                1,
+                JobVertexScaler.scale(
+                        2, NOT_ADJUST_INPUTS, 720, 0.1, minParallelism, 
maxParallelism));
+        assertEquals(
+                5,
+                JobVertexScaler.scale(
+                        6, NOT_ADJUST_INPUTS, 720, 0.8, minParallelism, 
maxParallelism));
+        assertEquals(
+                24,
+                JobVertexScaler.scale(
+                        16, NOT_ADJUST_INPUTS, 128, 1.5, minParallelism, 
maxParallelism));
+        assertEquals(
+                400,
+                JobVertexScaler.scale(
+                        200, NOT_ADJUST_INPUTS, 720, 2, minParallelism, 
maxParallelism));
         assertEquals(
                 720,
-                JobVertexScaler.scale(200, 720, Integer.MAX_VALUE, 
minParallelism, maxParallelism));
+                JobVertexScaler.scale(
+                        200,
+                        NOT_ADJUST_INPUTS,
+                        720,
+                        Integer.MAX_VALUE,
+                        minParallelism,
+                        maxParallelism));
     }
 
-    @Test
-    public void testParallelismComputationWithLimit() {
-        assertEquals(5, JobVertexScaler.scale(6, 720, 0.8, 1, 700));
-        assertEquals(8, JobVertexScaler.scale(8, 720, 0.8, 8, 700));
+    @ParameterizedTest
+    @MethodSource("adjustmentInputsProvider")
+    public void testParallelismComputationWithAdjustment(
+            Collection<ShipStrategy> inputShipStrategies) {
+        final int minParallelism = 1;
+        final int maxParallelism = Integer.MAX_VALUE;
+        assertEquals(
+                6,
+                JobVertexScaler.scale(
+                        6, inputShipStrategies, 36, 0.8, minParallelism, 
maxParallelism));
+        assertEquals(
+                32,
+                JobVertexScaler.scale(
+                        16, inputShipStrategies, 128, 1.5, minParallelism, 
maxParallelism));
+        assertEquals(
+                360,
+                JobVertexScaler.scale(
+                        200, inputShipStrategies, 720, 1.3, minParallelism, 
maxParallelism));
+        assertEquals(
+                720,
+                JobVertexScaler.scale(
+                        200,
+                        inputShipStrategies,
+                        720,
+                        Integer.MAX_VALUE,
+                        minParallelism,
+                        maxParallelism));
+    }
 
-        assertEquals(32, JobVertexScaler.scale(16, 128, 1.5, 1, 
Integer.MAX_VALUE));
-        assertEquals(64, JobVertexScaler.scale(16, 128, 1.5, 60, 
Integer.MAX_VALUE));
+    @ParameterizedTest
+    @MethodSource("adjustmentInputsProvider")
+    public void testParallelismComputationWithLimit(Collection<ShipStrategy> 
inputShipStrategies) {
+        assertEquals(5, JobVertexScaler.scale(6, inputShipStrategies, 720, 
0.8, 1, 700));
+        assertEquals(8, JobVertexScaler.scale(8, inputShipStrategies, 720, 
0.8, 8, 700));
 
-        assertEquals(300, JobVertexScaler.scale(200, 720, 2, 1, 300));
-        assertEquals(600, JobVertexScaler.scale(200, 720, Integer.MAX_VALUE, 
1, 600));
+        assertEquals(
+                32, JobVertexScaler.scale(16, inputShipStrategies, 128, 1.5, 
1, Integer.MAX_VALUE));
+        assertEquals(
+                64,
+                JobVertexScaler.scale(16, inputShipStrategies, 128, 1.5, 60, 
Integer.MAX_VALUE));
+
+        assertEquals(300, JobVertexScaler.scale(200, inputShipStrategies, 720, 
2, 1, 300));
+        assertEquals(
+                600,
+                JobVertexScaler.scale(200, inputShipStrategies, 720, 
Integer.MAX_VALUE, 1, 600));
     }
 
     @Test
@@ -225,7 +308,12 @@ public class JobVertexScalerTest {
                                 assertEquals(
                                         600,
                                         JobVertexScaler.scale(
-                                                200, 720, Integer.MAX_VALUE, 
500, 499)));
+                                                200,
+                                                NOT_ADJUST_INPUTS,
+                                                720,
+                                                Integer.MAX_VALUE,
+                                                500,
+                                                499)));
     }
 
     @Test
@@ -236,6 +324,7 @@ public class JobVertexScalerTest {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
+                        NOT_ADJUST_INPUTS,
                         evaluated(10, 100, 500),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -246,6 +335,7 @@ public class JobVertexScalerTest {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
+                        NOT_ADJUST_INPUTS,
                         evaluated(4, 100, 500),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -260,6 +350,7 @@ public class JobVertexScalerTest {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
+                        NOT_ADJUST_INPUTS,
                         evaluated(10, 500, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -270,6 +361,7 @@ public class JobVertexScalerTest {
                 vertexScaler.computeScaleTargetParallelism(
                         context,
                         new JobVertexID(),
+                        NOT_ADJUST_INPUTS,
                         evaluated(12, 500, 100),
                         Collections.emptySortedMap(),
                         restartTime));
@@ -288,7 +380,7 @@ public class JobVertexScalerTest {
         assertEquals(
                 10,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
 
         history.put(clock.instant(), new ScalingSummary(5, 10, evaluated));
 
@@ -297,7 +389,7 @@ public class JobVertexScalerTest {
         assertEquals(
                 10,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
 
         // Pass some time...
         clock = Clock.offset(Clock.systemDefaultZone(), 
Duration.ofSeconds(61));
@@ -306,7 +398,7 @@ public class JobVertexScalerTest {
         assertEquals(
                 5,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
         history.put(clock.instant(), new ScalingSummary(10, 5, evaluated));
 
         // Allow immediate scale up
@@ -314,7 +406,7 @@ public class JobVertexScalerTest {
         assertEquals(
                 10,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
         history.put(clock.instant(), new ScalingSummary(5, 10, evaluated));
     }
 
@@ -330,7 +422,7 @@ public class JobVertexScalerTest {
         assertEquals(
                 10,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
         assertEquals(100, 
evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
         history.put(Instant.now(), new ScalingSummary(5, 10, evaluated));
 
@@ -339,7 +431,7 @@ public class JobVertexScalerTest {
         assertEquals(
                 20,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
         assertEquals(180, 
evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
         history.put(Instant.now(), new ScalingSummary(10, 20, evaluated));
 
@@ -349,7 +441,7 @@ public class JobVertexScalerTest {
         assertEquals(
                 20,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
         
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
 
         // Still considered ineffective (less than <10%)
@@ -357,7 +449,7 @@ public class JobVertexScalerTest {
         assertEquals(
                 20,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
         
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
 
         // Allow scale up if current parallelism doesnt match last (user 
rescaled manually)
@@ -365,14 +457,14 @@ public class JobVertexScalerTest {
         assertEquals(
                 20,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
 
         // Over 10%, effective
         evaluated = evaluated(20, 180, 100);
         assertEquals(
                 36,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
         
assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
 
         // Ineffective but detection is turned off
@@ -381,7 +473,7 @@ public class JobVertexScalerTest {
         assertEquals(
                 40,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
         
assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, 
true);
 
@@ -390,12 +482,13 @@ public class JobVertexScalerTest {
         assertEquals(
                 10,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, op, evaluated, history, restartTime));
+                        context, op, NOT_ADJUST_INPUTS, evaluated, history, 
restartTime));
         
assertTrue(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
     }
 
-    @Test
-    public void testSendingIneffectiveScalingEvents() {
+    @ParameterizedTest
+    @MethodSource("adjustmentInputsProvider")
+    public void testSendingIneffectiveScalingEvents(Collection<ShipStrategy> 
inputShipStrategies) {
         var jobVertexID = new JobVertexID();
         conf.set(AutoScalerOptions.SCALING_EFFECTIVENESS_DETECTION_ENABLED, 
true);
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.0);
@@ -406,7 +499,12 @@ public class JobVertexScalerTest {
         assertEquals(
                 10,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, jobVertexID, evaluated, history, 
restartTime));
+                        context,
+                        jobVertexID,
+                        inputShipStrategies,
+                        evaluated,
+                        history,
+                        restartTime));
         assertEquals(100, 
evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
         history.put(Instant.now(), new ScalingSummary(5, 10, evaluated));
 
@@ -415,7 +513,12 @@ public class JobVertexScalerTest {
         assertEquals(
                 20,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, jobVertexID, evaluated, history, 
restartTime));
+                        context,
+                        jobVertexID,
+                        inputShipStrategies,
+                        evaluated,
+                        history,
+                        restartTime));
         assertEquals(180, 
evaluated.get(ScalingMetric.EXPECTED_PROCESSING_RATE).getCurrent());
         history.put(Instant.now(), new ScalingSummary(10, 20, evaluated));
         assertEquals(0, eventCollector.events.size());
@@ -425,7 +528,12 @@ public class JobVertexScalerTest {
         assertEquals(
                 20,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, jobVertexID, evaluated, history, 
restartTime));
+                        context,
+                        jobVertexID,
+                        inputShipStrategies,
+                        evaluated,
+                        history,
+                        restartTime));
         
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
         assertEquals(1, eventCollector.events.size());
         var event = eventCollector.events.poll();
@@ -441,7 +549,12 @@ public class JobVertexScalerTest {
         assertEquals(
                 20,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, jobVertexID, evaluated, history, 
restartTime));
+                        context,
+                        jobVertexID,
+                        inputShipStrategies,
+                        evaluated,
+                        history,
+                        restartTime));
         
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
         assertEquals(0, eventCollector.events.size());
 
@@ -450,7 +563,12 @@ public class JobVertexScalerTest {
         assertEquals(
                 20,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, jobVertexID, evaluated, history, 
restartTime));
+                        context,
+                        jobVertexID,
+                        inputShipStrategies,
+                        evaluated,
+                        history,
+                        restartTime));
         
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
         assertEquals(0, eventCollector.events.size());
 
@@ -459,7 +577,12 @@ public class JobVertexScalerTest {
         assertEquals(
                 20,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, jobVertexID, evaluated, history, 
restartTime));
+                        context,
+                        jobVertexID,
+                        inputShipStrategies,
+                        evaluated,
+                        history,
+                        restartTime));
         
assertFalse(evaluated.containsKey(ScalingMetric.EXPECTED_PROCESSING_RATE));
         assertEquals(1, eventCollector.events.size());
         event = eventCollector.events.poll();
@@ -476,7 +599,12 @@ public class JobVertexScalerTest {
         assertEquals(
                 40,
                 vertexScaler.computeScaleTargetParallelism(
-                        context, jobVertexID, evaluated, history, 
restartTime));
+                        context,
+                        jobVertexID,
+                        inputShipStrategies,
+                        evaluated,
+                        history,
+                        restartTime));
         assertEquals(1, eventCollector.events.size());
         event = eventCollector.events.poll();
         assertThat(event).isNotNull();
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
index d0b0daab..97631b05 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/MetricsCollectionAndEvaluationTest.java
@@ -50,6 +50,7 @@ import java.util.Map;
 import java.util.function.Supplier;
 
 import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -92,12 +93,12 @@ public class MetricsCollectionAndEvaluationTest {
                         new VertexInfo(source2, Map.of(), 2, 720, new 
IOMetrics(0, 0, 0)),
                         new VertexInfo(
                                 map,
-                                Map.of(source1, "REBALANCE", source2, 
"REBALANCE"),
+                                Map.of(source1, REBALANCE, source2, REBALANCE),
                                 12,
                                 720,
                                 new IOMetrics(0, 0, 0)),
                         new VertexInfo(
-                                sink, Map.of(map, "REBALANCE"), 8, 24, new 
IOMetrics(0, 0, 0)));
+                                sink, Map.of(map, REBALANCE), 8, 24, new 
IOMetrics(0, 0, 0)));
 
         metricsCollector = new TestingMetricsCollector<>(topology);
 
@@ -183,7 +184,7 @@ public class MetricsCollectionAndEvaluationTest {
                 new HashMap<>(),
                 new ScalingTracking(),
                 clock.instant(),
-                new JobTopology());
+                topology);
 
         var scaledParallelism = 
ScalingExecutorTest.getScaledParallelism(stateStore, context);
         assertEquals(4, scaledParallelism.size());
@@ -401,7 +402,7 @@ public class MetricsCollectionAndEvaluationTest {
                 new HashMap<>(),
                 new ScalingTracking(),
                 clock.instant(),
-                new JobTopology());
+                topology);
         var scaledParallelism = 
ScalingExecutorTest.getScaledParallelism(stateStore, context);
         assertEquals(1, scaledParallelism.get(source1));
     }
@@ -652,7 +653,7 @@ public class MetricsCollectionAndEvaluationTest {
                 new HashMap<>(),
                 new ScalingTracking(),
                 clock.instant(),
-                new JobTopology());
+                topology);
         var scaledParallelism = 
ScalingExecutorTest.getScaledParallelism(stateStore, context);
         assertEquals(1, scaledParallelism.get(source1));
 
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
index 9b225ba6..197cb94c 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/RecommendedParallelismTest.java
@@ -43,6 +43,7 @@ import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJo
 import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.getRestClusterClientSupplier;
 import static org.apache.flink.autoscaler.metrics.ScalingMetric.PARALLELISM;
 import static 
org.apache.flink.autoscaler.metrics.ScalingMetric.RECOMMENDED_PARALLELISM;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertNull;
@@ -75,7 +76,7 @@ public class RecommendedParallelismTest {
                 new TestingMetricsCollector<>(
                         new JobTopology(
                                 new VertexInfo(source, Map.of(), 1, 720),
-                                new VertexInfo(sink, Map.of(source, 
"REBALANCE"), 1, 720)));
+                                new VertexInfo(sink, Map.of(source, 
REBALANCE), 1, 720)));
 
         var defaultConf = context.getConfiguration();
         defaultConf.set(AutoScalerOptions.AUTOSCALER_ENABLED, true);
@@ -199,7 +200,7 @@ public class RecommendedParallelismTest {
         metricsCollector.setJobTopology(
                 new JobTopology(
                         new VertexInfo(source, Map.of(), 4, 24),
-                        new VertexInfo(sink, Map.of(source, "REBALANCE"), 4, 
720)));
+                        new VertexInfo(sink, Map.of(source, REBALANCE), 4, 
720)));
 
         now = now.plus(Duration.ofSeconds(10));
         setClocksTo(now);
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
index 46b32424..764e312f 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingExecutorTest.java
@@ -52,6 +52,8 @@ import static 
org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_R
 import static 
org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_ENTRY;
 import static 
org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_DISABLED;
 import static 
org.apache.flink.autoscaler.event.AutoScalerEventHandler.SCALING_SUMMARY_HEADER_SCALING_EXECUTION_ENABLED;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -60,6 +62,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 /** Test for {@link ScalingExecutor}. */
 public class ScalingExecutorTest {
 
+    private static final int MAX_PARALLELISM = 720;
+
     private JobAutoScalerContext<JobID> context;
     private TestingEventCollector<JobID, JobAutoScalerContext<JobID>> 
eventCollector;
     private ScalingExecutor<JobID, JobAutoScalerContext<JobID>> 
scalingDecisionExecutor;
@@ -151,6 +155,12 @@ public class ScalingExecutorTest {
         var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
         var sink = JobVertexID.fromHexString(sinkHexString);
 
+        JobTopology jobTopology =
+                new JobTopology(
+                        new VertexInfo(source, Map.of(), 10, 1000, false, 
null),
+                        new VertexInfo(filterOperator, Map.of(source, HASH), 
10, 1000, false, null),
+                        new VertexInfo(sink, Map.of(filterOperator, HASH), 10, 
1000, false, null));
+
         var conf = context.getConfiguration();
         conf.set(AutoScalerOptions.TARGET_UTILIZATION, .8);
         var metrics =
@@ -173,7 +183,7 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        new JobTopology()));
+                        jobTopology));
         // filter operator should scale
         conf.set(AutoScalerOptions.VERTEX_EXCLUDE_IDS, List.of());
         assertTrue(
@@ -183,7 +193,7 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        new JobTopology()));
+                        jobTopology));
     }
 
     @Test
@@ -192,6 +202,12 @@ public class ScalingExecutorTest {
         var source = JobVertexID.fromHexString(sourceHexString);
         var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
         var sink = JobVertexID.fromHexString(sinkHexString);
+
+        JobTopology jobTopology =
+                new JobTopology(
+                        new VertexInfo(source, Map.of(), 10, 1000, false, 
null),
+                        new VertexInfo(sink, Map.of(source, HASH), 10, 1000, 
false, null));
+
         var conf = context.getConfiguration();
         var now = Instant.now();
         var localTime = ZonedDateTime.ofInstant(now, 
ZoneId.systemDefault()).toLocalTime();
@@ -213,7 +229,7 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        new JobTopology()));
+                        jobTopology));
         // scaling execution outside excluded periods
         excludedPeriod =
                 new 
StringBuilder(localTime.plusSeconds(100).toString().split("\\.")[0])
@@ -228,7 +244,7 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        new JobTopology()));
+                        jobTopology));
     }
 
     @Test
@@ -237,6 +253,12 @@ public class ScalingExecutorTest {
         var source = JobVertexID.fromHexString(sourceHexString);
         var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
         var sink = JobVertexID.fromHexString(sinkHexString);
+
+        JobTopology jobTopology =
+                new JobTopology(
+                        new VertexInfo(source, Map.of(), 10, 1000, false, 
null),
+                        new VertexInfo(sink, Map.of(source, HASH), 10, 1000, 
false, null));
+
         var now = Instant.now();
         var metrics =
                 new EvaluatedMetrics(
@@ -257,7 +279,7 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        new JobTopology()));
+                        jobTopology));
 
         scalingDecisionExecutor =
                 new ScalingExecutor<>(
@@ -282,7 +304,7 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        new JobTopology()));
+                        jobTopology));
     }
 
     @Test
@@ -318,7 +340,7 @@ public class ScalingExecutorTest {
         JobTopology jobTopology =
                 new JobTopology(
                         new VertexInfo(source, Map.of(), 10, 1000, false, 
null),
-                        new VertexInfo(sink, Map.of(source, "REBALANCE"), 10, 
1000, false, null));
+                        new VertexInfo(sink, Map.of(source, REBALANCE), 10, 
1000, false, null));
 
         assertTrue(
                 scalingDecisionExecutor.scaleResource(
@@ -334,9 +356,9 @@ public class ScalingExecutorTest {
                                 
TaskManagerOptions.MANAGED_MEMORY_FRACTION.key(),
                                 "0.652",
                                 TaskManagerOptions.NETWORK_MEMORY_MIN.key(),
-                                "25 mb",
+                                "24320 kb",
                                 TaskManagerOptions.NETWORK_MEMORY_MAX.key(),
-                                "25 mb",
+                                "24320 kb",
                                 TaskManagerOptions.JVM_METASPACE.key(),
                                 "360 mb",
                                 TaskManagerOptions.JVM_OVERHEAD_FRACTION.key(),
@@ -344,7 +366,7 @@ public class ScalingExecutorTest {
                                 TaskManagerOptions.FRAMEWORK_HEAP_MEMORY.key(),
                                 "0 bytes",
                                 TaskManagerOptions.TOTAL_PROCESS_MEMORY.key(),
-                                "7681 mb"));
+                                "7864064 kb"));
     }
 
     @ParameterizedTest
@@ -368,6 +390,10 @@ public class ScalingExecutorTest {
 
     private void testScalingEvents(boolean scalingEnabled, Duration interval) 
throws Exception {
         var jobVertexID = new JobVertexID();
+
+        JobTopology jobTopology =
+                new JobTopology(new VertexInfo(jobVertexID, Map.of(), 10, 
1000, false, null));
+
         var conf = context.getConfiguration();
         conf.set(AutoScalerOptions.SCALING_ENABLED, scalingEnabled);
         if (interval != null) {
@@ -386,7 +412,7 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        new JobTopology()));
+                        jobTopology));
         assertEquals(
                 scalingEnabled,
                 scalingDecisionExecutor.scaleResource(
@@ -395,7 +421,7 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        new JobTopology()));
+                        jobTopology));
         int expectedSize = (interval == null || interval.toMillis() > 0) && 
!scalingEnabled ? 1 : 2;
         assertEquals(expectedSize, eventCollector.events.size());
 
@@ -434,7 +460,7 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         now,
-                        new JobTopology()));
+                        jobTopology));
         var event2 = eventCollector.events.poll();
         assertThat(event2).isNotNull();
         assertThat(event2.getContext()).isSameAs(event.getContext());
@@ -450,6 +476,8 @@ public class ScalingExecutorTest {
         conf.set(AutoScalerOptions.HEAP_USAGE_THRESHOLD, 0.8);
 
         var vertexMetrics = Map.of(jobVertexID, evaluated(1, 110, 100));
+        JobTopology jobTopology =
+                new JobTopology(new VertexInfo(jobVertexID, Map.of(), 10, 
1000, false, null));
         var metrics =
                 new EvaluatedMetrics(
                         vertexMetrics,
@@ -467,7 +495,7 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         Instant.now(),
-                        new JobTopology()));
+                        jobTopology));
 
         // Just below the thresholds
         metrics =
@@ -485,7 +513,7 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         Instant.now(),
-                        new JobTopology()));
+                        jobTopology));
 
         eventCollector.events.clear();
 
@@ -505,7 +533,7 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         Instant.now(),
-                        new JobTopology()));
+                        jobTopology));
         assertEquals("MemoryPressure", 
eventCollector.events.poll().getReason());
         assertTrue(eventCollector.events.isEmpty());
 
@@ -525,16 +553,82 @@ public class ScalingExecutorTest {
                         new HashMap<>(),
                         new ScalingTracking(),
                         Instant.now(),
-                        new JobTopology()));
+                        jobTopology));
         assertEquals("MemoryPressure", 
eventCollector.events.poll().getReason());
         assertTrue(eventCollector.events.isEmpty());
     }
 
+    @Test
+    public void testAdjustByMaxParallelism() throws Exception {
+        var sourceHexString = "0bfd135746ac8efb3cce668b12e16d3a";
+        var source = JobVertexID.fromHexString(sourceHexString);
+        var filterOperatorHexString = "869fb403873411306404e9f2e4438c0e";
+        var filterOperator = 
JobVertexID.fromHexString(filterOperatorHexString);
+        var sinkHexString = "a6b7102b8d3e3a9564998c1ffeb5e2b7";
+        var sink = JobVertexID.fromHexString(sinkHexString);
+
+        JobTopology jobTopology =
+                new JobTopology(
+                        new VertexInfo(source, Map.of(), 2, MAX_PARALLELISM, 
false, null),
+                        new VertexInfo(
+                                filterOperator,
+                                Map.of(source, REBALANCE),
+                                2,
+                                MAX_PARALLELISM,
+                                false,
+                                null),
+                        new VertexInfo(
+                                sink,
+                                Map.of(filterOperator, HASH),
+                                2,
+                                MAX_PARALLELISM,
+                                false,
+                                null));
+
+        var conf = context.getConfiguration();
+        conf.set(AutoScalerOptions.TARGET_UTILIZATION, 1.d);
+
+        // The expected new parallelism is 7 without adjustment by max 
parallelism.
+        var metrics =
+                new EvaluatedMetrics(
+                        Map.of(
+                                source,
+                                evaluated(2, 70, 20),
+                                filterOperator,
+                                evaluated(2, 70, 20),
+                                sink,
+                                evaluated(2, 70, 20)),
+                        dummyGlobalMetrics);
+        var now = Instant.now();
+        assertThat(
+                        scalingDecisionExecutor.scaleResource(
+                                context,
+                                metrics,
+                                new HashMap<>(),
+                                new ScalingTracking(),
+                                now,
+                                jobTopology))
+                .isTrue();
+
+        Map<String, String> parallelismOverrides = 
stateStore.getParallelismOverrides(context);
+        // The source and keyed Operator should enable the parallelism 
adjustment, so the
+        // parallelism of source and sink are adjusted, but filter is not.
+        assertThat(parallelismOverrides)
+                .containsAllEntriesOf(
+                        Map.of(
+                                "0bfd135746ac8efb3cce668b12e16d3a",
+                                "8",
+                                "869fb403873411306404e9f2e4438c0e",
+                                "7",
+                                "a6b7102b8d3e3a9564998c1ffeb5e2b7",
+                                "8"));
+    }
+
     private Map<ScalingMetric, EvaluatedScalingMetric> evaluated(
             int parallelism, double target, double procRate, double 
catchupRate) {
         var metrics = new HashMap<ScalingMetric, EvaluatedScalingMetric>();
         metrics.put(ScalingMetric.PARALLELISM, 
EvaluatedScalingMetric.of(parallelism));
-        metrics.put(ScalingMetric.MAX_PARALLELISM, 
EvaluatedScalingMetric.of(720));
+        metrics.put(ScalingMetric.MAX_PARALLELISM, 
EvaluatedScalingMetric.of(MAX_PARALLELISM));
         metrics.put(ScalingMetric.TARGET_DATA_RATE, new 
EvaluatedScalingMetric(target, target));
         metrics.put(ScalingMetric.CATCH_UP_DATA_RATE, 
EvaluatedScalingMetric.of(catchupRate));
         metrics.put(
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
index 6157e487..7bc1190c 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricCollectorTest.java
@@ -53,6 +53,8 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 
 import static 
org.apache.flink.autoscaler.TestingAutoscalerUtils.createDefaultJobAutoScalerContext;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -220,12 +222,7 @@ public class ScalingMetricCollectorTest {
         var source = new VertexInfo(sourceId, Map.of(), 2, 128, true, 
IOMetrics.FINISHED_METRICS);
         var sink =
                 new VertexInfo(
-                        sinkId,
-                        Map.of(sourceId, "HASH"),
-                        2,
-                        128,
-                        false,
-                        new IOMetrics(291532, 1, 2));
+                        sinkId, Map.of(sourceId, HASH), 2, 128, false, new 
IOMetrics(291532, 1, 2));
 
         assertEquals(
                 new JobTopology(source, sink), 
metricsCollector.getJobTopology(jobDetailsInfo));
@@ -275,12 +272,12 @@ public class ScalingMetricCollectorTest {
         var t1 =
                 new JobTopology(
                         new VertexInfo(source, Map.of(), 1, 1),
-                        new VertexInfo(sink, Map.of(source, "REBALANCE"), 1, 
1));
+                        new VertexInfo(sink, Map.of(source, REBALANCE), 1, 1));
 
         var t2 =
                 new JobTopology(
                         new VertexInfo(source2, Map.of(), 1, 1),
-                        new VertexInfo(sink, Map.of(source2, "REBALANCE"), 1, 
1));
+                        new VertexInfo(sink, Map.of(source2, REBALANCE), 1, 
1));
 
         collector.queryFilteredMetricNames(context, t1);
         assertEquals(1, metricNameQueryCounter.get(source));
@@ -309,7 +306,7 @@ public class ScalingMetricCollectorTest {
         t2 =
                 new JobTopology(
                         new VertexInfo(source2, Map.of(), 1, 1, true, null),
-                        new VertexInfo(sink, Map.of(source2, "REBALANCE"), 1, 
1));
+                        new VertexInfo(sink, Map.of(source2, REBALANCE), 1, 
1));
         collector.queryFilteredMetricNames(context, t2);
         assertEquals(3, metricNameQueryCounter.get(source));
         assertEquals(2, metricNameQueryCounter.get(source2));
@@ -333,7 +330,7 @@ public class ScalingMetricCollectorTest {
         var topology =
                 new JobTopology(
                         new VertexInfo(source, Map.of(), 1, 1),
-                        new VertexInfo(sink, Map.of(source, "REBALANCE"), 1, 
1));
+                        new VertexInfo(sink, Map.of(source, REBALANCE), 1, 1));
 
         testRequiredMetrics(
                 metricList, getSourceRequiredMetrics(), testCollector, source, 
topology);
@@ -410,7 +407,7 @@ public class ScalingMetricCollectorTest {
         var topology =
                 new JobTopology(
                         new VertexInfo(source, Map.of(), 1, 1),
-                        new VertexInfo(sink, Map.of(source, "REBALANCE"), 1, 
1));
+                        new VertexInfo(sink, Map.of(source, REBALANCE), 1, 1));
 
         var metricCollector = new TestingMetricsCollector<>(topology);
 
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
index c4559f14..969d70ee 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/ScalingMetricEvaluatorTest.java
@@ -43,6 +43,7 @@ import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.PREFER_TRACKE
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.RESTART_TIME;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION;
 import static 
org.apache.flink.autoscaler.config.AutoScalerOptions.TARGET_UTILIZATION_BOUNDARY;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -60,7 +61,7 @@ public class ScalingMetricEvaluatorTest {
         var topology =
                 new JobTopology(
                         new VertexInfo(source, Collections.emptyMap(), 1, 1, 
null),
-                        new VertexInfo(sink, Map.of(source, "REBALANCE"), 1, 
1, null));
+                        new VertexInfo(sink, Map.of(source, REBALANCE), 1, 1, 
null));
 
         var metricHistory = new TreeMap<Instant, CollectedMetrics>();
 
@@ -324,7 +325,7 @@ public class ScalingMetricEvaluatorTest {
         var topology =
                 new JobTopology(
                         new VertexInfo(source, Collections.emptyMap(), 1, 1),
-                        new VertexInfo(sink, Map.of(source, "REBALANCE"), 1, 
1));
+                        new VertexInfo(sink, Map.of(source, REBALANCE), 1, 1));
 
         var metricHistory = new TreeMap<Instant, CollectedMetrics>();
 
@@ -661,9 +662,8 @@ public class ScalingMetricEvaluatorTest {
                 new JobTopology(
                         new VertexInfo(source1, Collections.emptyMap(), 1, 1),
                         new VertexInfo(source2, Collections.emptyMap(), 1, 1),
-                        new VertexInfo(
-                                op1, Map.of(source1, "REBALANCE", source2, 
"REBALANCE"), 1, 1),
-                        new VertexInfo(sink1, Map.of(op1, "REBALANCE"), 1, 1));
+                        new VertexInfo(op1, Map.of(source1, REBALANCE, 
source2, REBALANCE), 1, 1),
+                        new VertexInfo(sink1, Map.of(op1, REBALANCE), 1, 1));
 
         var metricHistory = new TreeMap<Instant, CollectedMetrics>();
 
@@ -728,10 +728,8 @@ public class ScalingMetricEvaluatorTest {
                 new JobTopology(
                         new VertexInfo(source1, Collections.emptyMap(), 1, 1),
                         new VertexInfo(source2, Collections.emptyMap(), 1, 1),
-                        new VertexInfo(
-                                op1, Map.of(source1, "REBALANCE", source2, 
"REBALANCE"), 1, 1),
-                        new VertexInfo(
-                                op2, Map.of(source1, "REBALANCE", source2, 
"REBALANCE"), 1, 1));
+                        new VertexInfo(op1, Map.of(source1, REBALANCE, 
source2, REBALANCE), 1, 1),
+                        new VertexInfo(op2, Map.of(source1, REBALANCE, 
source2, REBALANCE), 1, 1));
 
         var metricHistory = new TreeMap<Instant, CollectedMetrics>();
 
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
index d858a1e9..4b417ad4 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/metrics/ScalingMetricsTest.java
@@ -34,6 +34,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -53,7 +54,7 @@ public class ScalingMetricsTest {
                         new VertexInfo(
                                 source, Collections.emptyMap(), 1, 1, new 
IOMetrics(1, 2, 3)),
                         new VertexInfo(
-                                op, Map.of(source, "REBALANCE"), 1, 1, new 
IOMetrics(1, 2, 3)));
+                                op, Map.of(source, REBALANCE), 1, 1, new 
IOMetrics(1, 2, 3)));
 
         Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
         ScalingMetrics.computeDataRateMetrics(
@@ -223,7 +224,7 @@ public class ScalingMetricsTest {
                         new VertexInfo(
                                 SOURCE, Collections.emptyMap(), 1, 1, new 
IOMetrics(0, 0, 0)),
                         new VertexInfo(
-                                sink, Map.of(SOURCE, "REBALANCE"), 1, 1, new 
IOMetrics(0, 0, 0)));
+                                sink, Map.of(SOURCE, REBALANCE), 1, 1, new 
IOMetrics(0, 0, 0)));
 
         Map<ScalingMetric, Double> scalingMetrics = new HashMap<>();
         scalingMetrics.put(ScalingMetric.LAG, lag);
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/topology/JobTopologyTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/topology/JobTopologyTest.java
index e38295e1..f857361a 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/topology/JobTopologyTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/topology/JobTopologyTest.java
@@ -31,6 +31,10 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.autoscaler.topology.ShipStrategy.FORWARD;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.HASH;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.RESCALE;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -82,25 +86,25 @@ public class JobTopologyTest {
         assertTrue(jobTopology.get(vertices.get("Sink: 
sink3")).getOutputs().isEmpty());
 
         assertEquals(
-                Map.of(vertices.get("map1"), "HASH"),
+                Map.of(vertices.get("map1"), HASH),
                 jobTopology.get(vertices.get("Source: s1")).getOutputs());
         assertEquals(
-                Map.of(vertices.get("map1"), "HASH"),
+                Map.of(vertices.get("map1"), HASH),
                 jobTopology.get(vertices.get("Source: s2")).getOutputs());
         assertEquals(
-                Map.of(vertices.get("Sink: sink1"), "FORWARD"),
+                Map.of(vertices.get("Sink: sink1"), FORWARD),
                 jobTopology.get(vertices.get("map1")).getOutputs());
 
         assertEquals(
-                Map.of(vertices.get("map2"), "RESCALE"),
+                Map.of(vertices.get("map2"), RESCALE),
                 jobTopology.get(vertices.get("Source: s3")).getOutputs());
 
         assertEquals(
                 Map.of(
                         vertices.get("Sink: sink2"),
-                        "REBALANCE",
+                        REBALANCE,
                         vertices.get("Sink: sink3"),
-                        "REBALANCE"),
+                        REBALANCE),
                 jobTopology.get(vertices.get("map2")).getOutputs());
 
         assertEquals(2, 
jobTopology.get(vertices.get("map1")).getParallelism());
diff --git 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/tuning/MemoryTuningTest.java
 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/tuning/MemoryTuningTest.java
index b3c99716..77b0e19c 100644
--- 
a/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/tuning/MemoryTuningTest.java
+++ 
b/flink-autoscaler/src/test/java/org/apache/flink/autoscaler/tuning/MemoryTuningTest.java
@@ -38,6 +38,10 @@ import java.time.Duration;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.flink.autoscaler.topology.ShipStrategy.FORWARD;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.REBALANCE;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.RESCALE;
+import static org.apache.flink.autoscaler.topology.ShipStrategy.UNKNOWN;
 import static org.assertj.core.api.AssertionsForInterfaceTypes.assertThat;
 
 /** Tests for {@link MemoryTuning}. */
@@ -86,12 +90,7 @@ public class MemoryTuningTest {
                 new JobTopology(
                         new VertexInfo(jobVertex1, Map.of(), 50, 1000, false, 
null),
                         new VertexInfo(
-                                jobVertex2,
-                                Map.of(jobVertex1, "REBALANCE"),
-                                50,
-                                1000,
-                                false,
-                                null));
+                                jobVertex2, Map.of(jobVertex1, REBALANCE), 50, 
1000, false, null));
 
         Map<JobVertexID, ScalingSummary> scalingSummaries =
                 Map.of(
@@ -202,23 +201,19 @@ public class MemoryTuningTest {
     @Test
     void testCalculateNetworkSegmentNumber() {
         // Test FORWARD
-        assertThat(MemoryTuning.calculateNetworkSegmentNumber(10, 10, 
"FORWARD", 2, 8))
-                .isEqualTo(10);
+        assertThat(MemoryTuning.calculateNetworkSegmentNumber(10, 10, FORWARD, 
2, 8)).isEqualTo(10);
 
         // Test FORWARD is changed to RESCALE.
-        assertThat(MemoryTuning.calculateNetworkSegmentNumber(10, 15, 
"FORWARD", 2, 8))
-                .isEqualTo(12);
+        assertThat(MemoryTuning.calculateNetworkSegmentNumber(10, 15, FORWARD, 
2, 8)).isEqualTo(12);
 
         // Test RESCALE.
-        assertThat(MemoryTuning.calculateNetworkSegmentNumber(10, 15, 
"RESCALE", 2, 8))
-                .isEqualTo(12);
+        assertThat(MemoryTuning.calculateNetworkSegmentNumber(10, 15, RESCALE, 
2, 8)).isEqualTo(12);
 
         // Test REBALANCE.
-        assertThat(MemoryTuning.calculateNetworkSegmentNumber(10, 15, 
"REBALANCE", 2, 8))
+        assertThat(MemoryTuning.calculateNetworkSegmentNumber(10, 15, 
REBALANCE, 2, 8))
                 .isEqualTo(38);
 
-        // Test Unrecognizable.
-        assertThat(MemoryTuning.calculateNetworkSegmentNumber(10, 15, 
"Unrecognizable", 2, 8))
-                .isEqualTo(38);
+        // Test UNKNOWN.
+        assertThat(MemoryTuning.calculateNetworkSegmentNumber(10, 15, UNKNOWN, 
2, 8)).isEqualTo(38);
     }
 }

Reply via email to