1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1453115386


##########
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##########
@@ -446,6 +446,13 @@ public enum JobStoreType {
                     
.defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue())
                     .withDescription("The timeout in milliseconds for a idle 
slot in Slot Pool.");
 
+    @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+    public static final ConfigOption<Duration> SLOT_REQUEST_MAX_INTERVAL =
+            key("slot.request.max-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(50L))

Review Comment:
   The default value is 20ms in the FLIP-370.
   
   [1] 
https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -330,12 +334,32 @@ public void onUnknownDeploymentsOf(
                         getMainThreadExecutor(),
                         log);
 
+        TaskManagerLoadBalanceMode mode =
+                TaskManagerLoadBalanceMode.loadFromConfiguration(
+                        jobMasterConfiguration.getConfiguration());
+        JobManagerOptions.SchedulerType schedulerType =
+                slotPoolServiceSchedulerFactory.getSchedulerType();
+        Time slotRequestMaxInterval = null;
+        boolean slotBatchAllocatable = false;
+        if (mode == TASKS && schedulerType == 
JobManagerOptions.SchedulerType.Adaptive) {

Review Comment:
   IIUC, we should enable `slotBatchAllocatable` when mode == TASKS and jobType 
is STREAMING.
   
   
   
![image](https://github.com/apache/flink/assets/38427477/0e2db6c9-60d6-427f-b7cd-98c404217e89)
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.loading;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Objects;
+
+/** The default implementation of {@link LoadingWeight} based on task count 
loading. */
+@Internal
+public class DefaultLoadingWeight implements LoadingWeight {
+
+    private float tasks;
+
+    DefaultLoadingWeight(float tasks) {
+        Preconditions.checkArgument(tasks >= 0.0f);
+        this.tasks = tasks;
+    }
+
+    public void incLoading() {
+        this.tasks += 1.0f;
+    }
+
+    @Override
+    public float getLoading() {
+        return tasks;
+    }
+
+    @Override
+    public LoadingWeight merge(LoadingWeight other) {
+        if (other == null) {
+            return LoadingWeight.ofDefaultLoadingWeight(this.tasks);
+        }
+        return LoadingWeight.ofDefaultLoadingWeight(tasks + 
other.getLoading());
+    }
+
+    @Override
+    public int compareTo(@Nonnull LoadingWeight o) {
+        return Float.compare(tasks, o.getLoading());
+    }

Review Comment:
   Why not move it to the interface?
   
   ```
   
       @Override
       default int compareTo(@Nonnull LoadingWeight o) {
           return Float.compare(getLoading(), o.getLoading());
       }
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##########
@@ -334,7 +334,9 @@ public void onUnknownDeploymentsOf(
                         .createSlotPoolService(
                                 jid,
                                 createDeclarativeSlotPoolFactory(
-                                        
jobMasterConfiguration.getConfiguration()));
+                                        
jobMasterConfiguration.getConfiguration()),
+                                null,

Review Comment:
   Why don't we introduce `JobManagerOptions#SLOT_REQUEST_MAX_INTERVAL` in 
the third commit, `Support resource request wait mechanism at 
DefaultDeclarativeSlotPool side for Default Scheduler`?
   
   You introduce it in the seventh commit, `[FLINK-33388] Support tasks 
balancing at TM level for Default Scheduler`. 
   
   But only the third commit uses this option. I checked, and the feature in 
the seventh commit doesn't use it.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.loading;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Objects;
+
+/** The default implementation of {@link LoadingWeight} based on task count 
loading. */
+@Internal
+public class DefaultLoadingWeight implements LoadingWeight {
+
+    private float tasks;
+
+    DefaultLoadingWeight(float tasks) {
+        Preconditions.checkArgument(tasks >= 0.0f);
+        this.tasks = tasks;
+    }
+
+    public void incLoading() {
+        this.tasks += 1.0f;
+    }
+
+    @Override
+    public float getLoading() {
+        return tasks;
+    }
+
+    @Override
+    public LoadingWeight merge(LoadingWeight other) {
+        if (other == null) {
+            return LoadingWeight.ofDefaultLoadingWeight(this.tasks);

Review Comment:
   The merge method should create a new `DefaultLoadingWeight` directly.
   
   If `LoadingWeight.ofDefaultLoadingWeight` is refactored in the future to 
create another type of `LoadingWeight`, merging a `DefaultLoadingWeight` would 
then produce that other type of `LoadingWeight`.
   
   Also, if other == null, could we return this directly?



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java:
##########
@@ -18,35 +18,50 @@
 
 package org.apache.flink.runtime.scheduler;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight;
+import org.apache.flink.runtime.scheduler.loading.LoadingWeight;
+import org.apache.flink.runtime.scheduler.loading.WeightLoadable;
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 import org.apache.flink.util.Preconditions;
 
+import javax.annotation.Nonnull;
+
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.Set;
 
 /** Represents execution vertices that will run the same shared slot. */
-class ExecutionSlotSharingGroup {
+class ExecutionSlotSharingGroup implements WeightLoadable {
 
     private final Set<ExecutionVertexID> executionVertexIds;
 
-    private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
+    @Nonnull private final SlotSharingGroup slotSharingGroup;
+
+    private LoadingWeight loadingWeight;
 
-    ExecutionSlotSharingGroup() {
+    ExecutionSlotSharingGroup(@Nonnull SlotSharingGroup slotSharingGroup) {
+        this.slotSharingGroup = Preconditions.checkNotNull(slotSharingGroup);
         this.executionVertexIds = new HashSet<>();
+        this.loadingWeight = LoadingWeight.EMPTY;
     }
 
     void addVertex(final ExecutionVertexID executionVertexId) {
         executionVertexIds.add(executionVertexId);
+        ((DefaultLoadingWeight) loadingWeight).incLoading();

Review Comment:
   I guess `incLoading` will change `LoadingWeight.EMPTY`, and no caller 
calls `setLoading` of `ExecutionSlotSharingGroup`, right?
   
   All slots will share one `DefaultLoadingWeight` object, so the loading will 
be wrong. I also don't think we should define `LoadingWeight.EMPTY`; it's very 
dangerous, because anyone can change its loading.
   
   Also, while building the `DefaultLoadingWeight` we always call 
`incLoading`; why not create a new `DefaultLoadingWeight` directly? If anyone 
sets another type of `LoadingWeight`, it will cause a bug.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.loading;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Objects;
+
+/** The default implementation of {@link LoadingWeight} based on task count 
loading. */
+@Internal
+public class DefaultLoadingWeight implements LoadingWeight {
+
+    private float tasks;
+
+    DefaultLoadingWeight(float tasks) {

Review Comment:
   These `LoadingWeight`-related classes are changed by 2 commits. I don't 
think that is expected.
   
   They are new classes; if we find that the change in an older commit isn't 
good, we should amend that older commit.
   



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java:
##########
@@ -86,8 +89,15 @@ SchedulerNGFactory getSchedulerNGFactory() {
 
     @Override
     public SlotPoolService createSlotPoolService(
-            JobID jid, DeclarativeSlotPoolFactory declarativeSlotPoolFactory) {
-        return slotPoolServiceFactory.createSlotPoolService(jid, 
declarativeSlotPoolFactory);
+            JobID jid,
+            DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+            @Nullable Time slotRequestMaxInterval,

Review Comment:
   `Time` is now deprecated; please use `Duration` instead.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java:
##########
@@ -264,20 +283,33 @@ private static JobManagerOptions.SchedulerType 
getSchedulerType(
 
     @VisibleForTesting
     static RequestSlotMatchingStrategy getRequestSlotMatchingStrategy(
-            Configuration configuration, JobType jobType) {
+            Configuration configuration,
+            JobType jobType,
+            JobManagerOptions.SchedulerType schedulerType) {
         final boolean isLocalRecoveryEnabled =
                 configuration.get(CheckpointingOptions.LOCAL_RECOVERY);
+        TaskManagerLoadBalanceMode mode =
+                
TaskManagerLoadBalanceMode.loadFromConfiguration(configuration);
 
         if (isLocalRecoveryEnabled) {
             if (jobType == JobType.STREAMING) {
-                return PreferredAllocationRequestSlotMatchingStrategy.INSTANCE;
+                RequestSlotMatchingStrategy rollback =
+                        mode == TaskManagerLoadBalanceMode.TASKS
+                                ? 
TasksBalancedRequestSlotMatchingStrategy.create(
+                                        
SimpleRequestSlotMatchingStrategy.INSTANCE)
+                                : SimpleRequestSlotMatchingStrategy.INSTANCE;
+                return 
PreferredAllocationRequestSlotMatchingStrategy.create(rollback);
             } else {
                 LOG.warn(
                         "Batch jobs do not support local recovery. Falling 
back for request slot matching strategy to {}.",
                         
SimpleRequestSlotMatchingStrategy.class.getSimpleName());
                 return SimpleRequestSlotMatchingStrategy.INSTANCE;
             }
         } else {
+            if (jobType == JobType.STREAMING && mode == 
TaskManagerLoadBalanceMode.TASKS) {
+                return TasksBalancedRequestSlotMatchingStrategy.create(
+                        SimpleRequestSlotMatchingStrategy.INSTANCE);
+            }
             return SimpleRequestSlotMatchingStrategy.INSTANCE;

Review Comment:
   I have 2 questions here:
   
   1. When `isLocalRecoveryEnabled` is true and the parallelism is changed, 
`PreferredAllocationRequestSlotMatchingStrategy` will be the main strategy and 
`TasksBalancedRequestSlotMatchingStrategy` will be the rollback. This means that 
task balancing cannot be ensured. Is that expected?
   2. Why are we using the `TasksBalancedRequestSlotMatchingStrategy` for 
batch jobs?
   
   cc @KarmaGYZ @zhuzhurk , what's your opinion? It's not mentioned in the FLIP.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java:
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.loading;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Objects;
+
+/** The default implementation of {@link LoadingWeight} based on task count 
loading. */
+@Internal
+public class DefaultLoadingWeight implements LoadingWeight {
+
+    private float tasks;

Review Comment:
   ```suggestion
       private float loading;
   ```
   
   Task-balanced scheduling can use `DefaultLoadingWeight`, but 
`DefaultLoadingWeight` can also be used for other cases. The `LoadingWeight` 
side shouldn't care about tasks. The class comment should be changed as well.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/WeightLoadable.java:
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.loading;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nonnull;
+
+/**
+ * The interface that holds the {@link LoadingWeight} getter and setter is 
required for
+ * corresponding slot abstractions.
+ */
+@Internal
+public interface WeightLoadable {
+
+    /**
+     * Get the loading weight.
+     *
+     * @return An implementation object of {@link LoadingWeight}.
+     */
+    default LoadingWeight getLoading() {
+        return LoadingWeight.EMPTY;
+    }
+
+    /**
+     * Set the loading weight.
+     *
+     * @param loadingWeight An implementation of {@link LoadingWeight}.
+     */
+    void setLoading(@Nonnull LoadingWeight loadingWeight);

Review Comment:
   Could we initialize the loading in all constructors? We can call 
`getLoading` and merge another `LoadingWeight` into it.
   
   During the review, I found that `setLoading` is dangerous.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java:
##########
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.loading;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** The class is used to represent the loading weight abstraction of slots. */
+@Internal
+public interface LoadingWeight extends Comparable<LoadingWeight>, Serializable 
{
+
+    LoadingWeight EMPTY = new DefaultLoadingWeight(0f);
+
+    static LoadingWeight ofDefaultLoadingWeight(float loading) {
+        return new DefaultLoadingWeight(loading);
+    }
+
+    static List<LoadingWeight> ofDefaultLoadingWeights(int... loadings) {

Review Comment:
   ```suggestion
       @VisibleForTesting
       static List<LoadingWeight> ofDefaultLoadingWeights(int... loadings) {
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to