1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1453115386
########## flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java: ########## @@ -446,6 +446,13 @@ public enum JobStoreType { .defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue()) .withDescription("The timeout in milliseconds for a idle slot in Slot Pool."); + @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING) + public static final ConfigOption<Duration> SLOT_REQUEST_MAX_INTERVAL = + key("slot.request.max-interval") + .durationType() + .defaultValue(Duration.ofMillis(50L)) Review Comment: The default value is 20ms in the FLIP-370. [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ########## @@ -330,12 +334,32 @@ public void onUnknownDeploymentsOf( getMainThreadExecutor(), log); + TaskManagerLoadBalanceMode mode = + TaskManagerLoadBalanceMode.loadFromConfiguration( + jobMasterConfiguration.getConfiguration()); + JobManagerOptions.SchedulerType schedulerType = + slotPoolServiceSchedulerFactory.getSchedulerType(); + Time slotRequestMaxInterval = null; + boolean slotBatchAllocatable = false; + if (mode == TASKS && schedulerType == JobManagerOptions.SchedulerType.Adaptive) { Review Comment: IIUC, we should enable `slotBatchAllocatable` when mode == TASKS and jobType is STREAMING. ![image](https://github.com/apache/flink/assets/38427477/0e2db6c9-60d6-427f-b7cd-98c404217e89) ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.loading; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.util.Objects; + +/** The default implementation of {@link LoadingWeight} based on task count loading. */ +@Internal +public class DefaultLoadingWeight implements LoadingWeight { + + private float tasks; + + DefaultLoadingWeight(float tasks) { + Preconditions.checkArgument(tasks >= 0.0f); + this.tasks = tasks; + } + + public void incLoading() { + this.tasks += 1.0f; + } + + @Override + public float getLoading() { + return tasks; + } + + @Override + public LoadingWeight merge(LoadingWeight other) { + if (other == null) { + return LoadingWeight.ofDefaultLoadingWeight(this.tasks); + } + return LoadingWeight.ofDefaultLoadingWeight(tasks + other.getLoading()); + } + + @Override + public int compareTo(@Nonnull LoadingWeight o) { + return Float.compare(tasks, o.getLoading()); + } Review Comment: Why not move it to the interface as a default method?
``` @Override default int compareTo(@Nonnull LoadingWeight o) { return Float.compare(getLoading(), o.getLoading()); } ``` ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ########## @@ -334,7 +334,9 @@ public void onUnknownDeploymentsOf( .createSlotPoolService( jid, createDeclarativeSlotPoolFactory( - jobMasterConfiguration.getConfiguration())); + jobMasterConfiguration.getConfiguration()), + null, Review Comment: Why don't we introduce `JobManagerOptions#SLOT_REQUEST_MAX_INTERVAL` in the third commit, `Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler`? You introduce it in the seventh commit, `[FLINK-33388] Support tasks balancing at TM level for Default Scheduler`, but only the third commit uses this option. I checked, and the feature in the seventh commit doesn't use it. ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.flink.runtime.scheduler.loading; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.util.Objects; + +/** The default implementation of {@link LoadingWeight} based on task count loading. */ +@Internal +public class DefaultLoadingWeight implements LoadingWeight { + + private float tasks; + + DefaultLoadingWeight(float tasks) { + Preconditions.checkArgument(tasks >= 0.0f); + this.tasks = tasks; + } + + public void incLoading() { + this.tasks += 1.0f; + } + + @Override + public float getLoading() { + return tasks; + } + + @Override + public LoadingWeight merge(LoadingWeight other) { + if (other == null) { + return LoadingWeight.ofDefaultLoadingWeight(this.tasks); Review Comment: The merge method should create a new `DefaultLoadingWeight` directly. If `LoadingWeight.ofDefaultLoadingWeight` is refactored in the future to create another type of `LoadingWeight`, merging a `DefaultLoadingWeight` would then unexpectedly produce that other type. Also, if other == null, could we return this directly? ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java: ########## @@ -18,35 +18,50 @@ package org.apache.flink.runtime.scheduler; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.loading.DefaultLoadingWeight; +import org.apache.flink.runtime.scheduler.loading.LoadingWeight; +import org.apache.flink.runtime.scheduler.loading.WeightLoadable; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; import org.apache.flink.util.Preconditions; +import javax.annotation.Nonnull; + import java.util.Collections; import java.util.HashSet; import java.util.Set; /** Represents execution vertices that will run the same shared slot.
*/ -class ExecutionSlotSharingGroup { +class ExecutionSlotSharingGroup implements WeightLoadable { private final Set<ExecutionVertexID> executionVertexIds; - private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN; + @Nonnull private final SlotSharingGroup slotSharingGroup; + + private LoadingWeight loadingWeight; - ExecutionSlotSharingGroup() { + ExecutionSlotSharingGroup(@Nonnull SlotSharingGroup slotSharingGroup) { + this.slotSharingGroup = Preconditions.checkNotNull(slotSharingGroup); this.executionVertexIds = new HashSet<>(); + this.loadingWeight = LoadingWeight.EMPTY; } void addVertex(final ExecutionVertexID executionVertexId) { executionVertexIds.add(executionVertexId); + ((DefaultLoadingWeight) loadingWeight).incLoading(); Review Comment: I guess `incLoading` will mutate the shared `LoadingWeight.EMPTY` instance, since no caller calls `setLoading` on `ExecutionSlotSharingGroup`, right? Then all `ExecutionSlotSharingGroup`s will share one `DefaultLoadingWeight` object, and the loading will be wrong. I also don't think we should define `LoadingWeight.EMPTY` at all; it's very dangerous, because anyone can change its loading. Also, since we always call `incLoading` while building the `DefaultLoadingWeight`, why not create a new `DefaultLoadingWeight` directly? And if anyone sets another type of loadingWeight, the cast to `DefaultLoadingWeight` will cause a bug. ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.loading; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.util.Objects; + +/** The default implementation of {@link LoadingWeight} based on task count loading. */ +@Internal +public class DefaultLoadingWeight implements LoadingWeight { + + private float tasks; + + DefaultLoadingWeight(float tasks) { Review Comment: These `LoadingWeight`-related classes are changed in 2 different commits, which I think is unexpected. They are new classes, so if we find that a change in an earlier commit isn't good, we should amend that earlier commit instead. ########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java: ########## @@ -86,8 +89,15 @@ SchedulerNGFactory getSchedulerNGFactory() { @Override public SlotPoolService createSlotPoolService( - JobID jid, DeclarativeSlotPoolFactory declarativeSlotPoolFactory) { - return slotPoolServiceFactory.createSlotPoolService(jid, declarativeSlotPoolFactory); + JobID jid, + DeclarativeSlotPoolFactory declarativeSlotPoolFactory, + @Nullable Time slotRequestMaxInterval, Review Comment: `Time` is deprecated; please use `Duration` instead.
########## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/DefaultSlotPoolServiceSchedulerFactory.java: ########## @@ -264,20 +283,33 @@ private static JobManagerOptions.SchedulerType getSchedulerType( @VisibleForTesting static RequestSlotMatchingStrategy getRequestSlotMatchingStrategy( - Configuration configuration, JobType jobType) { + Configuration configuration, + JobType jobType, + JobManagerOptions.SchedulerType schedulerType) { final boolean isLocalRecoveryEnabled = configuration.get(CheckpointingOptions.LOCAL_RECOVERY); + TaskManagerLoadBalanceMode mode = + TaskManagerLoadBalanceMode.loadFromConfiguration(configuration); if (isLocalRecoveryEnabled) { if (jobType == JobType.STREAMING) { - return PreferredAllocationRequestSlotMatchingStrategy.INSTANCE; + RequestSlotMatchingStrategy rollback = + mode == TaskManagerLoadBalanceMode.TASKS + ? TasksBalancedRequestSlotMatchingStrategy.create( + SimpleRequestSlotMatchingStrategy.INSTANCE) + : SimpleRequestSlotMatchingStrategy.INSTANCE; + return PreferredAllocationRequestSlotMatchingStrategy.create(rollback); } else { LOG.warn( "Batch jobs do not support local recovery. Falling back for request slot matching strategy to {}.", SimpleRequestSlotMatchingStrategy.class.getSimpleName()); return SimpleRequestSlotMatchingStrategy.INSTANCE; } } else { + if (jobType == JobType.STREAMING && mode == TaskManagerLoadBalanceMode.TASKS) { + return TasksBalancedRequestSlotMatchingStrategy.create( + SimpleRequestSlotMatchingStrategy.INSTANCE); + } return SimpleRequestSlotMatchingStrategy.INSTANCE; Review Comment: I have 2 questions here: 1. When `isLocalRecoveryEnabled` is true and the parallelism is changed, `PreferredAllocationRequestSlotMatchingStrategy` will be the main strategy and `TasksBalancedRequestSlotMatchingStrategy` will only be the fallback (rollback). That means task balancing cannot be guaranteed. Is that expected? 2. Why do using the `TasksBalancedRequestSlotMatchingStrategy` for batch job?
cc @KarmaGYZ @zhuzhurk, what's your opinion? It's not mentioned in the FLIP. ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java: ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.loading; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import javax.annotation.Nonnull; + +import java.util.Objects; + +/** The default implementation of {@link LoadingWeight} based on task count loading. */ +@Internal +public class DefaultLoadingWeight implements LoadingWeight { + + private float tasks; Review Comment: ```suggestion private float loading; ``` Task-balanced scheduling can use `DefaultLoadingWeight`, but `DefaultLoadingWeight` can also be used in other cases, so the `LoadingWeight` side shouldn't care about tasks. The class comment should be changed as well. ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/WeightLoadable.java: ########## @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.loading; + +import org.apache.flink.annotation.Internal; + +import javax.annotation.Nonnull; + +/** + * The interface that holds the {@link LoadingWeight} getter and setter is required for + * corresponding slot abstractions. + */ +@Internal +public interface WeightLoadable { + + /** + * Get the loading weight. + * + * @return An implementation object of {@link LoadingWeight}. + */ + default LoadingWeight getLoading() { + return LoadingWeight.EMPTY; + } + + /** + * Set the loading weight. + * + * @param loadingWeight An implementation of {@link LoadingWeight}. + */ + void setLoading(@Nonnull LoadingWeight loadingWeight); Review Comment: Could we initialize the loading in all constructors? Then callers can use getLoading and merge other LoadingWeights into it. While reviewing, I found that `setLoading` is dangerous. ########## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java: ########## @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler.loading; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.util.Preconditions; + +import java.io.Serializable; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; + +/** The class is used to represent the loading weight abstraction of slots. */ +@Internal +public interface LoadingWeight extends Comparable<LoadingWeight>, Serializable { + + LoadingWeight EMPTY = new DefaultLoadingWeight(0f); + + static LoadingWeight ofDefaultLoadingWeight(float loading) { + return new DefaultLoadingWeight(loading); + } + + static List<LoadingWeight> ofDefaultLoadingWeights(int... loadings) { Review Comment: ```suggestion @VisibleForTesting static List<LoadingWeight> ofDefaultLoadingWeights(int... loadings) { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org