gyfora commented on code in PR #1088:
URL:
https://github.com/apache/flink-kubernetes-operator/pull/1088#discussion_r3472737314
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/config/AutoScalerOptions.java:
##########
@@ -375,17 +376,65 @@ private static ConfigOptions.OptionBuilder
autoScalerConfig(String key) {
.withDescription(
"Quota of the CPU count. When scaling would go
beyond this number the the scaling is not going to happen.");
- public static final
ConfigOption<JobVertexScaler.KeyGroupOrPartitionsAdjustMode>
+ @Deprecated
+ public static final ConfigOption<KeyGroupOrPartitionsAdjustMode>
SCALING_KEY_GROUP_PARTITIONS_ADJUST_MODE =
autoScalerConfig("scaling.key-group.partitions.adjust.mode")
-
.enumType(JobVertexScaler.KeyGroupOrPartitionsAdjustMode.class)
- .defaultValue(
-
JobVertexScaler.KeyGroupOrPartitionsAdjustMode.EVENLY_SPREAD)
+ .enumType(KeyGroupOrPartitionsAdjustMode.class)
+
.defaultValue(KeyGroupOrPartitionsAdjustMode.EVENLY_SPREAD)
.withFallbackKeys(
oldOperatorConfigKey(
"scaling.key-group.partitions.adjust.mode"))
.withDescription(
- "How to adjust the parallelism of Source
vertex or upstream shuffle is keyBy");
+ "Deprecated, use scaling.alignment.mode
instead. How to adjust "
+ + "the parallelism of Source
vertex or upstream partitioning is keyBy");
+
+ public static final ConfigOption<String> ALIGNMENT_MODE =
+ autoScalerConfig("scaling.alignment.mode")
Review Comment:
Let's call this `scaling.parallelism-alignment.mode` everywhere.
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/alignment/AlignmentMode.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.alignment;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.ShipStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Determines the parallelism to apply for a vertex, given the autoscaler's
computed target and the
+ * surrounding {@link Context}.
+ *
+ * <p>An alignment mode reasons about regions relative to the current
parallelism and the computed
+ * target:
+ *
+ * <pre>{@code
+ * scale-up:
+ * current │── within-range ──│ target │── above-target ──│
upperAlignLimit
+ * scale-down:
+ * lowerAlignLimit │── below-target ──│ target │── within-range ──│
current
+ * }</pre>
+ *
+ * <p>This is the extension seam for tuning how the computed parallelism is
adjusted. The built-in
+ * behaviors are provided by {@link BuiltInAlignmentMode}. Custom
implementations are discovered as
+ * plugins (via {@code ServiceLoader} in the standalone autoscaler, or Flink's
{@code PluginManager}
+ * in the operator) and selected by name through {@code
scaling.alignment.mode} plus {@code
+ * scaling.alignment.mode.<name>.class}.
+ *
+ * <p>An implementation may keep the computed target unchanged or adjust it,
and decides for itself
+ * whether it applies to a given vertex (see {@link #isApplicable(Context)}).
+ */
+@Experimental
+public interface AlignmentMode {
+
+ /**
+ * Whether this mode applies to the vertex described by {@code ctx}. When
it returns {@code
+ * false} the autoscaler keeps the computed target parallelism and {@link
#align(Context)} is
+ * not called. Defaults to keyBy (hash) vertices and to partitioned
sources that report a
+ * partition count (Kafka and Pulsar do by default). A custom mode can
widen this, for example
+ * to align custom partitioned vertices.
+ */
+ default boolean isApplicable(Context ctx) {
+ return ctx.getNumSourcePartitions() > 0
+ || ctx.getInputShipStrategies().contains(ShipStrategy.HASH);
+ }
+
+ /**
+ * Returns the parallelism to apply for the vertex described by {@code
ctx}. Called only when
+ * {@link #isApplicable(Context)} returned {@code true}.
+ */
+ int align(Context ctx);
+
+ /**
+ * Immutable inputs to a single per-vertex parallelism alignment, handed
to {@link
+ * AlignmentMode#align(Context)}. Besides the parallelism inputs it
exposes the full autoscaler
+ * context, the per-vertex evaluated metrics, the job topology, and a
prefix-stripped
+ * configuration for the selected mode, so custom modes have what they
need.
+ */
+ @Value
+ class Context {
+ /** The vertex being aligned. */
+ JobVertexID vertex;
+
+ /** The current parallelism of the vertex. */
+ int currentParallelism;
+
+ /** The clamped computed target parallelism, before alignment. */
+ int newParallelism;
+
+ /** The number of source partitions, or a non-positive value when not
a source. */
+ int numSourcePartitions;
+
+ /** The vertex max parallelism (number of key groups). */
+ int maxParallelism;
+
+ /** The configured parallelism lower limit. */
+ int parallelismLowerLimit;
+
+ /** The configured parallelism upper limit. */
+ int parallelismUpperLimit;
+
+ /** The ship strategies of the vertex inputs. */
+ Collection<ShipStrategy> inputShipStrategies;
+
+ /** The full autoscaler context (cluster client, job configuration,
etc.). */
+ JobAutoScalerContext<?> jobContext;
+
+ /** The per-vertex evaluated metrics. */
+ Map<ScalingMetric, EvaluatedScalingMetric> evaluatedMetrics;
+
+ /** The job topology. */
+ JobTopology jobTopology;
Review Comment:
It feels like these should be a single context object, something like
`EvaluatedJobAutoScalerContext<..>`.
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/alignment/AlignmentMode.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.alignment;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.ShipStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Determines the parallelism to apply for a vertex, given the autoscaler's
computed target and the
+ * surrounding {@link Context}.
+ *
+ * <p>An alignment mode reasons about regions relative to the current
parallelism and the computed
+ * target:
+ *
+ * <pre>{@code
+ * scale-up:
+ * current │── within-range ──│ target │── above-target ──│
upperAlignLimit
+ * scale-down:
+ * lowerAlignLimit │── below-target ──│ target │── within-range ──│
current
+ * }</pre>
+ *
+ * <p>This is the extension seam for tuning how the computed parallelism is
adjusted. The built-in
+ * behaviors are provided by {@link BuiltInAlignmentMode}. Custom
implementations are discovered as
+ * plugins (via {@code ServiceLoader} in the standalone autoscaler, or Flink's
{@code PluginManager}
+ * in the operator) and selected by name through {@code
scaling.alignment.mode} plus {@code
+ * scaling.alignment.mode.<name>.class}.
+ *
+ * <p>An implementation may keep the computed target unchanged or adjust it,
and decides for itself
+ * whether it applies to a given vertex (see {@link #isApplicable(Context)}).
+ */
+@Experimental
+public interface AlignmentMode {
Review Comment:
Maybe we should name this `ParallelismAlignmentMode` to be a bit more explicit;
"alignment" by itself is probably too general a term to be intuitive.
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/alignment/AlignmentMode.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.alignment;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.ShipStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Determines the parallelism to apply for a vertex, given the autoscaler's
computed target and the
+ * surrounding {@link Context}.
+ *
+ * <p>An alignment mode reasons about regions relative to the current
parallelism and the computed
+ * target:
+ *
+ * <pre>{@code
+ * scale-up:
+ * current │── within-range ──│ target │── above-target ──│
upperAlignLimit
+ * scale-down:
+ * lowerAlignLimit │── below-target ──│ target │── within-range ──│
current
+ * }</pre>
+ *
+ * <p>This is the extension seam for tuning how the computed parallelism is
adjusted. The built-in
+ * behaviors are provided by {@link BuiltInAlignmentMode}. Custom
implementations are discovered as
+ * plugins (via {@code ServiceLoader} in the standalone autoscaler, or Flink's
{@code PluginManager}
+ * in the operator) and selected by name through {@code
scaling.alignment.mode} plus {@code
+ * scaling.alignment.mode.<name>.class}.
+ *
+ * <p>An implementation may keep the computed target unchanged or adjust it,
and decides for itself
+ * whether it applies to a given vertex (see {@link #isApplicable(Context)}).
+ */
+@Experimental
+public interface AlignmentMode {
+
+ /**
+ * Whether this mode applies to the vertex described by {@code ctx}. When
it returns {@code
+ * false} the autoscaler keeps the computed target parallelism and {@link
#align(Context)} is
+ * not called. Defaults to keyBy (hash) vertices and to partitioned
sources that report a
+ * partition count (Kafka and Pulsar do by default). A custom mode can
widen this, for example
+ * to align custom partitioned vertices.
+ */
+ default boolean isApplicable(Context ctx) {
+ return ctx.getNumSourcePartitions() > 0
+ || ctx.getInputShipStrategies().contains(ShipStrategy.HASH);
+ }
+
+ /**
+ * Returns the parallelism to apply for the vertex described by {@code
ctx}. Called only when
+ * {@link #isApplicable(Context)} returned {@code true}.
+ */
+ int align(Context ctx);
+
+ /**
+ * Immutable inputs to a single per-vertex parallelism alignment, handed
to {@link
+ * AlignmentMode#align(Context)}. Besides the parallelism inputs it
exposes the full autoscaler
+ * context, the per-vertex evaluated metrics, the job topology, and a
prefix-stripped
+ * configuration for the selected mode, so custom modes have what they
need.
+ */
+ @Value
+ class Context {
+ /** The vertex being aligned. */
+ JobVertexID vertex;
+
+ /** The current parallelism of the vertex. */
+ int currentParallelism;
+
+ /** The clamped computed target parallelism, before alignment. */
+ int newParallelism;
+
+ /** The number of source partitions, or a non-positive value when not
a source. */
+ int numSourcePartitions;
+
+ /** The vertex max parallelism (number of key groups). */
+ int maxParallelism;
Review Comment:
Having these two as separate fields forces every implementation to include
branching logic for sources vs. other vertices. Should we combine them, like
`numKeyGroupsOrPartitions` in the ParallelismAligner?
##########
flink-autoscaler/src/main/java/org/apache/flink/autoscaler/alignment/AlignmentMode.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.autoscaler.alignment;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.autoscaler.JobAutoScalerContext;
+import org.apache.flink.autoscaler.metrics.EvaluatedScalingMetric;
+import org.apache.flink.autoscaler.metrics.ScalingMetric;
+import org.apache.flink.autoscaler.topology.JobTopology;
+import org.apache.flink.autoscaler.topology.ShipStrategy;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import lombok.Value;
+
+import java.util.Collection;
+import java.util.Map;
+
+/**
+ * Determines the parallelism to apply for a vertex, given the autoscaler's
computed target and the
+ * surrounding {@link Context}.
+ *
+ * <p>An alignment mode reasons about regions relative to the current
parallelism and the computed
+ * target:
+ *
+ * <pre>{@code
+ * scale-up:
+ * current │── within-range ──│ target │── above-target ──│
upperAlignLimit
+ * scale-down:
+ * lowerAlignLimit │── below-target ──│ target │── within-range ──│
current
+ * }</pre>
+ *
+ * <p>This is the extension seam for tuning how the computed parallelism is
adjusted. The built-in
+ * behaviors are provided by {@link BuiltInAlignmentMode}. Custom
implementations are discovered as
+ * plugins (via {@code ServiceLoader} in the standalone autoscaler, or Flink's
{@code PluginManager}
+ * in the operator) and selected by name through {@code
scaling.alignment.mode} plus {@code
+ * scaling.alignment.mode.<name>.class}.
+ *
+ * <p>An implementation may keep the computed target unchanged or adjust it,
and decides for itself
+ * whether it applies to a given vertex (see {@link #isApplicable(Context)}).
+ */
+@Experimental
+public interface AlignmentMode {
+
+ /**
+ * Whether this mode applies to the vertex described by {@code ctx}. When
it returns {@code
+ * false} the autoscaler keeps the computed target parallelism and {@link
#align(Context)} is
+ * not called. Defaults to keyBy (hash) vertices and to partitioned
sources that report a
+ * partition count (Kafka and Pulsar do by default). A custom mode can
widen this, for example
+ * to align custom partitioned vertices.
+ */
+ default boolean isApplicable(Context ctx) {
+ return ctx.getNumSourcePartitions() > 0
+ || ctx.getInputShipStrategies().contains(ShipStrategy.HASH);
+ }
+
+ /**
+ * Returns the parallelism to apply for the vertex described by {@code
ctx}. Called only when
+ * {@link #isApplicable(Context)} returned {@code true}.
+ */
+ int align(Context ctx);
Review Comment:
Maybe call this `adjustParallelism`, `computeParallelism`, or `alignParallelism`.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]