gyfora commented on code in PR #1088:
URL: 
https://github.com/apache/flink-kubernetes-operator/pull/1088#discussion_r3472737314


##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -375,17 +376,65 @@ private static ConfigOptions.OptionBuilder 
autoScalerConfig(String key) {
                     .withDescription(
                             "Quota of the CPU count. When scaling would go 
beyond this number the the scaling is not going to happen.");
 
-    public static final 
ConfigOption<JobVertexScaler.KeyGroupOrPartitionsAdjustMode>
+    @Deprecated
+    public static final ConfigOption<KeyGroupOrPartitionsAdjustMode>
             SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE =
                     
autoScalerConfig("scaling.key-group.partitions.adjust.mode")
-                            
.enumType(JobVertexScaler.KeyGroupOrPartitionsAdjustMode.class)
-                            .defaultValue(
-                                    
JobVertexScaler.KeyGroupOrPartitionsAdjustMode.EVENLY_SPREAD)
+                            .enumType(KeyGroupOrPartitionsAdjustMode.class)
+                            
.defaultValue(KeyGroupOrPartitionsAdjustMode.EVENLY_SPREAD)
                             .withFallbackKeys(
                                     oldOperatorConfigKey(
                                             
"scaling.key-group.partitions.adjust.mode"))
                             .withDescription(
-                                    "How to adjust the parallelism of Source 
vertex or upstream shuffle is keyBy");
+                                    "Deprecated, use scaling.alignment.mode 
instead. How to adjust "
+                                            + "the parallelism of Source 
vertex or upstream partitioning is keyBy");
+
+    public static final ConfigOption<String> ALIGNMENT_MODE =
+            autoScalerConfig("scaling.alignment.mode")

Review Comment:
   Let's call this `scaling.parallelism-alignment.mode` everywhere.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/alignment/AlignmentMode.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.alignment;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.ShipStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Determines the parallelism to apply for a vertex, given the autoscaler's 
computed target and the
+ * surrounding {@link Context}.
+ *
+ * <p>An alignment mode reasons about regions relative to the current 
parallelism and the computed
+ * target:
+ *
+ * <pre>{@code
+ * scale-up:
+ *      current │── within-range ──│ target │── above-target ──│ 
upperAlignLimit
+ * scale-down:
+ *      lowerAlignLimit │── below-target ──│ target │── within-range ──│ 
current
+ * }</pre>
+ *
+ * <p>This is the extension seam for tuning how the computed parallelism is 
adjusted. The built-in
+ * behaviors are provided by {@link BuiltInAlignmentMode}. Custom 
implementations are discovered as
+ * plugins (via {@code ServiceLoader} in the standalone autoscaler, or Flink's 
{@code PluginManager}
+ * in the operator) and selected by name through {@code 
scaling.alignment.mode} plus {@code
+ * scaling.alignment.mode.<name>.class}.
+ *
+ * <p>An implementation may keep the computed target unchanged or adjust it, 
and decides for itself
+ * whether it applies to a given vertex (see {@link #isApplicable(Context)}).
+ */
+@Experimental
+public interface AlignmentMode {
+
+    /**
+     * Whether this mode applies to the vertex described by {@code ctx}. When 
it returns {@code
+     * false} the autoscaler keeps the computed target parallelism and {@link 
#align(Context)} is
+     * not called. Defaults to keyBy (hash) vertices and to partitioned 
sources that report a
+     * partition count (Kafka and Pulsar do by default). A custom mode can 
widen this, for example
+     * to align custom partitioned vertices.
+     */
+    default boolean isApplicable(Context ctx) {
+        return ctx.getNumSourcePartitions() > 0
+                || ctx.getInputShipStrategies().contains(ShipStrategy.HASH);
+    }
+
+    /**
+     * Returns the parallelism to apply for the vertex described by {@code 
ctx}. Called only when
+     * {@link #isApplicable(Context)} returned {@code true}.
+     */
+    int align(Context ctx);
+
+    /**
+     * Immutable inputs to a single per-vertex parallelism alignment, handed 
to {@link
+     * AlignmentMode#align(Context)}. Besides the parallelism inputs it 
exposes the full autoscaler
+     * context, the per-vertex evaluated metrics, the job topology, and a 
prefix-stripped
+     * configuration for the selected mode, so custom modes have what they 
need.
+     */
+    @Value
+    class Context {
+        /** The vertex being aligned. */
+        JobVertexID vertex;
+
+        /** The current parallelism of the vertex. */
+        int currentParallelism;
+
+        /** The clamped computed target parallelism, before alignment. */
+        int newParallelism;
+
+        /** The number of source partitions, or a non-positive value when not 
a source. */
+        int numSourcePartitions;
+
+        /** The vertex max parallelism (number of key groups). */
+        int maxParallelism;
+
+        /** The configured parallelism lower limit. */
+        int parallelismLowerLimit;
+
+        /** The configured parallelism upper limit. */
+        int parallelismUpperLimit;
+
+        /** The ship strategies of the vertex inputs. */
+        Collection<ShipStrategy> inputShipStrategies;
+
+        /** The full autoscaler context (cluster client, job configuration, 
etc.). */
+        JobAutoScalerContext<?> jobContext;
+
+        /** The per-vertex evaluated metrics. */
+        Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics;
+
+        /** The job topology. */
+        JobTopology jobTopology;

Review Comment:
   Feels like these should be a single context object, something like 
EvaluatedJobAutoScalerContext<..>



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/alignment/AlignmentMode.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.alignment;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.ShipStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Determines the parallelism to apply for a vertex, given the autoscaler's 
computed target and the
+ * surrounding {@link Context}.
+ *
+ * <p>An alignment mode reasons about regions relative to the current 
parallelism and the computed
+ * target:
+ *
+ * <pre>{@code
+ * scale-up:
+ *      current │── within-range ──│ target │── above-target ──│ 
upperAlignLimit
+ * scale-down:
+ *      lowerAlignLimit │── below-target ──│ target │── within-range ──│ 
current
+ * }</pre>
+ *
+ * <p>This is the extension seam for tuning how the computed parallelism is 
adjusted. The built-in
+ * behaviors are provided by {@link BuiltInAlignmentMode}. Custom 
implementations are discovered as
+ * plugins (via {@code ServiceLoader} in the standalone autoscaler, or Flink's 
{@code PluginManager}
+ * in the operator) and selected by name through {@code 
scaling.alignment.mode} plus {@code
+ * scaling.alignment.mode.<name>.class}.
+ *
+ * <p>An implementation may keep the computed target unchanged or adjust it, 
and decides for itself
+ * whether it applies to a given vertex (see {@link #isApplicable(Context)}).
+ */
+@Experimental
+public interface AlignmentMode {

Review Comment:
   Maybe we should name this `ParallelismAlignmentMode` to be a bit more explicit; 
"alignment" on its own is probably too general a term to be intuitive.



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/alignment/AlignmentMode.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.alignment;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.ShipStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Determines the parallelism to apply for a vertex, given the autoscaler's 
computed target and the
+ * surrounding {@link Context}.
+ *
+ * <p>An alignment mode reasons about regions relative to the current 
parallelism and the computed
+ * target:
+ *
+ * <pre>{@code
+ * scale-up:
+ *      current │── within-range ──│ target │── above-target ──│ 
upperAlignLimit
+ * scale-down:
+ *      lowerAlignLimit │── below-target ──│ target │── within-range ──│ 
current
+ * }</pre>
+ *
+ * <p>This is the extension seam for tuning how the computed parallelism is 
adjusted. The built-in
+ * behaviors are provided by {@link BuiltInAlignmentMode}. Custom 
implementations are discovered as
+ * plugins (via {@code ServiceLoader} in the standalone autoscaler, or Flink's 
{@code PluginManager}
+ * in the operator) and selected by name through {@code 
scaling.alignment.mode} plus {@code
+ * scaling.alignment.mode.<name>.class}.
+ *
+ * <p>An implementation may keep the computed target unchanged or adjust it, 
and decides for itself
+ * whether it applies to a given vertex (see {@link #isApplicable(Context)}).
+ */
+@Experimental
+public interface AlignmentMode {
+
+    /**
+     * Whether this mode applies to the vertex described by {@code ctx}. When 
it returns {@code
+     * false} the autoscaler keeps the computed target parallelism and {@link 
#align(Context)} is
+     * not called. Defaults to keyBy (hash) vertices and to partitioned 
sources that report a
+     * partition count (Kafka and Pulsar do by default). A custom mode can 
widen this, for example
+     * to align custom partitioned vertices.
+     */
+    default boolean isApplicable(Context ctx) {
+        return ctx.getNumSourcePartitions() > 0
+                || ctx.getInputShipStrategies().contains(ShipStrategy.HASH);
+    }
+
+    /**
+     * Returns the parallelism to apply for the vertex described by {@code 
ctx}. Called only when
+     * {@link #isApplicable(Context)} returned {@code true}.
+     */
+    int align(Context ctx);
+
+    /**
+     * Immutable inputs to a single per-vertex parallelism alignment, handed 
to {@link
+     * AlignmentMode#align(Context)}. Besides the parallelism inputs it 
exposes the full autoscaler
+     * context, the per-vertex evaluated metrics, the job topology, and a 
prefix-stripped
+     * configuration for the selected mode, so custom modes have what they 
need.
+     */
+    @Value
+    class Context {
+        /** The vertex being aligned. */
+        JobVertexID vertex;
+
+        /** The current parallelism of the vertex. */
+        int currentParallelism;
+
+        /** The clamped computed target parallelism, before alignment. */
+        int newParallelism;
+
+        /** The number of source partitions, or a non-positive value when not 
a source. */
+        int numSourcePartitions;
+
+        /** The vertex max parallelism (number of key groups). */
+        int maxParallelism;

Review Comment:
   Having these two (`numSourcePartitions` and `maxParallelism`) as separate fields 
forces every implementation to branch on sources vs. other vertices. Should we 
combine them into a single value, like `numKeyGroupsOrPartitions` in the ParallelismAligner?



##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/alignment/AlignmentMode.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.alignment;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.ShipStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Determines the parallelism to apply for a vertex, given the autoscaler's 
computed target and the
+ * surrounding {@link Context}.
+ *
+ * <p>An alignment mode reasons about regions relative to the current 
parallelism and the computed
+ * target:
+ *
+ * <pre>{@code
+ * scale-up:
+ *      current │── within-range ──│ target │── above-target ──│ 
upperAlignLimit
+ * scale-down:
+ *      lowerAlignLimit │── below-target ──│ target │── within-range ──│ 
current
+ * }</pre>
+ *
+ * <p>This is the extension seam for tuning how the computed parallelism is 
adjusted. The built-in
+ * behaviors are provided by {@link BuiltInAlignmentMode}. Custom 
implementations are discovered as
+ * plugins (via {@code ServiceLoader} in the standalone autoscaler, or Flink's 
{@code PluginManager}
+ * in the operator) and selected by name through {@code 
scaling.alignment.mode} plus {@code
+ * scaling.alignment.mode.<name>.class}.
+ *
+ * <p>An implementation may keep the computed target unchanged or adjust it, 
and decides for itself
+ * whether it applies to a given vertex (see {@link #isApplicable(Context)}).
+ */
+@Experimental
+public interface AlignmentMode {
+
+    /**
+     * Whether this mode applies to the vertex described by {@code ctx}. When 
it returns {@code
+     * false} the autoscaler keeps the computed target parallelism and {@link 
#align(Context)} is
+     * not called. Defaults to keyBy (hash) vertices and to partitioned 
sources that report a
+     * partition count (Kafka and Pulsar do by default). A custom mode can 
widen this, for example
+     * to align custom partitioned vertices.
+     */
+    default boolean isApplicable(Context ctx) {
+        return ctx.getNumSourcePartitions() > 0
+                || ctx.getInputShipStrategies().contains(ShipStrategy.HASH);
+    }
+
+    /**
+     * Returns the parallelism to apply for the vertex described by {@code 
ctx}. Called only when
+     * {@link #isApplicable(Context)} returned {@code true}.
+     */
+    int align(Context ctx);

Review Comment:
   Maybe call this: `adjustParallelism / computeParallelism / alignParallelism`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to