pnowojski commented on a change in pull request #14:
URL: https://github.com/apache/flink-benchmarks/pull/14#discussion_r654377463



##########
File path: 
src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
##########
@@ -42,119 +43,90 @@
 import java.io.IOException;
 import java.time.Duration;
 
-import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
 
 /**
- * The benchmark for measuring the time taken to finish the configured number 
of
- * unaligned checkpoints.
+ * The benchmark for measuring the time taken to finish the configured number 
of unaligned
+ * checkpoints.
  */
-@OutputTimeUnit(MINUTES)
-@OperationsPerInvocation(value = 
UnalignedCheckpointTimeBenchmark.NUM_FINISHED_CHECKPOINTS)
+@OutputTimeUnit(SECONDS)
+@OperationsPerInvocation(UnalignedCheckpointTimeBenchmark.NUM_FINISHED_CHECKPOINTS)
 public class UnalignedCheckpointTimeBenchmark extends BenchmarkBase {
-    public static final int NUM_FINISHED_CHECKPOINTS = 5;
+    public static final int NUM_FINISHED_CHECKPOINTS = 10;
     private static final int NUM_VERTICES = 3;
     private static final int PARALLELISM = 4;
     private static final long CHECKPOINT_INTERVAL_MS = 10;
 
     public static void main(String[] args) throws RunnerException {
-        Options options = new OptionsBuilder()
-            .verbosity(VerboseMode.NORMAL)
-            .include(UnalignedCheckpointTimeBenchmark.class.getCanonicalName())
-            .build();
+        Options options =
+                new OptionsBuilder()
+                        .verbosity(VerboseMode.NORMAL)
+                        
.include(UnalignedCheckpointTimeBenchmark.class.getCanonicalName())
+                        .build();
 
         new Runner(options).run();
     }
 
     @Benchmark
-    public void unalignedCheckpoint(UnalignedCheckpointEnvironmentContext 
context) throws Exception {
+    public void unalignedCheckpoint(UnalignedCheckpointEnvironmentContext 
context)
+            throws Exception {
         StreamExecutionEnvironment env = context.env;
-        DataStreamSource<byte[]> source = env.addSource(new 
FiniteCheckpointSource(NUM_FINISHED_CHECKPOINTS));
-        source
-            .slotSharingGroup("source").rebalance()
-            .map((MapFunction<byte[], byte[]>) value -> 
value).slotSharingGroup("map").rebalance()
-            .addSink(new SlowDiscardSink<>()).slotSharingGroup("sink");
+        DataStreamSource<Record> source =
+                env.fromSource(
+                        new RecordSource(NUM_FINISHED_CHECKPOINTS),
+                        noWatermarks(),
+                        RecordSource.class.getName());
+
+        source.slotSharingGroup("source")
+                .rebalance()
+                .map((MapFunction<Record, Record>) value -> value)
+                .slotSharingGroup("map")
+                .rebalance()
+                .addSink(new SlowDiscardSink<>())
+                .slotSharingGroup("sink");
 
         env.execute();
     }
 
     public static class UnalignedCheckpointEnvironmentContext extends 
FlinkEnvironmentContext {
 
-        @Param({"REMOTE", "LOCAL"})
-        public String mode = "REMOTE";
+        @Param({"0", "1", "ALIGNED"})

Review comment:
       I think so. `ALIGNED` and `0` do not test timeouts whatsoever, so we 
need some setting that would benchmark the timeouts. The lowest possible 
timeout setting (`1ms`) makes the most sense, as we would ideally expect 
the results to be almost as good (+/- 1ms) as with fully unaligned checkpoints. 
With any setting higher than that, we are actually benchmarking sleep times 
instead of the efficiency/overhead of the timeout algorithm.

##########
File path: 
src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
##########
@@ -42,119 +43,90 @@
 import java.io.IOException;
 import java.time.Duration;
 
-import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static 
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
 
 /**
- * The benchmark for measuring the time taken to finish the configured number 
of
- * unaligned checkpoints.
+ * The benchmark for measuring the time taken to finish the configured number 
of unaligned
+ * checkpoints.
  */
-@OutputTimeUnit(MINUTES)
-@OperationsPerInvocation(value = 
UnalignedCheckpointTimeBenchmark.NUM_FINISHED_CHECKPOINTS)
+@OutputTimeUnit(SECONDS)
+@OperationsPerInvocation(UnalignedCheckpointTimeBenchmark.NUM_FINISHED_CHECKPOINTS)
 public class UnalignedCheckpointTimeBenchmark extends BenchmarkBase {
-    public static final int NUM_FINISHED_CHECKPOINTS = 5;
+    public static final int NUM_FINISHED_CHECKPOINTS = 10;
     private static final int NUM_VERTICES = 3;
     private static final int PARALLELISM = 4;
     private static final long CHECKPOINT_INTERVAL_MS = 10;
 
     public static void main(String[] args) throws RunnerException {
-        Options options = new OptionsBuilder()
-            .verbosity(VerboseMode.NORMAL)
-            .include(UnalignedCheckpointTimeBenchmark.class.getCanonicalName())
-            .build();
+        Options options =
+                new OptionsBuilder()
+                        .verbosity(VerboseMode.NORMAL)
+                        
.include(UnalignedCheckpointTimeBenchmark.class.getCanonicalName())
+                        .build();
 
         new Runner(options).run();
     }
 
     @Benchmark
-    public void unalignedCheckpoint(UnalignedCheckpointEnvironmentContext 
context) throws Exception {
+    public void unalignedCheckpoint(UnalignedCheckpointEnvironmentContext 
context)
+            throws Exception {
         StreamExecutionEnvironment env = context.env;
-        DataStreamSource<byte[]> source = env.addSource(new 
FiniteCheckpointSource(NUM_FINISHED_CHECKPOINTS));
-        source
-            .slotSharingGroup("source").rebalance()
-            .map((MapFunction<byte[], byte[]>) value -> 
value).slotSharingGroup("map").rebalance()
-            .addSink(new SlowDiscardSink<>()).slotSharingGroup("sink");
+        DataStreamSource<Record> source =
+                env.fromSource(
+                        new RecordSource(NUM_FINISHED_CHECKPOINTS),
+                        noWatermarks(),
+                        RecordSource.class.getName());
+
+        source.slotSharingGroup("source")
+                .rebalance()
+                .map((MapFunction<Record, Record>) value -> value)
+                .slotSharingGroup("map")
+                .rebalance()
+                .addSink(new SlowDiscardSink<>())
+                .slotSharingGroup("sink");
 
         env.execute();
     }
 
     public static class UnalignedCheckpointEnvironmentContext extends 
FlinkEnvironmentContext {
 
-        @Param({"REMOTE", "LOCAL"})
-        public String mode = "REMOTE";
+        @Param({"0", "1", "ALIGNED"})
+        public String timeout = "0";
 
         @Setup
         public void setUp() throws IOException {
             super.setUp();
 
             env.setParallelism(parallelism);
             env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
-            env.getCheckpointConfig().enableUnalignedCheckpoints(true);
-            env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
+            if ("ALIGNED".equals(timeout)) {
+                env.getCheckpointConfig().enableUnalignedCheckpoints(false);
+            } else {
+                env.getCheckpointConfig().enableUnalignedCheckpoints(true);
+                env.getCheckpointConfig()
+                        
.setAlignmentTimeout(Duration.ofMillis(Integer.parseInt(timeout)));
+            }
         }
 
         protected Configuration createConfiguration() {
             Configuration conf = super.createConfiguration();
-
-            if (mode.equals("REMOTE")) {
-                conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
-                conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 
NUM_VERTICES * PARALLELISM);
-            } else {
-                conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 
NUM_VERTICES * PARALLELISM);
-                conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
-            }
+            conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);

Review comment:
       With infinite resources available it would make sense to test with 
various numbers of local/remote input channels. But having as many remote 
input channels as possible and a single local channel is, I think, a worst-case 
scenario that tests both code paths at the same time, so a single slot per TM 
should be good enough and should cover all cases well enough.
   
   If needed, we can expand this test in the future.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to