This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git
The following commit(s) were added to refs/heads/master by this push: new dc4324a [hotfix] Shorten the sleep period in checkpointing benchmarks dc4324a is described below commit dc4324a832c7c5e0cf53b3daeecb9304f5b6156c Author: Dawid Wysakowicz <dwysakow...@apache.org> AuthorDate: Tue Oct 5 10:35:54 2021 +0200 [hotfix] Shorten the sleep period in checkpointing benchmarks This commit replaces a Thread.sleep(1) with busy waiting of ~200µs. The modification makes the record flow more stable, which results in a more stable size of checkpointed channels and thus checkpoint times. This closes #34 --- .../benchmark/CheckpointingTimeBenchmark.java | 30 +++++++++------------- 1 file changed, 12 insertions(+), 18 deletions(-) diff --git a/src/main/java/org/apache/flink/benchmark/CheckpointingTimeBenchmark.java b/src/main/java/org/apache/flink/benchmark/CheckpointingTimeBenchmark.java index 481999b..7b93900 100644 --- a/src/main/java/org/apache/flink/benchmark/CheckpointingTimeBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/CheckpointingTimeBenchmark.java @@ -35,7 +35,6 @@ import org.apache.flink.runtime.rest.messages.JobVertexBackPressureHeaders; import org.apache.flink.runtime.rest.messages.JobVertexBackPressureInfo; import org.apache.flink.runtime.rest.messages.JobVertexMessageParameters; import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions; import org.apache.flink.streaming.api.functions.sink.SinkFunction; @@ -43,10 +42,7 @@ import org.apache.flink.util.FlinkRuntimeException; import org.apache.flink.util.concurrent.ExecutorThreadFactory; import org.openjdk.jmh.annotations.Benchmark; -import org.openjdk.jmh.annotations.BenchmarkMode; -import org.openjdk.jmh.annotations.Fork; import org.openjdk.jmh.annotations.Measurement; -import 
org.openjdk.jmh.annotations.Mode; import org.openjdk.jmh.annotations.OutputTimeUnit; import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.annotations.Scope; @@ -65,7 +61,6 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.function.Function; -import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; import static org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks; @@ -77,25 +72,24 @@ import static org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermar * * <ul> * <li>The minimal memory segment size is decreased (256b) so that the scaling possibility is - * higher. Memory segments start with 4kb - * <li>A memory segment of the minimal size fits ~3 records (of size 64b), each record takes ~1ms + * higher. Memory segments start with 8kb + * <li>A memory segment of the minimal size fits ~9 records (of size 29b), each record takes ~200µs + * to be processed by the sink * <li>We have 2 (exclusive buffers) * 4 (parallelism) + 8 floating = 64 buffers per gate, with - * 300 ms debloating target and ~1ms/record processing speed, we can buffer 300/64 = ~4.5 - * records in a buffer after debloating which means the size of a buffer is slightly above the + * 300 ms debloating target and ~200µs/record processing speed, we can buffer 1500/64 = ~24 + * records in a buffer after debloating which means the size of a buffer (24 * 29 = 696) is slightly above the + * minimal memory segment size. 
* <li>The buffer debloating target of 300ms means a checkpoint should take ~2(number of * exchanges)*300ms=~600ms * </ul> */ @OutputTimeUnit(SECONDS) -@Warmup(iterations = 4) public class CheckpointingTimeBenchmark extends BenchmarkBase { public static final int JOB_PARALLELISM = 4; - public static final MemorySize START_MEMORY_SEGMENT_SIZE = MemorySize.parse("4 kb"); + public static final MemorySize START_MEMORY_SEGMENT_SIZE = MemorySize.parse("8 kb"); public static final MemorySize MIN_MEMORY_SEGMENT_SIZE = MemorySize.parse("256 b"); public static final Duration DEBLOATING_TARGET = Duration.of(300, ChronoUnit.MILLIS); - public static final MemorySize DEBLOATING_RECORD_SIZE = MemorySize.parse("64b"); + public static final MemorySize DEBLOATING_RECORD_SIZE = MemorySize.parse("1b"); public static final MemorySize UNALIGNED_RECORD_SIZE = MemorySize.parse("1kb"); public static final int DEBLOATING_STABILIZATION_PERIOD = 2_000; @@ -110,8 +104,7 @@ public class CheckpointingTimeBenchmark extends BenchmarkBase { } @Benchmark - public void checkpointSingleInput(DebloatedCheckpointEnvironmentContext context) - throws Exception { + public void checkpointSingleInput(CheckpointEnvironmentContext context) throws Exception { final CompletableFuture<String> checkpoint = context.miniCluster.triggerCheckpoint(context.jobID); checkpoint.get(); @@ -151,7 +144,7 @@ public class CheckpointingTimeBenchmark extends BenchmarkBase { config.set(TaskManagerOptions.BUFFER_DEBLOAT_TARGET, DEBLOATING_TARGET); config.set( TaskManagerOptions.BUFFER_DEBLOAT_PERIOD, - Duration.of(10, ChronoUnit.MILLIS)); + Duration.of(200, ChronoUnit.MILLIS)); config.set(TaskManagerOptions.BUFFER_DEBLOAT_SAMPLES, 5); return config; }, @@ -185,7 +178,7 @@ public class CheckpointingTimeBenchmark extends BenchmarkBase { } @State(Scope.Thread) - public static class DebloatedCheckpointEnvironmentContext extends FlinkEnvironmentContext { + public static class CheckpointEnvironmentContext extends FlinkEnvironmentContext 
{ public JobID jobID; @Param({"ALIGNED", "UNALIGNED", "UNALIGNED_1"}) @@ -284,8 +277,9 @@ public class CheckpointingTimeBenchmark extends BenchmarkBase { */ public static class SlowDiscardSink<T> implements SinkFunction<T> { @Override - public void invoke(T value, Context context) throws Exception { - Thread.sleep(1); + public void invoke(T value, Context context) { + final long startTime = System.nanoTime(); + while (System.nanoTime() - startTime < 200_000) {} } } }