This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git


The following commit(s) were added to refs/heads/master by this push:
     new dc4324a  [hotfix] Shorten the sleep period in checkpointing benchmarks
dc4324a is described below

commit dc4324a832c7c5e0cf53b3daeecb9304f5b6156c
Author: Dawid Wysakowicz <dwysakow...@apache.org>
AuthorDate: Tue Oct 5 10:35:54 2021 +0200

    [hotfix] Shorten the sleep period in checkpointing benchmarks
    
    This commit replaces a Thread.sleep(1) with busy waiting of ~200µs. The
    modification makes the record flow more stable, which results in a more
    stable size of checkpointed channels and thus more stable checkpoint times.
    
    This closes #34
---
 .../benchmark/CheckpointingTimeBenchmark.java      | 30 +++++++++-------------
 1 file changed, 12 insertions(+), 18 deletions(-)

diff --git a/src/main/java/org/apache/flink/benchmark/CheckpointingTimeBenchmark.java b/src/main/java/org/apache/flink/benchmark/CheckpointingTimeBenchmark.java
index 481999b..7b93900 100644
--- a/src/main/java/org/apache/flink/benchmark/CheckpointingTimeBenchmark.java
+++ b/src/main/java/org/apache/flink/benchmark/CheckpointingTimeBenchmark.java
@@ -35,7 +35,6 @@ import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders;
 import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo;
 import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters;
 import org.apache.flink.runtime.testutils.CommonTestUtils;
-import org.apache.flink.streaming.api.CheckpointingMode;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions;
 import org.apache.flink.streaming.api.functions.sink.SinkFunction;
@@ -43,10 +42,7 @@ import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.concurrent.ExecutorThreadFactory;
 
 import org.openjdk.jmh.annotations.Benchmark;
-import org.openjdk.jmh.annotations.BenchmarkMode;
-import org.openjdk.jmh.annotations.Fork;
 import org.openjdk.jmh.annotations.Measurement;
-import org.openjdk.jmh.annotations.Mode;
 import org.openjdk.jmh.annotations.OutputTimeUnit;
 import org.openjdk.jmh.annotations.Param;
 import org.openjdk.jmh.annotations.Scope;
@@ -65,7 +61,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
 import java.util.function.Function;
 
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
 
@@ -77,25 +72,24 @@ import static org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermar
  *
  * <ul>
  *   <li>The minimal memory segment size is decreased (256b) so that the scaling possibility is
- *       higher. Memory segments start with 4kb
- *   <li>A memory segment of the minimal size fits ~3 records (of size 64b), each record takes ~1ms
+ *       higher. Memory segments start with 8kb
+ *   <li>A memory segment of the minimal size fits ~9 records (of size 29b), each record takes ~200µs
  *       to be processed by the sink
  *   <li>We have 2 (exclusive buffers) * 4 (parallelism) + 8 floating = 64 buffers per gate, with
- *       300 ms debloating target and ~1ms/record processing speed, we can buffer 300/64 = ~4.5
- *       records in a buffer after debloating which means the size of a buffer is slightly above the
+ *       300 ms debloating target and ~200µs/record processing speed, we can buffer 1500/64 = ~24
+ *       records in a buffer after debloating which means the size of a buffer (24 * 29 = 696b) is slightly above the
  *       minimal memory segment size.
  *   <li>The buffer debloating target of 300ms means a checkpoint should take ~2(number of
  *       exchanges)*300ms=~600ms
  * </ul>
  */
 @OutputTimeUnit(SECONDS)
-@Warmup(iterations = 4)
 public class CheckpointingTimeBenchmark extends BenchmarkBase {
     public static final int JOB_PARALLELISM = 4;
-    public static final MemorySize START_MEMORY_SEGMENT_SIZE = MemorySize.parse("4 kb");
+    public static final MemorySize START_MEMORY_SEGMENT_SIZE = MemorySize.parse("8 kb");
     public static final MemorySize MIN_MEMORY_SEGMENT_SIZE = MemorySize.parse("256 b");
     public static final Duration DEBLOATING_TARGET = Duration.of(300, ChronoUnit.MILLIS);
-    public static final MemorySize DEBLOATING_RECORD_SIZE = MemorySize.parse("64b");
+    public static final MemorySize DEBLOATING_RECORD_SIZE = MemorySize.parse("1b");
     public static final MemorySize UNALIGNED_RECORD_SIZE = MemorySize.parse("1kb");
     public static final int DEBLOATING_STABILIZATION_PERIOD = 2_000;
 
@@ -110,8 +104,7 @@ public class CheckpointingTimeBenchmark extends BenchmarkBase {
     }
 
     @Benchmark
-    public void checkpointSingleInput(DebloatedCheckpointEnvironmentContext context)
-            throws Exception {
+    public void checkpointSingleInput(CheckpointEnvironmentContext context) throws Exception {
         final CompletableFuture<String> checkpoint =
                 context.miniCluster.triggerCheckpoint(context.jobID);
         checkpoint.get();
@@ -151,7 +144,7 @@ public class CheckpointingTimeBenchmark extends BenchmarkBase {
                     config.set(TaskManagerOptions.BUFFER_DEBLOAT_TARGET, DEBLOATING_TARGET);
                     config.set(
                             TaskManagerOptions.BUFFER_DEBLOAT_PERIOD,
-                            Duration.of(10, ChronoUnit.MILLIS));
+                            Duration.of(200, ChronoUnit.MILLIS));
                     config.set(TaskManagerOptions.BUFFER_DEBLOAT_SAMPLES, 5);
                     return config;
                 },
@@ -185,7 +178,7 @@ public class CheckpointingTimeBenchmark extends BenchmarkBase {
     }
 
     @State(Scope.Thread)
-    public static class DebloatedCheckpointEnvironmentContext extends FlinkEnvironmentContext {
+    public static class CheckpointEnvironmentContext extends FlinkEnvironmentContext {
         public JobID jobID;
 
         @Param({"ALIGNED", "UNALIGNED", "UNALIGNED_1"})
@@ -284,8 +277,9 @@ public class CheckpointingTimeBenchmark extends BenchmarkBase {
      */
     public static class SlowDiscardSink<T> implements SinkFunction<T> {
         @Override
-        public void invoke(T value, Context context) throws Exception {
-            Thread.sleep(1);
+        public void invoke(T value, Context context) {
+            final long startTime = System.nanoTime();
+            while (System.nanoTime() - startTime < 200_000) {}
         }
     }
 }
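
For readers wondering why the busy wait gives a steadier record flow than Thread.sleep(1), the small standalone sketch below (not part of the commit; the class name, constant name, and loop count are illustrative only) measures the actual per-call latency of both approaches. Thread.sleep(1) typically blocks for one to several milliseconds depending on OS timer resolution and scheduling, while the System.nanoTime() spin stays close to the ~200µs (200_000 ns) used by SlowDiscardSink above.

import java.util.concurrent.TimeUnit;

public final class SleepVsBusyWait {
    // Same spin duration as SlowDiscardSink in the diff above: 200_000 ns = ~200 µs.
    private static final long BUSY_WAIT_NANOS = 200_000;

    private static void busyWait() {
        final long start = System.nanoTime();
        while (System.nanoTime() - start < BUSY_WAIT_NANOS) {
            // spin: per-call latency stays in the sub-millisecond range
        }
    }

    public static void main(String[] args) throws InterruptedException {
        for (int i = 0; i < 5; i++) {
            long t0 = System.nanoTime();
            Thread.sleep(1);
            long sleepMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - t0);

            long t1 = System.nanoTime();
            busyWait();
            long spinMicros = TimeUnit.NANOSECONDS.toMicros(System.nanoTime() - t1);

            System.out.printf("Thread.sleep(1): %d µs, busy wait: %d µs%n", sleepMicros, spinMicros);
        }
    }
}

On a typical Linux machine the sleep column tends to fluctuate around 1-2 ms per call, while the spin column stays near 200 µs, which is the per-record stability the commit message refers to.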
