mateczagany commented on code in PR #109:
URL: https://github.com/apache/flink-benchmarks/pull/109#discussion_r3254164459


##########
src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/sharing/resolver/SlotSharingResolverBenchmarkExecutor.java:
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.scheduler.benchmark.scheduling.slot.sharing.resolver;
+
+import 
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.RunnerException;
+
+import static 
org.apache.flink.scheduler.benchmark.scheduling.slot.sharing.strategy.InitSlotSharingStrategyBenchmarkExecutor.JOB_VERTICES;
+
+/**
+ * The executor to run {@link SlotSharingResolverBenchmark} for using {@link
+ * org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingResolver}.
+ */
+public class SlotSharingResolverBenchmarkExecutor extends 
SchedulerBenchmarkExecutorBase {
+
+    @Param({"NONE", "TASKS"})
+    private TaskManagerLoadBalanceMode taskManagerLoadBalanceMode;
+
+    private SlotSharingResolverBenchmark benchmark;
+
+    public static void main(String[] args) throws RunnerException {
+        runBenchmark(SlotSharingResolverBenchmark.class);

Review Comment:
   This should pass `SlotSharingResolverBenchmarkExecutor.class`



##########
src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/matching/strategy/RequestSlotMatchingStrategyBenchmarkExecutor.java:
##########
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.scheduler.benchmark.scheduling.slot.matching.strategy;
+
+import 
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.PendingRequest;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import org.apache.flink.runtime.jobmaster.slotpool.RequestSlotMatchingStrategy;
+import 
org.apache.flink.runtime.jobmaster.slotpool.SimpleRequestSlotMatchingStrategy;
+import 
org.apache.flink.runtime.jobmaster.slotpool.TasksBalancedRequestSlotMatchingStrategy;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
+import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;

Review Comment:
   It has been renamed in https://github.com/apache/flink/pull/27333/changes 
so it should probably be:
   ```
   import 
org.apache.flink.runtime.scheduler.taskexecload.DefaultTaskExecutionLoad;
   ```



##########
src/main/java/org/apache/flink/scheduler/benchmark/throughput/ThroughputOfTaskManagerLoadBalanceModeForStreamingBenchmark.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.scheduler.benchmark.throughput;
+
+import org.apache.flink.benchmark.BenchmarkBase;
+import org.apache.flink.benchmark.FlinkEnvironmentContext;
+import org.apache.flink.benchmark.functions.LongSourceType;
+import org.apache.flink.benchmark.functions.MultiplyByTwo;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import 
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.v2.DiscardingSink;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.OperationsPerInvocation;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.runner.Runner;
+import org.openjdk.jmh.runner.RunnerException;
+import org.openjdk.jmh.runner.options.Options;
+import org.openjdk.jmh.runner.options.OptionsBuilder;
+import org.openjdk.jmh.runner.options.VerboseMode;
+
+@OperationsPerInvocation(
+        value = 
ThroughputOfTaskManagerLoadBalanceModeForStreamingBenchmark.RECORDS_PER_INVOCATION)
+public class ThroughputOfTaskManagerLoadBalanceModeForStreamingBenchmark 
extends BenchmarkBase {
+    public static final int RECORDS_PER_INVOCATION = 150_000;
+    private static final long CHECKPOINT_INTERVAL_MS = 100;
+
+    @Param({"F27_UNBOUNDED"})
+    public LongSourceType sourceType;
+
+    @Param({"NONE", "TASKS"})
+    public TaskManagerLoadBalanceMode taskManagerLoadBalanceMode;
+
+    @Param({"true", "false"})
+    public boolean allParallelismsOfVerticesSame;
+
+    public static void main(String[] args) throws RunnerException {
+        Options options =
+                new OptionsBuilder()
+                        .verbosity(VerboseMode.NORMAL)
+                        .include(
+                                ".*"
+                                        + 
ThroughputOfTaskManagerLoadBalanceModeForStreamingBenchmark
+                                                .class
+                                                .getCanonicalName()
+                                        + ".*")
+                        .build();
+
+        new Runner(options).run();
+    }
+
+    @Benchmark
+    public void throughput(InputBenchmarkFlinkEnvironmentContext context) 
throws Exception {
+
+        int[] parallelisms = new int[] {1, 2, 3, 4, 5, 6};
+        StreamExecutionEnvironment env = context.env;
+        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
+        env.disableOperatorChaining();
+        configAdaptivePartitioner(env);
+
+        DataStreamSource<Long> source = sourceType.source(env, 
RECORDS_PER_INVOCATION);
+        if (allParallelismsOfVerticesSame) {
+            source.setParallelism(parallelisms[parallelisms.length - 1]);
+        } else {
+            source.setParallelism(parallelisms[0]);
+        }
+
+        SingleOutputStreamOperator<Long> index = source;
+        for (int i = 1; i < parallelisms.length; i++) {
+            index =
+                    index.map(new MultiplyByTwo())
+                            .setParallelism(
+                                    allParallelismsOfVerticesSame
+                                            ? parallelisms[parallelisms.length 
- 1]
+                                            : parallelisms[i]);
+        }
+
+        index.sinkTo(new 
DiscardingSink<>()).setParallelism(parallelisms[parallelisms.length - 1]);
+
+        env.execute();
+    }
+
+    private void configAdaptivePartitioner(StreamExecutionEnvironment env) {
+        Configuration config = new Configuration();
+        config.set(TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE, 
taskManagerLoadBalanceMode);

Review Comment:
   This is set after the `MiniCluster` has already been created, and in my 
testing it is ignored. Moving it to 
`InputBenchmarkFlinkEnvironmentContext` should solve this.



##########
src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/matching/resolver/SlotMatchingResolverBenchmarkExecutor.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.scheduler.benchmark.scheduling.slot.matching.resolver;
+
+import 
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SimpleSlotMatchingResolver;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotMatchingResolver;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotsBalancedSlotMatchingResolver;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.TasksBalancedSlotMatchingResolver;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.RunnerException;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** The executor to drive {@link SlotMatchingResolver}. */
+public class SlotMatchingResolverBenchmarkExecutor extends 
SchedulerBenchmarkExecutorBase {
+
+    /**
+     * We set the number of slots is very smaller than the number of task 
managers to simulate the
+     * production environment to the greatest extent possible.
+     */
+    public static final int SLOTS_PER_TASKS_MANAGER = 8;
+
+    public static final int TASK_MANAGERS = 128;
+
+    private static final int requestedSlotSharingGroups = 3;
+    private static final List<SlotSharingGroup> slotSharingGroups = new 
ArrayList<>();
+    private static final Collection<ExecutionSlotSharingGroup> requestGroups = 
new ArrayList<>();
+    private static final Collection<PhysicalSlot> slots = new ArrayList<>();
+
+    static {
+        // For ResourceProfile.UNKNOWN.
+        slotSharingGroups.add(new SlotSharingGroup());
+        // For other resource profiles.
+        for (int i = 1; i < requestedSlotSharingGroups; i++) {
+            SlotSharingGroup sharingGroup = new SlotSharingGroup();
+            sharingGroup.setResourceProfile(newGrainfinedResourceProfile(i));
+            slotSharingGroups.add(sharingGroup);
+        }
+        // For requested groups and slots.
+        for (int tmIndex = 0; tmIndex < TASK_MANAGERS; tmIndex++) {
+
+            TaskManagerLocation tml = getTaskManagerLocation(tmIndex + 1);
+
+            for (int slotIndex = 0; slotIndex < SLOTS_PER_TASKS_MANAGER; 
slotIndex++) {
+                ResourceProfile profile = 
newGrainfinedResourceProfile(slotIndex);
+
+                slots.add(new TestingSlot(new AllocationID(), profile, tml));
+                requestGroups.add(getExecutionSlotSharingGroup(slotIndex + 1, 
slotIndex));
+            }
+        }
+    }
+
+    private static ExecutionSlotSharingGroup getExecutionSlotSharingGroup(
+            int loading, int slotIndex) {
+        Set<ExecutionVertexID> executionVertexIDSet = new HashSet<>();
+        JobVertexID jobVertexID = new JobVertexID();
+        for (int i = 0; i < loading; i++) {
+            executionVertexIDSet.add(new ExecutionVertexID(jobVertexID, i));
+        }
+        return new ExecutionSlotSharingGroup(
+                slotSharingGroups.get(slotIndex % 3), executionVertexIDSet);
+    }
+
+    public static TaskManagerLocation getTaskManagerLocation(int dataPort) {
+        try {
+            InetAddress inetAddress = InetAddress.getByName("1.2.3.4");
+            return new TaskManagerLocation(ResourceID.generate(), inetAddress, 
dataPort);
+        } catch (UnknownHostException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static ResourceProfile newGrainfinedResourceProfile(int slotIndex) {

Review Comment:
   Typo: this should be `newFineGrainedResourceProfile`



##########
src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/sharing/strategy/InitSlotSharingStrategyBenchmarkExecutor.java:
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.scheduler.benchmark.scheduling.slot.sharing.strategy;
+
+import 
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.scheduler.SlotSharingStrategy;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.runtime.util.JobVertexConnectionUtils;
+import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.RunnerException;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+/**
+ * The executor to run {@link InitSlotSharingStrategyBenchmark} for 
initializing {@link
+ * SlotSharingStrategy}.
+ */
+public class InitSlotSharingStrategyBenchmarkExecutor extends 
SchedulerBenchmarkExecutorBase {
+
+    public static final Collection<JobVertex> JOB_VERTICES;
+
+    static JobVertex newJobVertex(int parallelism) {
+        JobVertex jobVertex = new JobVertex("", new JobVertexID());
+        jobVertex.setParallelism(parallelism);
+        jobVertex.setInvokableClass(NoOpInvokable.class);
+        return jobVertex;
+    }
+
+    static {
+        JobVertex jobVertexA = newJobVertex(1000);
+        JobVertex jobVertexB = newJobVertex(4000);
+        JobVertex jobVertexC = newJobVertex(4000);
+        JobVertex jobVertexD = newJobVertex(2000);
+        JobVertex jobVertexE = newJobVertex(3000);
+        JobVertex jobVertexF = newJobVertex(1000);
+
+        JobVertexConnectionUtils.connectNewDataSetAsInput(
+                jobVertexB,
+                jobVertexA,
+                DistributionPattern.ALL_TO_ALL,
+                ResultPartitionType.BLOCKING);
+        JobVertexConnectionUtils.connectNewDataSetAsInput(
+                jobVertexC,
+                jobVertexB,
+                DistributionPattern.POINTWISE,
+                ResultPartitionType.BLOCKING);
+        JobVertexConnectionUtils.connectNewDataSetAsInput(
+                jobVertexD,
+                jobVertexC,
+                DistributionPattern.ALL_TO_ALL,
+                ResultPartitionType.BLOCKING);
+        JobVertexConnectionUtils.connectNewDataSetAsInput(
+                jobVertexE,
+                jobVertexD,
+                DistributionPattern.ALL_TO_ALL,
+                ResultPartitionType.BLOCKING);
+
+        JOB_VERTICES =
+                Arrays.asList(
+                        jobVertexA, jobVertexB, jobVertexC, jobVertexD, 
jobVertexE, jobVertexF);
+    }
+
+    @Param({"NONE", "TASKS"})
+    private TaskManagerLoadBalanceMode taskManagerLoadBalanceMode;
+
+    private InitSlotSharingStrategyBenchmark benchmark;
+
+    public static void main(String[] args) throws RunnerException {
+        runBenchmark(InitSlotSharingStrategyBenchmark.class);

Review Comment:
   This should pass `InitSlotSharingStrategyBenchmarkExecutor.class`



##########
src/main/java/org/apache/flink/scheduler/benchmark/scheduling/slot/matching/resolver/SlotMatchingResolverBenchmarkExecutor.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.scheduler.benchmark.scheduling.slot.matching.resolver;
+
+import 
org.apache.flink.configuration.TaskManagerOptions.TaskManagerLoadBalanceMode;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlot;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SimpleSlotMatchingResolver;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotMatchingResolver;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotSharingSlotAllocator.ExecutionSlotSharingGroup;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.SlotsBalancedSlotMatchingResolver;
+import 
org.apache.flink.runtime.scheduler.adaptive.allocator.TasksBalancedSlotMatchingResolver;
+import org.apache.flink.runtime.scheduler.adaptive.allocator.TestingSlot;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.scheduler.benchmark.SchedulerBenchmarkExecutorBase;
+
+import org.openjdk.jmh.annotations.Benchmark;
+import org.openjdk.jmh.annotations.BenchmarkMode;
+import org.openjdk.jmh.annotations.Level;
+import org.openjdk.jmh.annotations.Mode;
+import org.openjdk.jmh.annotations.Param;
+import org.openjdk.jmh.annotations.Setup;
+import org.openjdk.jmh.infra.Blackhole;
+import org.openjdk.jmh.runner.RunnerException;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/** The executor to drive {@link SlotMatchingResolver}. */
+public class SlotMatchingResolverBenchmarkExecutor extends 
SchedulerBenchmarkExecutorBase {
+
+    /**
+     * We set the number of slots is very smaller than the number of task 
managers to simulate the
+     * production environment to the greatest extent possible.
+     */
+    public static final int SLOTS_PER_TASKS_MANAGER = 8;
+
+    public static final int TASK_MANAGERS = 128;
+
+    private static final int requestedSlotSharingGroups = 3;
+    private static final List<SlotSharingGroup> slotSharingGroups = new 
ArrayList<>();
+    private static final Collection<ExecutionSlotSharingGroup> requestGroups = 
new ArrayList<>();
+    private static final Collection<PhysicalSlot> slots = new ArrayList<>();
+
+    static {
+        // For ResourceProfile.UNKNOWN.
+        slotSharingGroups.add(new SlotSharingGroup());
+        // For other resource profiles.
+        for (int i = 1; i < requestedSlotSharingGroups; i++) {
+            SlotSharingGroup sharingGroup = new SlotSharingGroup();
+            sharingGroup.setResourceProfile(newGrainfinedResourceProfile(i));
+            slotSharingGroups.add(sharingGroup);
+        }
+        // For requested groups and slots.
+        for (int tmIndex = 0; tmIndex < TASK_MANAGERS; tmIndex++) {
+
+            TaskManagerLocation tml = getTaskManagerLocation(tmIndex + 1);
+
+            for (int slotIndex = 0; slotIndex < SLOTS_PER_TASKS_MANAGER; 
slotIndex++) {
+                ResourceProfile profile = 
newGrainfinedResourceProfile(slotIndex);
+
+                slots.add(new TestingSlot(new AllocationID(), profile, tml));
+                requestGroups.add(getExecutionSlotSharingGroup(slotIndex + 1, 
slotIndex));
+            }
+        }
+    }
+
+    private static ExecutionSlotSharingGroup getExecutionSlotSharingGroup(
+            int loading, int slotIndex) {
+        Set<ExecutionVertexID> executionVertexIDSet = new HashSet<>();
+        JobVertexID jobVertexID = new JobVertexID();
+        for (int i = 0; i < loading; i++) {
+            executionVertexIDSet.add(new ExecutionVertexID(jobVertexID, i));
+        }
+        return new ExecutionSlotSharingGroup(
+                slotSharingGroups.get(slotIndex % 3), executionVertexIDSet);
+    }
+
+    public static TaskManagerLocation getTaskManagerLocation(int dataPort) {
+        try {
+            InetAddress inetAddress = InetAddress.getByName("1.2.3.4");
+            return new TaskManagerLocation(ResourceID.generate(), inetAddress, 
dataPort);
+        } catch (UnknownHostException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static ResourceProfile newGrainfinedResourceProfile(int slotIndex) {
+        return ResourceProfile.newBuilder()
+                .setCpuCores(slotIndex % 2 == 0 ? 1 : 2)
+                .setTaskHeapMemoryMB(100)
+                .setTaskOffHeapMemoryMB(100)
+                .setManagedMemoryMB(100)
+                .build();
+    }
+
+    @Param({"NONE", "SLOTS", "TASKS"})
+    private TaskManagerLoadBalanceMode taskManagerLoadBalanceMode;
+
+    private SlotMatchingResolver slotMatchingResolver;
+
+    public static void main(String[] args) throws RunnerException {
+        runBenchmark(SlotMatchingResolverBenchmarkExecutor.class);
+    }
+
+    @Setup(Level.Trial)
+    public void setup() throws Exception {
+        slotMatchingResolver = getSlotMatchingResolver();

Review Comment:
   `slotMatchingResolver` gets assigned twice here



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to