This is an automated email from the ASF dual-hosted git repository. gyfora pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git
commit cebc0b87e772a5bc27e87cbb9888034af5f9cd67 Author: Gyula Fora <g_f...@apple.com> AuthorDate: Fri Jan 20 13:05:13 2023 +0100 [hotfix] Improve autoscaling example --- examples/autoscaling/autoscaling.yaml | 6 ++--- .../main/java/autoscaling/AutoscalingExample.java | 27 +++++++++++++++++----- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/examples/autoscaling/autoscaling.yaml b/examples/autoscaling/autoscaling.yaml index 482e86b6..509db5d2 100644 --- a/examples/autoscaling/autoscaling.yaml +++ b/examples/autoscaling/autoscaling.yaml @@ -28,9 +28,8 @@ spec: kubernetes.operator.job.autoscaler.scaling.sources.enabled: "false" kubernetes.operator.job.autoscaler.stabilization.interval: "1m" kubernetes.operator.job.autoscaler.metrics.window: "3m" - pipeline.max-parallelism: "8" - - taskmanager.numberOfTaskSlots: "2" + pipeline.max-parallelism: "24" + taskmanager.numberOfTaskSlots: "4" state.savepoints.dir: file:///flink-data/savepoints state.checkpoints.dir: file:///flink-data/checkpoints high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory @@ -61,3 +60,4 @@ spec: jarURI: local:///opt/flink/usrlib/autoscaling.jar parallelism: 1 upgradeMode: last-state + args: ["10"] diff --git a/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java b/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java index c74fd52c..e76b8ec3 100644 --- a/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java +++ b/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java @@ -18,23 +18,38 @@ package autoscaling; +import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.SinkFunction; /** Autoscaling Example. */ public class AutoscalingExample { public static void main(String[] args) throws Exception { var env = StreamExecutionEnvironment.getExecutionEnvironment(); - DataStream<Long> stream = env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE); + long numIterations = Long.parseLong(args[0]); + DataStream<Long> stream = + env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).filter(i -> System.nanoTime() > 1); stream = stream.shuffle() .map( - i -> { - // Add sleep to artificially slow down processing - // Thread.sleep(sleep); - return i; + new RichMapFunction<Long, Long>() { + @Override + public Long map(Long i) throws Exception { + long end = 0; + for (int j = 0; j < numIterations; j++) { + end = System.nanoTime(); + } + return end; + } }); - stream.print(); + stream.addSink( + new SinkFunction<Long>() { + @Override + public void invoke(Long value, Context context) throws Exception { + // Do nothing + } + }); env.execute("Autoscaling Example"); } }