akalash commented on a change in pull request #14:
URL: https://github.com/apache/flink-benchmarks/pull/14#discussion_r654266208
##########
File path:
src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
##########
@@ -42,119 +43,90 @@
import java.io.IOException;
import java.time.Duration;
-import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
/**
- * The benchmark for measuring the time taken to finish the configured number
of
- * unaligned checkpoints.
+ * The benchmark for measuring the time taken to finish the configured number
of unaligned
+ * checkpoints.
*/
-@OutputTimeUnit(MINUTES)
-@OperationsPerInvocation(value =
UnalignedCheckpointTimeBenchmark.NUM_FINISHED_CHECKPOINTS)
+@OutputTimeUnit(SECONDS)
+@OperationsPerInvocation(UnalignedCheckpointTimeBenchmark.NUM_FINISHED_CHECKPOINTS)
public class UnalignedCheckpointTimeBenchmark extends BenchmarkBase {
- public static final int NUM_FINISHED_CHECKPOINTS = 5;
+ public static final int NUM_FINISHED_CHECKPOINTS = 10;
private static final int NUM_VERTICES = 3;
private static final int PARALLELISM = 4;
private static final long CHECKPOINT_INTERVAL_MS = 10;
public static void main(String[] args) throws RunnerException {
- Options options = new OptionsBuilder()
- .verbosity(VerboseMode.NORMAL)
- .include(UnalignedCheckpointTimeBenchmark.class.getCanonicalName())
- .build();
+ Options options =
+ new OptionsBuilder()
+ .verbosity(VerboseMode.NORMAL)
+
.include(UnalignedCheckpointTimeBenchmark.class.getCanonicalName())
+ .build();
new Runner(options).run();
}
@Benchmark
- public void unalignedCheckpoint(UnalignedCheckpointEnvironmentContext
context) throws Exception {
+ public void unalignedCheckpoint(UnalignedCheckpointEnvironmentContext
context)
+ throws Exception {
StreamExecutionEnvironment env = context.env;
- DataStreamSource<byte[]> source = env.addSource(new
FiniteCheckpointSource(NUM_FINISHED_CHECKPOINTS));
- source
- .slotSharingGroup("source").rebalance()
- .map((MapFunction<byte[], byte[]>) value ->
value).slotSharingGroup("map").rebalance()
- .addSink(new SlowDiscardSink<>()).slotSharingGroup("sink");
+ DataStreamSource<Record> source =
+ env.fromSource(
+ new RecordSource(NUM_FINISHED_CHECKPOINTS),
+ noWatermarks(),
+ RecordSource.class.getName());
+
+ source.slotSharingGroup("source")
+ .rebalance()
+ .map((MapFunction<Record, Record>) value -> value)
+ .slotSharingGroup("map")
+ .rebalance()
+ .addSink(new SlowDiscardSink<>())
+ .slotSharingGroup("sink");
env.execute();
}
public static class UnalignedCheckpointEnvironmentContext extends
FlinkEnvironmentContext {
- @Param({"REMOTE", "LOCAL"})
- public String mode = "REMOTE";
+ @Param({"0", "1", "ALIGNED"})
+ public String timeout = "0";
@Setup
public void setUp() throws IOException {
super.setUp();
env.setParallelism(parallelism);
env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
- env.getCheckpointConfig().enableUnalignedCheckpoints(true);
- env.getCheckpointConfig().setAlignmentTimeout(Duration.ZERO);
+ if ("ALIGNED".equals(timeout)) {
+ env.getCheckpointConfig().enableUnalignedCheckpoints(false);
+ } else {
+ env.getCheckpointConfig().enableUnalignedCheckpoints(true);
+ env.getCheckpointConfig()
+
.setAlignmentTimeout(Duration.ofMillis(Integer.parseInt(timeout)));
+ }
}
protected Configuration createConfiguration() {
Configuration conf = super.createConfiguration();
-
- if (mode.equals("REMOTE")) {
- conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
- conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER,
NUM_VERTICES * PARALLELISM);
- } else {
- conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS,
NUM_VERTICES * PARALLELISM);
- conf.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 1);
- }
+ conf.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);
Review comment:
Don't we want to test different numbers of slots per task manager? Or it
doesn't make a lot of sense in this scenario?
##########
File path:
src/main/java/org/apache/flink/benchmark/UnalignedCheckpointTimeBenchmark.java
##########
@@ -42,119 +43,90 @@
import java.io.IOException;
import java.time.Duration;
-import static java.util.concurrent.TimeUnit.MINUTES;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static
org.apache.flink.api.common.eventtime.WatermarkStrategy.noWatermarks;
/**
- * The benchmark for measuring the time taken to finish the configured number
of
- * unaligned checkpoints.
+ * The benchmark for measuring the time taken to finish the configured number
of unaligned
+ * checkpoints.
*/
-@OutputTimeUnit(MINUTES)
-@OperationsPerInvocation(value =
UnalignedCheckpointTimeBenchmark.NUM_FINISHED_CHECKPOINTS)
+@OutputTimeUnit(SECONDS)
+@OperationsPerInvocation(UnalignedCheckpointTimeBenchmark.NUM_FINISHED_CHECKPOINTS)
public class UnalignedCheckpointTimeBenchmark extends BenchmarkBase {
- public static final int NUM_FINISHED_CHECKPOINTS = 5;
+ public static final int NUM_FINISHED_CHECKPOINTS = 10;
private static final int NUM_VERTICES = 3;
private static final int PARALLELISM = 4;
private static final long CHECKPOINT_INTERVAL_MS = 10;
public static void main(String[] args) throws RunnerException {
- Options options = new OptionsBuilder()
- .verbosity(VerboseMode.NORMAL)
- .include(UnalignedCheckpointTimeBenchmark.class.getCanonicalName())
- .build();
+ Options options =
+ new OptionsBuilder()
+ .verbosity(VerboseMode.NORMAL)
+
.include(UnalignedCheckpointTimeBenchmark.class.getCanonicalName())
+ .build();
new Runner(options).run();
}
@Benchmark
- public void unalignedCheckpoint(UnalignedCheckpointEnvironmentContext
context) throws Exception {
+ public void unalignedCheckpoint(UnalignedCheckpointEnvironmentContext
context)
+ throws Exception {
StreamExecutionEnvironment env = context.env;
- DataStreamSource<byte[]> source = env.addSource(new
FiniteCheckpointSource(NUM_FINISHED_CHECKPOINTS));
- source
- .slotSharingGroup("source").rebalance()
- .map((MapFunction<byte[], byte[]>) value ->
value).slotSharingGroup("map").rebalance()
- .addSink(new SlowDiscardSink<>()).slotSharingGroup("sink");
+ DataStreamSource<Record> source =
+ env.fromSource(
+ new RecordSource(NUM_FINISHED_CHECKPOINTS),
+ noWatermarks(),
+ RecordSource.class.getName());
+
+ source.slotSharingGroup("source")
+ .rebalance()
+ .map((MapFunction<Record, Record>) value -> value)
+ .slotSharingGroup("map")
+ .rebalance()
+ .addSink(new SlowDiscardSink<>())
+ .slotSharingGroup("sink");
env.execute();
}
public static class UnalignedCheckpointEnvironmentContext extends
FlinkEnvironmentContext {
- @Param({"REMOTE", "LOCAL"})
- public String mode = "REMOTE";
+ @Param({"0", "1", "ALIGNED"})
Review comment:
Does it make sense to have the benchmark with the alignment timeout
equal to 1 ms?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]