This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-kubernetes-operator.git

commit cebc0b87e772a5bc27e87cbb9888034af5f9cd67
Author: Gyula Fora <g_f...@apple.com>
AuthorDate: Fri Jan 20 13:05:13 2023 +0100

    [hotfix] Improve autoscaling example
---
 examples/autoscaling/autoscaling.yaml              |  6 ++---
 .../main/java/autoscaling/AutoscalingExample.java  | 27 +++++++++++++++++-----
 2 files changed, 24 insertions(+), 9 deletions(-)

diff --git a/examples/autoscaling/autoscaling.yaml 
b/examples/autoscaling/autoscaling.yaml
index 482e86b6..509db5d2 100644
--- a/examples/autoscaling/autoscaling.yaml
+++ b/examples/autoscaling/autoscaling.yaml
@@ -28,9 +28,8 @@ spec:
     kubernetes.operator.job.autoscaler.scaling.sources.enabled: "false"
     kubernetes.operator.job.autoscaler.stabilization.interval: "1m"
     kubernetes.operator.job.autoscaler.metrics.window: "3m"
-    pipeline.max-parallelism: "8"
-
-    taskmanager.numberOfTaskSlots: "2"
+    pipeline.max-parallelism: "24"
+    taskmanager.numberOfTaskSlots: "4"
     state.savepoints.dir: file:///flink-data/savepoints
     state.checkpoints.dir: file:///flink-data/checkpoints
     high-availability: 
org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
@@ -61,3 +60,4 @@ spec:
     jarURI: local:///opt/flink/usrlib/autoscaling.jar
     parallelism: 1
     upgradeMode: last-state
+    args: ["10"]
diff --git 
a/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java 
b/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java
index c74fd52c..e76b8ec3 100644
--- a/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java
+++ b/examples/autoscaling/src/main/java/autoscaling/AutoscalingExample.java
@@ -18,23 +18,38 @@
 
 package autoscaling;
 
+import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
 
 /** Autoscaling Example. */
 public class AutoscalingExample {
     public static void main(String[] args) throws Exception {
         var env = StreamExecutionEnvironment.getExecutionEnvironment();
-        DataStream<Long> stream = env.fromSequence(Long.MIN_VALUE, 
Long.MAX_VALUE);
+        long numIterations = Long.parseLong(args[0]);
+        DataStream<Long> stream =
+                env.fromSequence(Long.MIN_VALUE, Long.MAX_VALUE).filter(i -> 
System.nanoTime() > 1);
         stream =
                 stream.shuffle()
                         .map(
-                                i -> {
-                                    // Add sleep to artificially slow down 
processing
-                                    // Thread.sleep(sleep);
-                                    return i;
+                                new RichMapFunction<Long, Long>() {
+                                    @Override
+                                    public Long map(Long i) throws Exception {
+                                        long end = 0;
+                                        for (int j = 0; j < numIterations; 
j++) {
+                                            end = System.nanoTime();
+                                        }
+                                        return end;
+                                    }
                                 });
-        stream.print();
+        stream.addSink(
+                new SinkFunction<Long>() {
+                    @Override
+                    public void invoke(Long value, Context context) throws 
Exception {
+                        // Do nothing
+                    }
+                });
         env.execute("Autoscaling Example");
     }
 }

Reply via email to