Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui merged PR #23635:
URL: https://github.com/apache/flink/pull/23635

-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1691113488

## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.java: ##
@@ -69,46 +69,29 @@ class LocalInputPreferredSlotSharingStrategyTest {
     private TestingSchedulingExecutionVertex ev21;
     private TestingSchedulingExecutionVertex ev22;
-    private Set<SlotSharingGroup> slotSharingGroups;
-
-    @BeforeEach
-    void setUp() {
-        topology = new TestingSchedulingTopology();
-
+    private void setupCase() {
         ev11 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 0);
         ev12 = topology.newExecutionVertex(JOB_VERTEX_ID_1, 1);
         ev21 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 0);
         ev22 = topology.newExecutionVertex(JOB_VERTEX_ID_2, 1);
-        final SlotSharingGroup slotSharingGroup = new SlotSharingGroup();
-        slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_1);
-        slotSharingGroup.addVertexToGroup(JOB_VERTEX_ID_2);
-        slotSharingGroups = Collections.singleton(slotSharingGroup);
+        slotsharingGroup.addVertexToGroup(JOB_VERTEX_ID_1);
+        slotsharingGroup.addVertexToGroup(JOB_VERTEX_ID_2);
     }
-    @Test
-    void testCoLocationConstraintIsRespected() {
-        topology.connect(ev11, ev22);
-        topology.connect(ev12, ev21);
-
-        final CoLocationGroup coLocationGroup =
-                new TestingCoLocationGroup(JOB_VERTEX_ID_1, JOB_VERTEX_ID_2);
-        final Set<CoLocationGroup> coLocationGroups = Collections.singleton(coLocationGroup);
-
-        final SlotSharingStrategy strategy =
-                new LocalInputPreferredSlotSharingStrategy(
-                        topology, slotSharingGroups, coLocationGroups);
-
-        assertThat(strategy.getExecutionSlotSharingGroups()).hasSize(2);
-        assertThat(strategy.getExecutionSlotSharingGroup(ev11.getId()).getExecutionVertexIds())
-                .contains(ev11.getId(), ev21.getId());
-        assertThat(strategy.getExecutionSlotSharingGroup(ev12.getId()).getExecutionVertexIds())
-                .contains(ev12.getId(), ev22.getId());
+    @Override
+    protected SlotSharingStrategy getSlotSharingStrategy(
+            SchedulingTopology topology,
+            Set<SlotSharingGroup> slotSharingGroups,
+            Set<CoLocationGroup> coLocationGroups) {
+        return new LocalInputPreferredSlotSharingStrategy(
+                topology, slotSharingGroups, coLocationGroups);
     }
     @Test
     void testInputLocalityIsRespectedWithRescaleEdge() {
+        setupCase();

Review Comment:
Thank you very much for the review, @1996fanrui. I have updated the related parts based on your comments. PTAL when you have time. :)
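The `@Override` of `getSlotSharingStrategy(...)` in this diff suggests that the shared test cases now live in an abstract base class and each concrete test class only supplies the strategy under test (a factory-method hook). A minimal plain-Java sketch of that shape, assuming nothing about Flink's actual test classes: `GroupingStrategy`, `AbstractGroupingStrategyTestBase` and `RoundRobinGroupingTest` are hypothetical names, and JUnit is left out so the sketch runs on the JDK alone.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a slot sharing strategy: maps vertex id -> group index. */
interface GroupingStrategy {
    Map<String, Integer> group(List<String> vertexIds, int numGroups);
}

/** Hypothetical abstract test base: shared checks run against any supplied strategy. */
abstract class AbstractGroupingStrategyTestBase {

    /** Factory hook; each concrete test class decides which strategy is under test. */
    protected abstract GroupingStrategy getStrategy();

    /** Shared invariant: every vertex is assigned to a group index in [0, numGroups). */
    final boolean allVerticesAssigned(List<String> vertexIds, int numGroups) {
        Map<String, Integer> groups = getStrategy().group(vertexIds, numGroups);
        for (String id : vertexIds) {
            Integer index = groups.get(id);
            if (index == null || index < 0 || index >= numGroups) {
                return false;
            }
        }
        return true;
    }
}

/** A concrete test class only overrides the factory hook. */
class RoundRobinGroupingTest extends AbstractGroupingStrategyTestBase {
    @Override
    protected GroupingStrategy getStrategy() {
        return (ids, numGroups) -> {
            Map<String, Integer> result = new HashMap<>();
            for (int i = 0; i < ids.size(); i++) {
                result.put(ids.get(i), i % numGroups);
            }
            return result;
        };
    }
}
```

With this layout, adding a test class for another strategy means writing one more subclass; the shared assertions are inherited unchanged.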
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1687687114

## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategyTest.java: ##
@@ -69,46 +69,29 @@ class LocalInputPreferredSlotSharingStrategyTest {
     @Test
     void testInputLocalityIsRespectedWithRescaleEdge() {
+        setupCase();

Review Comment:
It's a little weird to call `setupCase` in so many tests. Could we define a `protected abstract void doSetup();` in `AbstractSlotSharingStrategyTest` and have `setUp` call `doSetup()`?

## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java: ##
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to
+ * the same SlotSharingGroup, tend to be put evenly in each ExecutionSlotSharingGroup. Co-location
+ * con
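The `doSetup()` suggestion in the first review comment above is the template-method pattern: the base class owns the common setup and calls an abstract hook that each subclass fills in, so no individual test has to remember to call `setupCase()`. A minimal sketch under stated assumptions: `AbstractSetupHookTest` and `LocalInputSetupHookTest` are hypothetical names, a list of strings stands in for the topology and slot sharing group, and `setUp()` would carry JUnit's `@BeforeEach` in a real test (omitted here so the sketch runs without JUnit).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base class; in JUnit, setUp() would be annotated with @BeforeEach. */
abstract class AbstractSetupHookTest {

    /** Shared fixture, standing in for the topology and slot sharing group. */
    protected List<String> vertices;

    /** Common setup run before every test: rebuild the fixture, then call the hook. */
    final void setUp() {
        vertices = new ArrayList<>(); // fresh fixture so tests cannot leak state
        doSetup();                    // subclass-specific part of the setup
    }

    /** Hook replacing the explicit setupCase() call at the top of each test. */
    protected abstract void doSetup();
}

/** Hypothetical subclass: only describes its own fixture. */
class LocalInputSetupHookTest extends AbstractSetupHookTest {

    @Override
    protected void doSetup() {
        // Analogous to creating ev11, ev12, ev21 and ev22 in setupCase().
        vertices.add("ev11");
        vertices.add("ev12");
        vertices.add("ev21");
        vertices.add("ev22");
    }

    int countVertices() {
        return vertices.size();
    }
}
```

Because `setUp()` is `final`, subclasses cannot skip the shared part; they can only extend it through `doSetup()`.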
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on PR #23635:
URL: https://github.com/apache/flink/pull/23635#issuecomment-2151329162

> 👋 Hi, are there any updates or progress on this work as part of FLIP-370?

Thanks for your attention. It's still a work in progress; I will update it in the next few days.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
klam-shop commented on PR #23635:
URL: https://github.com/apache/flink/pull/23635#issuecomment-2150824717

👋 Hi, are there any updates or progress on this work as part of FLIP-370?
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1455029438

## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java: ##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.loading;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** The class is used to represent the loading weight abstraction of slots. */
+@Internal
+public interface LoadingWeight extends Comparable<LoadingWeight>, Serializable {
+
+    LoadingWeight EMPTY = new DefaultLoadingWeight(0f);
+
+    static LoadingWeight ofDefaultLoadingWeight(float loading) {
+        return new DefaultLoadingWeight(loading);
+    }
+
+    static List<LoadingWeight> ofDefaultLoadingWeights(int... loadings) {
+        List<LoadingWeight> loadingWeights = new ArrayList<>(loadings.length);
+        for (int loading : loadings) {
+            loadingWeights.add(ofDefaultLoadingWeight(loading));
+        }
+        return loadingWeights;
+    }
+
+    static List<LoadingWeight> supplyEmptyLoadWeights(int number) {
+        Preconditions.checkArgument(number >= 0);
+        LoadingWeight[] loadingWeights = new LoadingWeight[number];
+        Arrays.parallelSetAll(loadingWeights, value -> EMPTY);
+        return Arrays.stream(loadingWeights).collect(Collectors.toList());
+    }
+
+    /**
+     * Get the loading value.
+     *
+     * @return A float represented the loading.
+     */
+    float getLoading();
+
+    /**
+     * Merge the other loading weight into itself.

Review Comment:
For example, see this comment: https://github.com/apache/flink/pull/23635#discussion_r1453243566. If we allow resetting the `LoadingWeight`, it's easy to introduce bugs.
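This thread debates whether `LoadingWeight` should be immutable, so that `merge` returns a new object instead of changing the receiver. A minimal sketch of that variant, assuming a task-count loading like the one `DefaultLoadingWeight` describes; `ImmutableLoadingWeight` is a hypothetical name, not a Flink class, and it implements `Comparable` directly rather than the `LoadingWeight` interface so that it runs on the JDK alone.

```java
/**
 * Hypothetical immutable loading weight: the value is final and merge() returns a
 * new instance instead of modifying the receiver. A sketch, not Flink's class.
 */
final class ImmutableLoadingWeight implements Comparable<ImmutableLoadingWeight> {

    static final ImmutableLoadingWeight EMPTY = new ImmutableLoadingWeight(0f);

    private final float tasks; // cannot be reset after construction

    ImmutableLoadingWeight(float tasks) {
        if (tasks < 0f) {
            throw new IllegalArgumentException("loading must be non-negative: " + tasks);
        }
        this.tasks = tasks;
    }

    float getLoading() {
        return tasks;
    }

    /** Returns the sum as a new object; returning this for null is safe because it is immutable. */
    ImmutableLoadingWeight merge(ImmutableLoadingWeight other) {
        return other == null ? this : new ImmutableLoadingWeight(tasks + other.tasks);
    }

    @Override
    public int compareTo(ImmutableLoadingWeight o) {
        return Float.compare(tasks, o.tasks);
    }
}
```

The trade-off raised in the replies still applies: callers must use the returned value (`total = total.merge(w)`), and any holder that exposes a setter can still swap the whole object out, so immutability of the value alone does not protect the slot-related classes.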
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1455025025

## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java: ##
@@ -0,0 +1,69 @@
+    /**
+     * Merge the other loading weight into itself.

Review Comment:
An immutable object is safe in itself: creating a new `LoadingWeight` on every change means the `LoadingWeight` instance is very safe. But the callers become hard to maintain. For example, `WeightLoadable#setLoading` can reset the `LoadingWeight` of each slot, which is not safe for these slot-related classes.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1455014315

## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java: ##
@@ -0,0 +1,69 @@
+    /**
+     * Merge the other loading weight into itself.

Review Comment:
Could we consider making this implementation immutable? That means every time an object's internal value would change, a new object is created instead. Immutable objects are inherently safer.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1454932673

## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java: ##
@@ -0,0 +1,69 @@
+    /**
+     * Merge the other loading weight into itself.

Review Comment:
Semantically, this method merges `other` into itself. But in the implementation of `DefaultLoadingWeight`, it doesn't update itself.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1454413854

## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/WeightLoadable.java: ##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.loading;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nonnull;
+
+/**
+ * The interface that holds the {@link LoadingWeight} getter and setter is required for
+ * corresponding slot abstractions.
+ */
+@Internal
+public interface WeightLoadable {
+
+    /**
+     * Get the loading weight.
+     *
+     * @return An implementation object of {@link LoadingWeight}.
+     */
+    default LoadingWeight getLoading() {
+        return LoadingWeight.EMPTY;
+    }
+
+    /**
+     * Set the loading weight.
+     *
+     * @param loadingWeight An implementation of {@link LoadingWeight}.
+     */
+    void setLoading(@Nonnull LoadingWeight loadingWeight);

Review Comment:
Alternatively, `ExecutionSlotSharingGroup` should not implement `WeightLoadable`; it only initializes and returns the loading.
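One way to read this suggestion: if a group only ever initializes its loading and returns it, the loading can be computed once in the constructor and exposed through a getter only, leaving nothing for callers to reset. A minimal sketch under that assumption, using a task-count loading (one unit per execution vertex); `ReadOnlyLoadedGroup` is a hypothetical class and plain strings stand in for execution vertex IDs.

```java
import java.util.Collections;
import java.util.LinkedHashSet;
import java.util.Set;

/**
 * Hypothetical read-only group: the loading is derived once from the members and
 * there is no setter, so no caller can reset it later. Not Flink's class.
 */
final class ReadOnlyLoadedGroup {

    private final Set<String> executionVertexIds;
    private final float loading; // fixed at construction

    ReadOnlyLoadedGroup(Set<String> executionVertexIds) {
        // Defensive copy, so later changes to the caller's set do not leak in.
        this.executionVertexIds =
                Collections.unmodifiableSet(new LinkedHashSet<>(executionVertexIds));
        // Task-count loading: one unit per execution vertex in the group.
        this.loading = this.executionVertexIds.size();
    }

    float getLoading() {
        return loading;
    }

    Set<String> getExecutionVertexIds() {
        return executionVertexIds;
    }
}
```

The defensive copy matters: without it, a caller mutating the original set would make the stored loading disagree with the group's members.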
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1453256769

## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ##
@@ -330,12 +334,32 @@ public void onUnknownDeploymentsOf(
                         getMainThreadExecutor(),
                         log);
+        TaskManagerLoadBalanceMode mode =
+                TaskManagerLoadBalanceMode.loadFromConfiguration(
+                        jobMasterConfiguration.getConfiguration());
+        JobManagerOptions.SchedulerType schedulerType =
+                slotPoolServiceSchedulerFactory.getSchedulerType();
+        Time slotRequestMaxInterval = null;
+        boolean slotBatchAllocatable = false;
+        if (mode == TASKS && schedulerType == JobManagerOptions.SchedulerType.Adaptive) {

Review Comment:
`Adaptive` should be `Default` here.
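Combining this correction with the STREAMING condition raised in 1996fanrui's review of the same lines, the enablement check could be isolated into one small predicate. A sketch under stated assumptions: the nested enums are simplified stand-ins for `TaskManagerLoadBalanceMode`, `JobManagerOptions.SchedulerType` and the job type (not the real Flink types), and whether STREAMING is required is a reviewer's suggestion, not confirmed behavior.

```java
/**
 * Hypothetical helper for the corrected check. The enums are simplified stand-ins,
 * not Flink's TaskManagerLoadBalanceMode / SchedulerType / job type.
 */
final class SlotBatchAllocationCheck {

    enum LoadBalanceMode { NONE, SLOTS, TASKS }

    enum SchedulerType { Default, Adaptive, AdaptiveBatch }

    enum JobType { BATCH, STREAMING }

    private SlotBatchAllocationCheck() {}

    /** Batch slot allocation only for task balancing under the Default scheduler (streaming jobs assumed). */
    static boolean isSlotBatchAllocatable(
            LoadBalanceMode mode, SchedulerType schedulerType, JobType jobType) {
        return mode == LoadBalanceMode.TASKS
                && schedulerType == SchedulerType.Default
                && jobType == JobType.STREAMING;
    }
}
```

Keeping the condition in one pure function makes the Adaptive-versus-Default mix-up easy to cover with a unit test instead of relying on review.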
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1453115386

## flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java: ##
@@ -446,6 +446,13 @@ public enum JobStoreType {
                     .defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue())
                     .withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");
+    @Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+    public static final ConfigOption<Duration> SLOT_REQUEST_MAX_INTERVAL =
+            key("slot.request.max-interval")
+                    .durationType()
+                    .defaultValue(Duration.ofMillis(50L))

Review Comment:
The default value is 20ms in FLIP-370 [1].

[1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling

## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ##
@@ -330,12 +334,32 @@ public void onUnknownDeploymentsOf(
+        Time slotRequestMaxInterval = null;
+        boolean slotBatchAllocatable = false;
+        if (mode == TASKS && schedulerType == JobManagerOptions.SchedulerType.Adaptive) {

Review Comment:
IIUC, we should enable `slotBatchAllocatable` when mode == TASKS and the job type is STREAMING.

![image](https://github.com/apache/flink/assets/38427477/0e2db6c9-60d6-427f-b7cd-98c404217e89)

## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java: ##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.loading;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Objects;
+
+/** The default implementation of {@link LoadingWeight} based on task count loading. */
+@Internal
+public class DefaultLoadingWeight implements LoadingWeight {
+
+    private float tasks;
+
+    DefaultLoadingWeight(float tasks) {
+        Preconditions.checkArgument(tasks >= 0.0f);
+        this.tasks = tasks;
+    }
+
+    public void incLoading() {
+        this.tasks += 1.0f;
+    }
+
+    @Override
+    public float getLoading() {
+        return tasks;
+    }
+
+    @Override
+    public LoadingWeight merge(LoadingWeight other) {
+        if (other == null) {
+            return LoadingWeight.ofDefaultLoadingWeight(this.tasks);
+        }
+        return LoadingWeight.ofDefaultLoadingWeight(tasks + other.getLoading());
+    }
+
+    @Override
+    public int compareTo(@Nonnull LoadingWeight o) {
+        return Float.compare(tasks, o.getLoading());
+    }

Review Comment:
Why not move it to the interface?

```
@Override
default int compareTo(@Nonnull LoadingWeight o) {
    return Float.compare(getLoading(), o.getLoading());
}
```

## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ##
@@ -334,7 +334,9 @@ public void onUnknownDeploymentsOf(
                         .createSlotPoolService(
                                 jid,
                                 createDeclarativeSlotPoolFactory(
-                                        jobMasterConfiguration.getConfiguration()));
+                                        jobMasterConfiguration.getConfiguration()),
+                                null,

Review Comment:
Why not introduce `JobManagerOptions#SLOT_REQUEST_MAX_INTERVAL` in the third commit, `Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler`? You introduce it in the seventh commit `[FLINK-33388] Support tasks balancing at TM level for Default
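The suggestion to move `compareTo` into the interface works because an interface can supply a `default` implementation for a method it inherits from `Comparable`; every implementation then compares by `getLoading()` without repeating the logic. A self-contained sketch with hypothetical names (`WeightSketch`, `TaskCountWeight`); the `@Nonnull` annotation from the review snippet is omitted because `javax.annotation` is not part of the JDK.

```java
/** Hypothetical interface: compareTo is provided once, as a default method. */
interface WeightSketch extends Comparable<WeightSketch> {

    float getLoading();

    @Override
    default int compareTo(WeightSketch o) {
        // Ordering is defined purely by the loading value.
        return Float.compare(getLoading(), o.getLoading());
    }
}

/** Hypothetical implementation: only getLoading() has to be written. */
final class TaskCountWeight implements WeightSketch {

    private final float tasks;

    TaskCountWeight(float tasks) {
        this.tasks = tasks;
    }

    @Override
    public float getLoading() {
        return tasks;
    }
}
```

Sorting a `List<WeightSketch>` with `Collections.sort` then orders weights by loading, whichever implementation each element uses.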
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
KarmaGYZ commented on PR #23635:
URL: https://github.com/apache/flink/pull/23635#issuecomment-1892964860

> @RocMarshal Just curious about the progress: is this PR still waiting for some comments to be addressed before it can be merged?

This PR is still in progress. We plan to merge it once the complete Task Balancing feature has been implemented.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
Myasuka commented on PR #23635:
URL: https://github.com/apache/flink/pull/23635#issuecomment-1892504610

@RocMarshal Just curious about the progress: is this PR still waiting for some comments to be addressed before it can be merged?
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocXing commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1436413745 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java: ## @@ -0,0 +1,344 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to + * the same SlotSharingGroup, tend to be put evenly in each ExecutionSlotSharingGroup. Co-location + * constraints will be respected. 
+ */ +class TaskBalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy { + +public static final Logger LOG = + LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class); + +TaskBalancedPreferredSlotSharingStrategy( +final SchedulingTopology topology, +final Set slotSharingGroups, +final Set coLocationGroups) { +super(topology, slotSharingGroups, coLocationGroups); +} + +@Override +protected Map computeExecutionSlotSharingGroups( +SchedulingTopology schedulingTopology) { +return new TaskBalancedExecutionSlotSharingGroupBuilder( +schedulingTopology, this.logicalSlotSharingGroups, this.coLocationGroups) +.build(); +} + +static class Factory implements SlotSharingStrategy.Factory { + +public TaskBalancedPreferredSlotSharingStrategy create( +final SchedulingTopology topology, +final Set slotSharingGroups, +final Set coLocationGroups) { + +return new TaskBalancedPreferredSlotSharingStrategy( +topology, slotSharingGroups, coLocationGroups); +} +} + +/** The interface to compute the fittest slot index. */ +private interface SlotIndexSupplier { + +int getFittestSlotIndex( +final SlotSharingGroup slotSharingGroup, +@Nullable final SchedulingExecutionVertex executionVertex); +} + +/** SlotSharingGroupBuilder class for balanced scheduling strategy. */ +private static class TaskBalancedExecutionSlotSharingGroupBuilder { + +private final SchedulingTopology topology; + +private final Map slotSharingGroupMap; + +/** Record the {@link ExecutionSlotSharingGroup}s for {@link SlotSharingGroup}s. */ +private final Map> +paralleledExecutionSlotSharingGroupsMap; + +/** + * Record the next round-robin {@link ExecutionSlotSharingGroup} index for {@link + * SlotSharingGroup}s. 
+ */ +private final Map slotSharingGroupIndexMap; + +private final Map +executionSlotSharingGroupMap; + +private final Map coLocationGroupMap; + +private final Map +constraintToExecutionSlotSharingGroupMap; + +private TaskBalancedExecutionSlotSharingGroupBuilder( +final SchedulingTopology topology, +final Set slotSharingGroups, +final Set coLocationGroups) { +this.topology = checkNotNull(topology); + +
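The class Javadoc quoted above says that execution vertices of the same SlotSharingGroup should be spread evenly over the ExecutionSlotSharingGroups, and that a per-group round-robin index is recorded for this. Below is a minimal, hypothetical sketch of that idea in plain Java. It is not the PR's implementation: it ignores co-location constraints and input locality, and it models each slot as a list of subtask names. The names (`RoundRobinSlotSketch`, `assignByIndex`, `assignRoundRobin`) are illustrative and do not exist in Flink. The index-aligned baseline is only an assumed point of comparison.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/**
 * Hypothetical sketch (not Flink code): spreads the subtasks of the job vertices in one slot
 * sharing group over slots, comparing index-aligned placement with a round-robin cursor.
 */
class RoundRobinSlotSketch {

    /** One empty task list per slot; the slot count is the maximum vertex parallelism. */
    static List<List<String>> emptySlots(Map<String, Integer> parallelismByVertex) {
        int numSlots =
                parallelismByVertex.values().stream().mapToInt(Integer::intValue).max().orElse(0);
        List<List<String>> slots = new ArrayList<>();
        for (int i = 0; i < numSlots; i++) {
            slots.add(new ArrayList<>());
        }
        return slots;
    }

    /** Baseline: subtask i of every vertex goes to slot i, so the low slot indices pile up. */
    static List<List<String>> assignByIndex(Map<String, Integer> parallelismByVertex) {
        List<List<String>> slots = emptySlots(parallelismByVertex);
        for (Map.Entry<String, Integer> vertex : parallelismByVertex.entrySet()) {
            for (int subtask = 0; subtask < vertex.getValue(); subtask++) {
                slots.get(subtask).add(vertex.getKey() + subtask);
            }
        }
        return slots;
    }

    /**
     * Round-robin: a single cursor for the whole slot sharing group keeps advancing across
     * vertices. A vertex's parallelism never exceeds the slot count, so its subtasks always
     * land in distinct slots.
     */
    static List<List<String>> assignRoundRobin(Map<String, Integer> parallelismByVertex) {
        List<List<String>> slots = emptySlots(parallelismByVertex);
        int cursor = 0; // next slot index to fill
        for (Map.Entry<String, Integer> vertex : parallelismByVertex.entrySet()) {
            for (int subtask = 0; subtask < vertex.getValue(); subtask++) {
                slots.get(cursor).add(vertex.getKey() + subtask);
                cursor = (cursor + 1) % slots.size();
            }
        }
        return slots;
    }

    public static void main(String[] args) {
        Map<String, Integer> parallelism = new LinkedHashMap<>();
        parallelism.put("A", 4);
        parallelism.put("B", 2);
        parallelism.put("C", 2);
        System.out.println("by index:    " + assignByIndex(parallelism));
        System.out.println("round-robin: " + assignRoundRobin(parallelism));
    }
}
```

For vertices A (parallelism 4), B (2) and C (2), the baseline yields `[[A0, B0, C0], [A1, B1, C1], [A2], [A3]]`, while round-robin yields `[[A0, B0], [A1, B1], [A2, C0], [A3, C1]]`. Because the cursor keeps moving across vertices, every slot ends up with two tasks instead of three or one.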
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1432535390 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ## @@ -334,7 +334,9 @@ public void onUnknownDeploymentsOf( .createSlotPoolService( jid, createDeclarativeSlotPoolFactory( - jobMasterConfiguration.getConfiguration())); + jobMasterConfiguration.getConfiguration()), +null, Review Comment: > you plan to modify this parameter in the commit introducing the configuration? Yes. Thank you for your confirmation~
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1432535390 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ## @@ -334,7 +334,9 @@ public void onUnknownDeploymentsOf( .createSlotPoolService( jid, createDeclarativeSlotPoolFactory( - jobMasterConfiguration.getConfiguration())); + jobMasterConfiguration.getConfiguration()), +null, Review Comment: Got it, thank you for the confirmation~
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
KarmaGYZ commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1432482972 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ## @@ -118,26 +129,47 @@ public DefaultDeclarativeSlotPool( this.totalResourceRequirements = ResourceCounter.empty(); this.fulfilledResourceRequirements = ResourceCounter.empty(); this.slotToRequirementProfileMappings = new HashMap<>(); +this.componentMainThreadExecutor = Preconditions.checkNotNull(componentMainThreadExecutor); +this.slotRequestMaxInterval = slotRequestMaxInterval; } @Override public void increaseResourceRequirementsBy(ResourceCounter increment) { -if (increment.isEmpty()) { +updateResourceRequirementsBy( +increment, +() -> totalResourceRequirements = totalResourceRequirements.add(increment)); Review Comment: I lean towards this version.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
KarmaGYZ commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1432481088 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ## @@ -334,7 +334,9 @@ public void onUnknownDeploymentsOf( .createSlotPoolService( jid, createDeclarativeSlotPoolFactory( - jobMasterConfiguration.getConfiguration())); + jobMasterConfiguration.getConfiguration()), +null, Review Comment: IIUC, you plan to modify this parameter in the commit introducing the configuration? Sounds good from my side.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1432336723 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ## @@ -118,26 +129,47 @@ public DefaultDeclarativeSlotPool( this.totalResourceRequirements = ResourceCounter.empty(); this.fulfilledResourceRequirements = ResourceCounter.empty(); this.slotToRequirementProfileMappings = new HashMap<>(); +this.componentMainThreadExecutor = Preconditions.checkNotNull(componentMainThreadExecutor); +this.slotRequestMaxInterval = slotRequestMaxInterval; } @Override public void increaseResourceRequirementsBy(ResourceCounter increment) { -if (increment.isEmpty()) { +updateResourceRequirementsBy( +increment, +() -> totalResourceRequirements = totalResourceRequirements.add(increment)); Review Comment: @KarmaGYZ Thanks a lot for the comments. Let me try to explain. My initial intention was to reduce code redundancy: there are currently two callers, and the only difference between them is whether they increase or decrease the resource requirements. If the extracted common part is too broad, I am very willing to improve it.
For example:
```
@Override
public void increaseResourceRequirementsBy(ResourceCounter increment) {
    if (increment.isEmpty()) {
        return;
    }
    totalResourceRequirements = totalResourceRequirements.add(increment);
    doDeclareResourceRequirements();
}

@Override
public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
    if (decrement.isEmpty()) {
        return;
    }
    totalResourceRequirements = totalResourceRequirements.subtract(decrement);
    doDeclareResourceRequirements();
}

private void doDeclareResourceRequirements() {
    if (slotRequestMaxInterval == null) {
        declareResourceRequirements();
        return;
    }

    if (slotRequestMaxIntervalTimeoutFuture != null
            && !slotRequestMaxIntervalTimeoutFuture.isDone()
            && !slotRequestMaxIntervalTimeoutFuture.isCancelled()) {
        slotRequestMaxIntervalTimeoutFuture.cancel(true);
    }
    slotRequestMaxIntervalTimeoutFuture =
            componentMainThreadExecutor.schedule(
                    this::declareResourceRequirements,
                    slotRequestMaxInterval.toMilliseconds(),
                    TimeUnit.MILLISECONDS);
}
```
Please let me know your opinion~ :)
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1432351722 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ## @@ -334,7 +334,9 @@ public void onUnknownDeploymentsOf( .createSlotPoolService( jid, createDeclarativeSlotPoolFactory( - jobMasterConfiguration.getConfiguration())); + jobMasterConfiguration.getConfiguration()), +null, Review Comment: Hi @KarmaGYZ, thank you very much for your comment. Do you mean: why do we hard-code the values of `slotRequestMaxInterval` and `slotBatchAllocatable` here instead of passing them based on the configuration? If so (IIUC), the reasons for not doing so are:
- We want to change the parameter-passing logic in one go, once the default scheduler fully supports balanced scheduling.
- The current hard-coded default values do not break the original semantics.
Please correct me if I'm wrong. Any suggestion is appreciated.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1432336723 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ## @@ -118,26 +129,47 @@ public DefaultDeclarativeSlotPool( this.totalResourceRequirements = ResourceCounter.empty(); this.fulfilledResourceRequirements = ResourceCounter.empty(); this.slotToRequirementProfileMappings = new HashMap<>(); +this.componentMainThreadExecutor = Preconditions.checkNotNull(componentMainThreadExecutor); +this.slotRequestMaxInterval = slotRequestMaxInterval; } @Override public void increaseResourceRequirementsBy(ResourceCounter increment) { -if (increment.isEmpty()) { +updateResourceRequirementsBy( +increment, +() -> totalResourceRequirements = totalResourceRequirements.add(increment)); Review Comment: @KarmaGYZ Thanks a lot for the comments. Let me try to explain. My initial intention was to reduce code redundancy: there are currently two callers, and the only difference between them is whether they increase or decrease the resource requirements. If the extracted common part is too broad, I am very willing to improve it.
For example:
```
@Override
public void increaseResourceRequirementsBy(ResourceCounter increment) {
    if (increment.isEmpty()) {
        return;
    }
    totalResourceRequirements = totalResourceRequirements.add(increment);
    updateResourceRequirementsBy();
}

@Override
public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
    if (decrement.isEmpty()) {
        return;
    }
    totalResourceRequirements = totalResourceRequirements.subtract(decrement);
    updateResourceRequirementsBy();
}

private void updateResourceRequirementsBy() {
    if (slotRequestMaxInterval == null) {
        declareResourceRequirements();
        return;
    }

    if (slotRequestMaxIntervalTimeoutFuture != null
            && !slotRequestMaxIntervalTimeoutFuture.isDone()
            && !slotRequestMaxIntervalTimeoutFuture.isCancelled()) {
        slotRequestMaxIntervalTimeoutFuture.cancel(true);
    }
    slotRequestMaxIntervalTimeoutFuture =
            componentMainThreadExecutor.schedule(
                    this::declareResourceRequirements,
                    slotRequestMaxInterval.toMilliseconds(),
                    TimeUnit.MILLISECONDS);
}
```
Please let me know your opinion~ :)
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
KarmaGYZ commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1432179061 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ## @@ -334,7 +334,9 @@ public void onUnknownDeploymentsOf( .createSlotPoolService( jid, createDeclarativeSlotPoolFactory( - jobMasterConfiguration.getConfiguration())); + jobMasterConfiguration.getConfiguration()), +null, Review Comment: Why not set it according to the configuration? ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ## @@ -118,26 +129,47 @@ public DefaultDeclarativeSlotPool( this.totalResourceRequirements = ResourceCounter.empty(); this.fulfilledResourceRequirements = ResourceCounter.empty(); this.slotToRequirementProfileMappings = new HashMap<>(); +this.componentMainThreadExecutor = Preconditions.checkNotNull(componentMainThreadExecutor); +this.slotRequestMaxInterval = slotRequestMaxInterval; } @Override public void increaseResourceRequirementsBy(ResourceCounter increment) { -if (increment.isEmpty()) { +updateResourceRequirementsBy( +increment, +() -> totalResourceRequirements = totalResourceRequirements.add(increment)); +} + +private void updateResourceRequirementsBy( +@Nonnull ResourceCounter deltaResourceCount, @Nonnull Runnable runnable) { +if (deltaResourceCount.isEmpty()) { return; } -totalResourceRequirements = totalResourceRequirements.add(increment); -declareResourceRequirements(); -} +runnable.run(); -@Override -public void decreaseResourceRequirementsBy(ResourceCounter decrement) { -if (decrement.isEmpty()) { +if (slotRequestMaxInterval == null) { +declareResourceRequirements(); return; } -totalResourceRequirements = totalResourceRequirements.subtract(decrement); -declareResourceRequirements(); +if (slotRequestMaxIntervalTimeoutFuture != null +&& !slotRequestMaxIntervalTimeoutFuture.isDone() +&& !slotRequestMaxIntervalTimeoutFuture.isCancelled()) { 
+slotRequestMaxIntervalTimeoutFuture.cancel(true); +} +slotRequestMaxIntervalTimeoutFuture = +componentMainThreadExecutor.schedule( +this::declareResourceRequirements, +slotRequestMaxInterval.toMilliseconds(), +TimeUnit.MILLISECONDS); +} + +@Override +public void decreaseResourceRequirementsBy(ResourceCounter decrement) { +updateResourceRequirementsBy( +decrement, +() -> totalResourceRequirements = totalResourceRequirements.subtract(decrement)); Review Comment: ditto ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java: ## @@ -336,7 +336,8 @@ public void onUnknownDeploymentsOf( createDeclarativeSlotPoolFactory( jobMasterConfiguration.getConfiguration()), null, -getMainThreadExecutor()); +getMainThreadExecutor(), +false); Review Comment: Why not set slotBatchAllocatable according to the configuration? ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ## @@ -118,26 +129,47 @@ public DefaultDeclarativeSlotPool( this.totalResourceRequirements = ResourceCounter.empty(); this.fulfilledResourceRequirements = ResourceCounter.empty(); this.slotToRequirementProfileMappings = new HashMap<>(); +this.componentMainThreadExecutor = Preconditions.checkNotNull(componentMainThreadExecutor); +this.slotRequestMaxInterval = slotRequestMaxInterval; } @Override public void increaseResourceRequirementsBy(ResourceCounter increment) { -if (increment.isEmpty()) { +updateResourceRequirementsBy( +increment, +() -> totalResourceRequirements = totalResourceRequirements.add(increment)); Review Comment: Why not check whether the ResourceCounter is empty and modify the totalResourceRequirements right here?
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1427717395 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java: ## @@ -200,10 +226,45 @@ protected void onReleaseTaskManager(ResourceCounter previouslyFulfilledRequireme @VisibleForTesting void newSlotsAreAvailable(Collection newSlots) { +if (!globalSlotsViewable) { +final Collection requestSlotMatches = +requestSlotMatchingStrategy.matchRequestsAndSlots( +newSlots, pendingRequests.values()); +reserveMatchedFreeSlots(requestSlotMatches); +fulfillMatchedSlots(requestSlotMatches); +return; +} + +receivedSlots.addAll(newSlots); +if (receivedSlots.size() < pendingRequests.size()) { +return; +} final Collection requestSlotMatches = requestSlotMatchingStrategy.matchRequestsAndSlots( newSlots, pendingRequests.values()); +if (requestSlotMatches.size() == pendingRequests.size()) { +reserveMatchedFreeSlots(requestSlotMatches); +fulfillMatchedSlots(requestSlotMatches); +} +receivedSlots.clear(); Review Comment: IIUC, batched resource requests are sent to the RM, and generally all slots of each TM are offered to the `DeclarativeSlotPoolBridge` in one batch, with each batch triggering `newAvailableSlots` once. So we need to cache them here, right? I'd appreciate your opinion or confirmation.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1427526869 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ## @@ -103,12 +106,35 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { private final RequirementMatcher requirementMatcher = new DefaultRequirementMatcher(); +// For batch slots requests +@Nullable private final Time slotRequestMaxInterval; +@Nullable private final ComponentMainThreadExecutor componentMainThreadExecutor; Review Comment: If we introduce `ComponentMainThreadExecutor`, I'd prefer it to be `@Nonnull` even if `slotRequestMaxInterval` is null. This thread executor may be used in other scenarios, so it would be better not to bind it to `slotRequestMaxInterval`. ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ## @@ -103,12 +106,35 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool { private final RequirementMatcher requirementMatcher = new DefaultRequirementMatcher(); +// For batch slots requests +@Nullable private final Time slotRequestMaxInterval; +@Nullable private final ComponentMainThreadExecutor componentMainThreadExecutor; +@Nullable private ScheduledFuture slotRequestMaxIntervalTimeoutFuture; + public DefaultDeclarativeSlotPool( JobID jobId, AllocatedSlotPool slotPool, Consumer> notifyNewResourceRequirements, Time idleSlotTimeout, Time rpcTimeout) { +this( +jobId, +slotPool, +notifyNewResourceRequirements, +idleSlotTimeout, +rpcTimeout, +null, +null); +} + +public DefaultDeclarativeSlotPool( +JobID jobId, +AllocatedSlotPool slotPool, +Consumer> notifyNewResourceRequirements, +Time idleSlotTimeout, +Time rpcTimeout, +@Nullable Time slotRequestMaxInterval, +@Nullable ComponentMainThreadExecutor componentMainThreadExecutor) { Review Comment: They have been marked `@Nullable`; do we still need two constructors?
Maybe one constructor with the full parameter list is enough. Note: the same applies to `BlocklistDeclarativeSlotPool`. Also, this commit is part of FLINK-33388, right? Why doesn't it introduce `slotRequestMaxInterval` in this commit? I didn't see any caller pass `slotRequestMaxInterval` in this PR. It would be better to add the corresponding JIRA id to the commit message.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
KarmaGYZ commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1427515808 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ## @@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter increment) { } totalResourceRequirements = totalResourceRequirements.add(increment); +if (slotRequestMaxInterval == null) { +declareResourceRequirements(); +return; +} + +Preconditions.checkNotNull(componentMainThreadExecutor); +if (slotRequestMaxIntervalTimeoutCheckFuture != null) { +slotRequestMaxIntervalTimeoutCheckFuture.cancel(true); +} +slotRequestMaxIntervalTimeoutCheckFuture = +componentMainThreadExecutor.schedule( +this::checkSlotRequestMaxIntervalTimeout, +slotRequestMaxInterval.toMilliseconds(), +TimeUnit.MILLISECONDS); +} + +private void checkSlotRequestMaxIntervalTimeout() { +if (componentMainThreadExecutor == null || slotRequestMaxInterval == null) { Review Comment: Calling `Preconditions.checkNotNull` on the two variables would be good enough from my side.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
KarmaGYZ commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1427515551 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ## @@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter increment) { } totalResourceRequirements = totalResourceRequirements.add(increment); +if (slotRequestMaxInterval == null) { +declareResourceRequirements(); +return; +} + +Preconditions.checkNotNull(componentMainThreadExecutor); +if (slotRequestMaxIntervalTimeoutCheckFuture != null) { +slotRequestMaxIntervalTimeoutCheckFuture.cancel(true); Review Comment: The current approach minimizes the number of RPC calls. The other approach may benefit the e2e latency.
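The trade-off above can be illustrated with a small simulation. This is a hypothetical sketch built on two assumptions. First, "the current approach" is taken to be the quoted diff's behavior: cancel any pending declaration and reschedule it on every requirement change, which amounts to a debounce. Second, "the other approach" is taken to keep an already scheduled declaration, which amounts to a fixed window. `ManualScheduler` is an illustrative virtual-time stand-in for `ComponentMainThreadExecutor`, and the `declarations` counter stands in for RPCs to the ResourceManager. None of these names are Flink API.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch (not Flink code) of batching resource declarations behind a max interval. */
class BatchedDeclarationSketch {

    /** Minimal virtual-time scheduler: a task runs once advance() moves time past its due time. */
    static final class ManualScheduler {
        private long now;
        private final List<Task> tasks = new ArrayList<>();

        Task schedule(Runnable action, long delayMillis) {
            Task task = new Task(now + delayMillis, action);
            tasks.add(task);
            return task;
        }

        void advance(long millis) {
            now += millis;
            for (Task task : new ArrayList<>(tasks)) {
                if (task.isPending() && task.dueAt <= now) {
                    task.done = true;
                    task.action.run();
                }
            }
            tasks.removeIf(task -> !task.isPending());
        }
    }

    static final class Task {
        final long dueAt;
        final Runnable action;
        boolean cancelled;
        boolean done;

        Task(long dueAt, Runnable action) {
            this.dueAt = dueAt;
            this.action = action;
        }

        boolean isPending() {
            return !cancelled && !done;
        }
    }

    /** Declares immediately, or defers the declaration when a max interval is configured. */
    static final class Declarer {
        private final ManualScheduler scheduler;
        private final Long maxIntervalMillis; // null keeps the "declare immediately" behavior
        private final boolean restartTimerOnChange; // true: debounce; false: fixed window
        private Task pending;
        int declarations; // stands in for RPCs to the ResourceManager

        Declarer(ManualScheduler scheduler, Long maxIntervalMillis, boolean restartTimerOnChange) {
            this.scheduler = scheduler;
            this.maxIntervalMillis = maxIntervalMillis;
            this.restartTimerOnChange = restartTimerOnChange;
        }

        void onRequirementsChanged() {
            if (maxIntervalMillis == null) {
                declarations++;
                return;
            }
            if (pending != null && pending.isPending()) {
                if (!restartTimerOnChange) {
                    return; // the already scheduled declaration will carry this change as well
                }
                pending.cancelled = true; // debounce: push the declaration further out
            }
            pending = scheduler.schedule(() -> declarations++, maxIntervalMillis);
        }
    }

    /** Four requirement changes 30 ms apart, then enough time for any pending timer to fire. */
    static int run(Long maxIntervalMillis, boolean restartTimerOnChange) {
        ManualScheduler scheduler = new ManualScheduler();
        Declarer declarer = new Declarer(scheduler, maxIntervalMillis, restartTimerOnChange);
        for (int i = 0; i < 4; i++) {
            declarer.onRequirementsChanged();
            scheduler.advance(30);
        }
        scheduler.advance(200);
        return declarer.declarations;
    }

    public static void main(String[] args) {
        System.out.println("immediate:    " + run(null, false));
        System.out.println("debounce:     " + run(50L, true));
        System.out.println("fixed window: " + run(50L, false));
    }
}
```

The scenario uses four changes 30 ms apart and a 50 ms interval. `run(null, false)` returns 4, `run(50L, true)` returns 1, and `run(50L, false)` returns 2. The debounce variant sends a single declaration, but only once the changes stop (at 140 ms in this scenario). The fixed window sends its first declaration at 50 ms, at the cost of one extra RPC.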
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
KarmaGYZ commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1427513961 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java: ## @@ -200,10 +226,45 @@ protected void onReleaseTaskManager(ResourceCounter previouslyFulfilledRequireme @VisibleForTesting void newSlotsAreAvailable(Collection newSlots) { +if (!globalSlotsViewable) { +final Collection requestSlotMatches = +requestSlotMatchingStrategy.matchRequestsAndSlots( +newSlots, pendingRequests.values()); +reserveMatchedFreeSlots(requestSlotMatches); +fulfillMatchedSlots(requestSlotMatches); +return; +} + +receivedSlots.addAll(newSlots); +if (receivedSlots.size() < pendingRequests.size()) { +return; +} final Collection requestSlotMatches = requestSlotMatchingStrategy.matchRequestsAndSlots( newSlots, pendingRequests.values()); +if (requestSlotMatches.size() == pendingRequests.size()) { +reserveMatchedFreeSlots(requestSlotMatches); +fulfillMatchedSlots(requestSlotMatches); +} +receivedSlots.clear(); Review Comment: `receivedSlots` will be cleared regardless of whether we got enough matching slots.
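The observation above, that the buffer is cleared even when matching was only partial, can be made concrete with a simplified model. This is a hypothetical sketch. It is not the PR's code and not Flink's `RequestSlotMatchingStrategy`. Slots and requests are plain integers (capacity and demand), matching is first-fit, and the names `SlotBatchBufferSketch` and `onSlotsOffered` are illustrative. It shows one possible policy: match against the whole buffer, and keep the buffer whenever not every pending request can be served.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
 * Hypothetical sketch (not Flink code): buffers offered slots until every pending request can be
 * matched at once. A slot is its capacity, a request is its demand; a slot fits if capacity >= demand.
 */
class SlotBatchBufferSketch {
    private final List<Integer> pendingDemands;
    private final List<Integer> receivedSlots = new ArrayList<>();
    final List<Integer> fulfilledSlots = new ArrayList<>();

    SlotBatchBufferSketch(Collection<Integer> pendingDemands) {
        this.pendingDemands = new ArrayList<>(pendingDemands);
    }

    /** Called once per offer (e.g. all slots of one TaskManager); true if all requests were served. */
    boolean onSlotsOffered(Collection<Integer> newSlots) {
        receivedSlots.addAll(newSlots);
        if (receivedSlots.size() < pendingDemands.size()) {
            return false; // not enough slots yet: keep buffering
        }
        // First-fit matching over the whole buffer, not only over the latest offer.
        List<Integer> remaining = new ArrayList<>(receivedSlots);
        List<Integer> matched = new ArrayList<>();
        for (int demand : pendingDemands) {
            Integer pick = null;
            for (Integer capacity : remaining) {
                if (capacity >= demand) {
                    pick = capacity;
                    break;
                }
            }
            if (pick == null) {
                return false; // partial match: keep the buffer instead of clearing it
            }
            remaining.remove(pick); // remove(Object): drops one equal element, not an index
            matched.add(pick);
        }
        fulfilledSlots.addAll(matched);
        pendingDemands.clear();
        receivedSlots.clear();
        receivedSlots.addAll(remaining); // unmatched leftovers stay buffered in this sketch
        return true;
    }

    int bufferedSlots() {
        return receivedSlots.size();
    }
}
```

For example, with pending demands `[2, 2]` and offers `[1]`, `[2]` and `[3]`, the first two calls return `false`. The buffered `[1, 2]` survives the partial match, and the third call returns `true` with slots `2` and `3` fulfilled. Clearing the buffer unconditionally would have discarded those slots. Note that first-fit can miss an assignment that a smarter matcher would find.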
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1426543430 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ## @@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter increment) { } totalResourceRequirements = totalResourceRequirements.add(increment); +if (slotRequestMaxInterval == null) { +declareResourceRequirements(); +return; +} + +Preconditions.checkNotNull(componentMainThreadExecutor); +if (slotRequestMaxIntervalTimeoutCheckFuture != null) { +slotRequestMaxIntervalTimeoutCheckFuture.cancel(true); Review Comment: Thank you very much for the comments. Both are acceptable to me, but from my limited reading I can't make a good final judgment between them. Would you mind explaining the advantages of the other approach? Thank you very much~ CC @1996fanrui
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1426524511 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java: ## @@ -200,10 +226,45 @@ protected void onReleaseTaskManager(ResourceCounter previouslyFulfilledRequireme @VisibleForTesting void newSlotsAreAvailable(Collection newSlots) { +if (!globalSlotsViewable) { +final Collection requestSlotMatches = +requestSlotMatchingStrategy.matchRequestsAndSlots( +newSlots, pendingRequests.values()); +reserveMatchedFreeSlots(requestSlotMatches); +fulfillMatchedSlots(requestSlotMatches); +return; +} + +receivedSlots.addAll(newSlots); +if (receivedSlots.size() < pendingRequests.size()) { +return; +} final Collection requestSlotMatches = requestSlotMatchingStrategy.matchRequestsAndSlots( newSlots, pendingRequests.values()); +if (requestSlotMatches.size() == pendingRequests.size()) { Review Comment: This is caused by the code version. I will push the latest commits later~
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1426523374 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java: ## @@ -200,10 +226,45 @@ protected void onReleaseTaskManager(ResourceCounter previouslyFulfilledRequireme @VisibleForTesting void newSlotsAreAvailable(Collection newSlots) { +if (!globalSlotsViewable) { +final Collection requestSlotMatches = +requestSlotMatchingStrategy.matchRequestsAndSlots( +newSlots, pendingRequests.values()); +reserveMatchedFreeSlots(requestSlotMatches); +fulfillMatchedSlots(requestSlotMatches); +return; +} + +receivedSlots.addAll(newSlots); +if (receivedSlots.size() < pendingRequests.size()) { +return; +} final Collection requestSlotMatches = requestSlotMatchingStrategy.matchRequestsAndSlots( newSlots, pendingRequests.values()); +if (requestSlotMatches.size() == pendingRequests.size()) { +reserveMatchedFreeSlots(requestSlotMatches); +fulfillMatchedSlots(requestSlotMatches); +} +receivedSlots.clear(); Review Comment: Thanks @KarmaGYZ for the comments. Sorry, I didn't quite understand your point. Do you mean that we should clear the received slots after matching, or that we should not clear them after matching?
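The difference between clearing the buffered slots unconditionally and clearing them only once every pending request is matched can be illustrated with a hypothetical, simplified matcher that takes the second approach. Requests and slots are plain profile strings matched by equality; this stands in for a real `RequestSlotMatchingStrategy` and is not Flink code. The sketch matches against all buffered slots, not only the newly arrived ones.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical all-or-nothing matcher. Requests and slots are plain profile strings, and a slot
 * matches a request only if the strings are equal (a stand-in for a real matching strategy).
 */
final class BatchSlotMatcher {
    private final List<String> pendingRequests = new ArrayList<>();
    private final List<String> receivedSlots = new ArrayList<>();
    private final List<String> fulfilledRequests = new ArrayList<>();

    void requestSlot(String profile) {
        pendingRequests.add(profile);
    }

    /** Buffers the new slots and fulfills either every pending request or none of them. */
    void newSlotsAreAvailable(List<String> newSlots) {
        receivedSlots.addAll(newSlots);
        if (receivedSlots.size() < pendingRequests.size()) {
            return; // not enough slots yet to cover every request
        }
        List<String> unmatchedSlots = new ArrayList<>(receivedSlots);
        for (String request : pendingRequests) {
            // List.remove(Object) removes the first equal slot and reports whether it found one.
            if (!unmatchedSlots.remove(request)) {
                // Incomplete match: keep the buffered slots so later slots can complete the batch.
                return;
            }
        }
        // Every request matched: fulfill the whole batch, then clear the buffer.
        // (A real pool would keep any leftover slots available as free slots.)
        fulfilledRequests.addAll(pendingRequests);
        pendingRequests.clear();
        receivedSlots.clear();
    }

    int fulfilledCount() {
        return fulfilledRequests.size();
    }

    int bufferedSlotCount() {
        return receivedSlots.size();
    }
}
```

With requests for `small` and `large`, and two `small` slots arriving first, nothing is fulfilled and both slots stay buffered. Once a `large` slot arrives, both requests are fulfilled and the buffer is emptied. Clearing unconditionally would instead have dropped the two `small` slots after the first, incomplete round.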
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1426516597 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ## @@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter increment) { } totalResourceRequirements = totalResourceRequirements.add(increment); +if (slotRequestMaxInterval == null) { +declareResourceRequirements(); +return; +} + +Preconditions.checkNotNull(componentMainThreadExecutor); +if (slotRequestMaxIntervalTimeoutCheckFuture != null) { +slotRequestMaxIntervalTimeoutCheckFuture.cancel(true); +} +slotRequestMaxIntervalTimeoutCheckFuture = +componentMainThreadExecutor.schedule( +this::checkSlotRequestMaxIntervalTimeout, +slotRequestMaxInterval.toMilliseconds(), +TimeUnit.MILLISECONDS); +} + +private void checkSlotRequestMaxIntervalTimeout() { +if (componentMainThreadExecutor == null || slotRequestMaxInterval == null) { Review Comment: Nice idea! How about checking in the constructor, when assigning the values, that these two parameters are either both null or both non-null? Or we could use `Preconditions.checkState`/`checkArgument` to check it, since `componentMainThreadExecutor` is only needed when batched resource requests are enabled. Looking forward to better suggestions.
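Here is a minimal sketch of the constructor check proposed here, plus the fail-fast variant for the timeout callback, written with plain Java exceptions. In Flink, `org.apache.flink.util.Preconditions.checkArgument` and `checkState` could express the same checks. `BatchingConfig` is a hypothetical holder, and `java.time.Duration` and `ScheduledExecutorService` stand in for Flink's `Time` and `ComponentMainThreadExecutor`.

```java
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;

/** Hypothetical holder for the batching settings of a slot pool. */
final class BatchingConfig {
    private final ScheduledExecutorService mainThreadExecutor; // null when batching is disabled
    private final Duration slotRequestMaxInterval; // null when batching is disabled

    BatchingConfig(ScheduledExecutorService mainThreadExecutor, Duration slotRequestMaxInterval) {
        // Either both are set (batching on) or both are null (batching off); anything else is a bug.
        if ((mainThreadExecutor == null) != (slotRequestMaxInterval == null)) {
            throw new IllegalArgumentException(
                    "mainThreadExecutor and slotRequestMaxInterval must both be set or both be null");
        }
        this.mainThreadExecutor = mainThreadExecutor;
        this.slotRequestMaxInterval = slotRequestMaxInterval;
    }

    boolean isBatchingEnabled() {
        return slotRequestMaxInterval != null;
    }

    /** For the timeout callback: fail fast instead of silently returning. */
    void checkBatchingEnabled() {
        if (!isBatchingEnabled()) {
            throw new IllegalStateException("timeout check fired although batching is disabled");
        }
    }
}
```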
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1426510597 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ## @@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter increment) { } totalResourceRequirements = totalResourceRequirements.add(increment); +if (slotRequestMaxInterval == null) { +declareResourceRequirements(); +return; +} + +Preconditions.checkNotNull(componentMainThreadExecutor); +if (slotRequestMaxIntervalTimeoutCheckFuture != null) { +slotRequestMaxIntervalTimeoutCheckFuture.cancel(true); +} +slotRequestMaxIntervalTimeoutCheckFuture = +componentMainThreadExecutor.schedule( +this::checkSlotRequestMaxIntervalTimeout, +slotRequestMaxInterval.toMilliseconds(), +TimeUnit.MILLISECONDS); +} + +private void checkSlotRequestMaxIntervalTimeout() { +if (componentMainThreadExecutor == null || slotRequestMaxInterval == null) { +return; +} declareResourceRequirements(); +slotRequestMaxIntervalTimeoutCheckFuture = null; Review Comment: Yes, this assignment is risky. IIUC, when increasing requirements we only need to cancel the current `future` if it is still cancelable; otherwise, we can simply assign the future returned by the newly scheduled task. Please correct me if I'm wrong.
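Here is a minimal sketch of the handling described here, assuming a plain `ScheduledExecutorService` in place of Flink's `ComponentMainThreadExecutor` and single-threaded access. The previous check is cancelled only if it has not completed yet, and the field is always overwritten with the future returned by `schedule(...)`; it is never reset to `null` from inside the task. `DebouncedDeclarer` is a hypothetical name. The quiet-period behavior mirrors the current implementation in the diff.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical debounced declarer: declares once no change has arrived for a quiet period. */
final class DebouncedDeclarer {
    private final ScheduledExecutorService executor;
    private final long quietPeriodMillis;
    private final Runnable declareRequirements;

    // Always the most recently scheduled check. It is never reset from inside the task,
    // so an older task finishing late cannot clobber a newer handle.
    private ScheduledFuture<?> timeoutCheck;

    DebouncedDeclarer(
            ScheduledExecutorService executor, long quietPeriodMillis, Runnable declareRequirements) {
        this.executor = executor;
        this.quietPeriodMillis = quietPeriodMillis;
        this.declareRequirements = declareRequirements;
    }

    /** Called on every requirement change; restarts the quiet period. */
    void onRequirementsChanged() {
        if (timeoutCheck != null && !timeoutCheck.isDone()) {
            // Only a check that has not completed needs cancelling; don't interrupt a running one.
            timeoutCheck.cancel(false);
        }
        timeoutCheck =
                executor.schedule(declareRequirements, quietPeriodMillis, TimeUnit.MILLISECONDS);
    }

    ScheduledFuture<?> currentTimeoutCheck() {
        return timeoutCheck;
    }
}
```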
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
KarmaGYZ commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1413318476 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java: ## @@ -110,7 +110,7 @@ public Optional castInto(Class clazz) { } @Override -public final void start( Review Comment: There is no need to do this. Just override `onStart`. ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java: ## @@ -142,7 +142,7 @@ protected void assertHasBeenStarted() { } @Override -public final void close() { Review Comment: Ditto. Override `onClose`.
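The suggestion follows the template-method pattern. The base class keeps `start`/`close` `final` and exposes protected hooks that subclasses override. Here is a minimal hypothetical sketch. It is far simpler than `DeclarativeSlotPoolService`; only the `onStart`/`onClose` hook names come from the review, and the two classes are illustrative.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base service: lifecycle methods stay final, subclasses hook in. */
abstract class SlotPoolServiceBase {
    private boolean started;

    public final void start() {
        if (started) {
            throw new IllegalStateException("already started");
        }
        started = true;
        onStart(); // extension point for subclasses
    }

    public final void close() {
        if (!started) {
            return;
        }
        onClose(); // extension point for subclasses
        started = false;
    }

    /** Subclasses override this instead of start(); the default does nothing. */
    protected void onStart() {}

    /** Subclasses override this instead of close(); the default does nothing. */
    protected void onClose() {}
}

/** Hypothetical subclass that needs extra setup, e.g. a timer for batched slot requests. */
final class BatchingSlotPoolService extends SlotPoolServiceBase {
    final List<String> events = new ArrayList<>();

    @Override
    protected void onStart() {
        events.add("timer started");
    }

    @Override
    protected void onClose() {
        events.add("timer stopped");
    }
}
```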
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
KarmaGYZ commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1426214331 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java: ## @@ -200,10 +226,45 @@ protected void onReleaseTaskManager(ResourceCounter previouslyFulfilledRequireme @VisibleForTesting void newSlotsAreAvailable(Collection newSlots) { +if (!globalSlotsViewable) { +final Collection requestSlotMatches = +requestSlotMatchingStrategy.matchRequestsAndSlots( +newSlots, pendingRequests.values()); +reserveMatchedFreeSlots(requestSlotMatches); +fulfillMatchedSlots(requestSlotMatches); +return; +} + +receivedSlots.addAll(newSlots); +if (receivedSlots.size() < pendingRequests.size()) { +return; +} final Collection requestSlotMatches = requestSlotMatchingStrategy.matchRequestsAndSlots( newSlots, pendingRequests.values()); +if (requestSlotMatches.size() == pendingRequests.size()) { +reserveMatchedFreeSlots(requestSlotMatches); +fulfillMatchedSlots(requestSlotMatches); +} +receivedSlots.clear(); Review Comment: Should we only clear the pending slots after all the requirements are fulfilled? ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ## @@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter increment) { } totalResourceRequirements = totalResourceRequirements.add(increment); +if (slotRequestMaxInterval == null) { +declareResourceRequirements(); +return; +} + +Preconditions.checkNotNull(componentMainThreadExecutor); +if (slotRequestMaxIntervalTimeoutCheckFuture != null) { +slotRequestMaxIntervalTimeoutCheckFuture.cancel(true); Review Comment: In the current implementation, the resource requirements are only declared if there is no further requirement change within `slotRequestMaxInterval`. Another option is to declare the requirements at most once every `slotRequestMaxInterval`. TBH, I'm not sure which one would be better.
But I think it deserves a discussion. ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java: ## @@ -110,7 +110,7 @@ public Optional castInto(Class clazz) { } @Override -public final void start( Review Comment: There is no need to do this. Just override `onStart`. ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ## @@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter increment) { } totalResourceRequirements = totalResourceRequirements.add(increment); +if (slotRequestMaxInterval == null) { +declareResourceRequirements(); +return; +} + +Preconditions.checkNotNull(componentMainThreadExecutor); +if (slotRequestMaxIntervalTimeoutCheckFuture != null) { +slotRequestMaxIntervalTimeoutCheckFuture.cancel(true); +} +slotRequestMaxIntervalTimeoutCheckFuture = +componentMainThreadExecutor.schedule( +this::checkSlotRequestMaxIntervalTimeout, +slotRequestMaxInterval.toMilliseconds(), +TimeUnit.MILLISECONDS); +} + +private void checkSlotRequestMaxIntervalTimeout() { +if (componentMainThreadExecutor == null || slotRequestMaxInterval == null) { Review Comment: I think we should assert that something went wrong instead of ignoring this illegal state. 
## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java: ## @@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter increment) { } totalResourceRequirements = totalResourceRequirements.add(increment); +if (slotRequestMaxInterval == null) { +declareResourceRequirements(); +return; +} + +Preconditions.checkNotNull(componentMainThreadExecutor); +if (slotRequestMaxIntervalTimeoutCheckFuture != null) { +slotRequestMaxIntervalTimeoutCheckFuture.cancel(true); +} +slotRequestMaxIntervalTimeoutCheckFuture = +componentMainThreadExecutor.schedule( +this::checkSlotRequestMaxIntervalTimeout, +slotRequestMaxInterval.toMilliseconds(), +TimeUnit.MILLISECONDS); +} + +private void checkSlotRequestMaxIntervalTimeout() { +if (componentMainThreadExecutor == null || sl
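To make the trade-off between the two options above concrete, here is a hypothetical, deliberately simplified timeline model (integer milliseconds, no real executor). `debounce` models the current behavior, where every change cancels and reschedules the declaration. `throttle` models the alternative of declaring at most once per interval, where changes arriving while a declaration is pending simply join it. A change arriving exactly at a deadline is treated as arriving after that declaration has fired.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical model: returns the times at which requirements would be declared. */
final class DeclarationPolicies {

    /** Debounce: each change reschedules the declaration to (change time + interval). */
    static List<Long> debounce(long[] sortedChangeTimes, long interval) {
        List<Long> declarations = new ArrayList<>();
        for (int i = 0; i < sortedChangeTimes.length; i++) {
            long deadline = sortedChangeTimes[i] + interval;
            // Only the next change can cancel this timer; a later one would come after it fired.
            boolean cancelledByNextChange =
                    i + 1 < sortedChangeTimes.length && sortedChangeTimes[i + 1] < deadline;
            if (!cancelledByNextChange) {
                declarations.add(deadline);
            }
        }
        return declarations;
    }

    /** Throttle: a change schedules a declaration only if none is pending. */
    static List<Long> throttle(long[] sortedChangeTimes, long interval) {
        List<Long> declarations = new ArrayList<>();
        Long pendingDeadline = null; // no declaration scheduled yet
        for (long t : sortedChangeTimes) {
            if (pendingDeadline != null && t < pendingDeadline) {
                continue; // joins the already scheduled declaration
            }
            if (pendingDeadline != null) {
                declarations.add(pendingDeadline); // fired before this change arrived
            }
            pendingDeadline = t + interval;
        }
        if (pendingDeadline != null) {
            declarations.add(pendingDeadline);
        }
        return declarations;
    }
}
```

For example, with changes at 0, 40, 80 and 120 ms and a 50 ms interval, `debounce` declares once, at 170 ms, after the burst ends. `throttle` declares at 50 ms and 130 ms, trading more declarations for a bounded delay. With sparse changes (at 0 and 100 ms), both policies declare at 50 ms and 150 ms.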
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on PR #23635: URL: https://github.com/apache/flink/pull/23635#issuecomment-1840891716 Thank you very much @1996fanrui @KarmaGYZ for the review. Based on @KarmaGYZ's offline suggestions, I have re-evaluated where the waiting mechanisms should be implemented. If both waiting mechanisms are placed in DeclarativeSlotPool, the information to maintain would be more precise and concise: - Maintaining the reserved slots/resource profiles should be simpler and more intuitive. If we can reach an agreement on this, I would like to confirm whether we still use `mainThreadExecutor` to implement the timeout check. If so, this may require changing the `create` method of `DeclarativeSlotPoolFactory`. Please let me know your opinions.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1415668887 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/GlobalViewDeclarativeSlotPoolBridge.java: ## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService; +import org.apache.flink.runtime.taskexecutor.slot.TimeoutListener; +import org.apache.flink.runtime.taskexecutor.slot.TimerService; +import org.apache.flink.runtime.util.ResourceCounter; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.clock.Clock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +/** + * {@link SlotPool} implementation which could use the {@link GlobalViewDeclarativeSlotPoolBridge} + * to allocate slots in a global view. Note: It's only used for streaming mode now. 
+ */ +public class GlobalViewDeclarativeSlotPoolBridge extends DeclarativeSlotPoolBridge +implements TimeoutListener { + +public static final Logger LOG = +LoggerFactory.getLogger(GlobalViewDeclarativeSlotPoolBridge.class); +private final Map increasedResourceRequirements; + +private final TimerService timerService; + +private final @Nonnull Set receivedNewSlots; + +private final @Nonnull Map preFulfilledFromAvailableSlots; +private final Time slotRequestMaxInterval; + +public GlobalViewDeclarativeSlotPoolBridge( +JobID jobId, +DeclarativeSlotPoolFactory declarativeSlotPoolFactory, +Clock clock, +Time rpcTimeout, +Time idleSlotTimeout, +Time batchSlotTimeout, +Time slotRequestMaxInterval, +RequestSlotMatchingStrategy requestSlotMatchingStrategy) { +super( +jobId, +declarativeSlotPoolFactory, +clock, +rpcTimeout, +idleSlotTimeout, +batchSlotTimeout, +requestSlotMatchingStrategy); +this.slotRequestMaxInterval = Preconditions.checkNotNull(slotRequestMaxInterval); +this.receivedNewSlots = new HashSet<>(); +this.preFulfilledFromAvailableSlots = new HashMap<>(); +this.increasedResourceRequirements = new HashMap<>(); +this.timerService = +new DefaultTimerService<>( +new ScheduledThreadPoolExecutor(1), +slotRequestMaxInterval.toMilliseconds()); +} + +@Override +protected void internalRequestNewAllocatedSlot(PendingRequest pendingRequest) { +pendingRequests.put(pendingRequest.getSlotRequestId(), pendingRequest); +increasedResourceRequirements.put(pendingRequest.getSlotRequestId(), false); + +timerService.registerTimeout( +this, slotRequestMaxInterval.getSize(), slotRequestMaxInterval.getUnit()); +} + +@Override +void newSlotsAreAvailable(Collection newSlots) { +receivedNewSlots.addAll(newSlots); +if (newSlots.isEmpty() && receivedNewSlots.isEmpty()) { +// TODO: Do the matching logic only for available slots. +}
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1415663600 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/GlobalViewDeclarativeSlotPoolBridge.java: ## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService; +import org.apache.flink.runtime.taskexecutor.slot.TimeoutListener; +import org.apache.flink.runtime.taskexecutor.slot.TimerService; +import org.apache.flink.runtime.util.ResourceCounter; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.clock.Clock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +/** + * {@link SlotPool} implementation which could use the {@link GlobalViewDeclarativeSlotPoolBridge} + * to allocate slots in a global view. Note: It's only used for streaming mode now. 
+ */ +public class GlobalViewDeclarativeSlotPoolBridge extends DeclarativeSlotPoolBridge +implements TimeoutListener { + +public static final Logger LOG = +LoggerFactory.getLogger(GlobalViewDeclarativeSlotPoolBridge.class); +private final Map increasedResourceRequirements; + +private final TimerService timerService; + +private final @Nonnull Set receivedNewSlots; + +private final @Nonnull Map preFulfilledFromAvailableSlots; +private final Time slotRequestMaxInterval; + +public GlobalViewDeclarativeSlotPoolBridge( +JobID jobId, +DeclarativeSlotPoolFactory declarativeSlotPoolFactory, +Clock clock, +Time rpcTimeout, +Time idleSlotTimeout, +Time batchSlotTimeout, +Time slotRequestMaxInterval, +RequestSlotMatchingStrategy requestSlotMatchingStrategy) { +super( +jobId, +declarativeSlotPoolFactory, +clock, +rpcTimeout, +idleSlotTimeout, +batchSlotTimeout, +requestSlotMatchingStrategy); +this.slotRequestMaxInterval = Preconditions.checkNotNull(slotRequestMaxInterval); +this.receivedNewSlots = new HashSet<>(); +this.preFulfilledFromAvailableSlots = new HashMap<>(); +this.increasedResourceRequirements = new HashMap<>(); +this.timerService = +new DefaultTimerService<>( Review Comment: Thanks for the comment. SGTM, +1.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1415650282 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/GlobalViewDeclarativeSlotPoolBridge.java: ## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService; +import org.apache.flink.runtime.taskexecutor.slot.TimeoutListener; +import org.apache.flink.runtime.taskexecutor.slot.TimerService; +import org.apache.flink.runtime.util.ResourceCounter; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.clock.Clock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +/** + * {@link SlotPool} implementation which could use the {@link GlobalViewDeclarativeSlotPoolBridge} + * to allocate slots in a global view. Note: It's only used for streaming mode now. 
+ */ +public class GlobalViewDeclarativeSlotPoolBridge extends DeclarativeSlotPoolBridge +implements TimeoutListener { + +public static final Logger LOG = +LoggerFactory.getLogger(GlobalViewDeclarativeSlotPoolBridge.class); +private final Map increasedResourceRequirements; + +private final TimerService timerService; + +private final @Nonnull Set receivedNewSlots; + +private final @Nonnull Map preFulfilledFromAvailableSlots; +private final Time slotRequestMaxInterval; + +public GlobalViewDeclarativeSlotPoolBridge( +JobID jobId, +DeclarativeSlotPoolFactory declarativeSlotPoolFactory, +Clock clock, +Time rpcTimeout, +Time idleSlotTimeout, +Time batchSlotTimeout, +Time slotRequestMaxInterval, +RequestSlotMatchingStrategy requestSlotMatchingStrategy) { +super( +jobId, +declarativeSlotPoolFactory, +clock, +rpcTimeout, +idleSlotTimeout, +batchSlotTimeout, +requestSlotMatchingStrategy); +this.slotRequestMaxInterval = Preconditions.checkNotNull(slotRequestMaxInterval); +this.receivedNewSlots = new HashSet<>(); +this.preFulfilledFromAvailableSlots = new HashMap<>(); +this.increasedResourceRequirements = new HashMap<>(); +this.timerService = +new DefaultTimerService<>( +new ScheduledThreadPoolExecutor(1), Review Comment: +1 for the proposal.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1413436882 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/GlobalViewDeclarativeSlotPoolBridge.java: ## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService; +import org.apache.flink.runtime.taskexecutor.slot.TimeoutListener; +import org.apache.flink.runtime.taskexecutor.slot.TimerService; +import org.apache.flink.runtime.util.ResourceCounter; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.clock.Clock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +/** + * {@link SlotPool} implementation which could use the {@link GlobalViewDeclarativeSlotPoolBridge} + * to allocate slots in a global view. Note: It's only used for streaming mode now. 
+ */ +public class GlobalViewDeclarativeSlotPoolBridge extends DeclarativeSlotPoolBridge +implements TimeoutListener { + +public static final Logger LOG = +LoggerFactory.getLogger(GlobalViewDeclarativeSlotPoolBridge.class); +private final Map increasedResourceRequirements; + +private final TimerService timerService; + +private final @Nonnull Set receivedNewSlots; + +private final @Nonnull Map preFulfilledFromAvailableSlots; +private final Time slotRequestMaxInterval; + +public GlobalViewDeclarativeSlotPoolBridge( +JobID jobId, +DeclarativeSlotPoolFactory declarativeSlotPoolFactory, +Clock clock, +Time rpcTimeout, +Time idleSlotTimeout, +Time batchSlotTimeout, +Time slotRequestMaxInterval, +RequestSlotMatchingStrategy requestSlotMatchingStrategy) { +super( +jobId, +declarativeSlotPoolFactory, +clock, +rpcTimeout, +idleSlotTimeout, +batchSlotTimeout, +requestSlotMatchingStrategy); +this.slotRequestMaxInterval = Preconditions.checkNotNull(slotRequestMaxInterval); +this.receivedNewSlots = new HashSet<>(); +this.preFulfilledFromAvailableSlots = new HashMap<>(); +this.increasedResourceRequirements = new HashMap<>(); +this.timerService = +new DefaultTimerService<>( +new ScheduledThreadPoolExecutor(1), +slotRequestMaxInterval.toMilliseconds()); +} + +@Override +protected void internalRequestNewAllocatedSlot(PendingRequest pendingRequest) { +pendingRequests.put(pendingRequest.getSlotRequestId(), pendingRequest); +increasedResourceRequirements.put(pendingRequest.getSlotRequestId(), false); + +timerService.registerTimeout( +this, slotRequestMaxInterval.getSize(), slotRequestMaxInterval.getUnit()); +} + +@Override +void newSlotsAreAvailable(Collection newSlots) { +receivedNewSlots.addAll(newSlots); +if (newSlots.isEmpty() && receivedNewSlots.isEmpty()) { +// TODO: Do the matching logic only for available slots. +}
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1413436882 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/GlobalViewDeclarativeSlotPoolBridge.java: ## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService; +import org.apache.flink.runtime.taskexecutor.slot.TimeoutListener; +import org.apache.flink.runtime.taskexecutor.slot.TimerService; +import org.apache.flink.runtime.util.ResourceCounter; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.clock.Clock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +/** + * {@link SlotPool} implementation which could use the {@link GlobalViewDeclarativeSlotPoolBridge} + * to allocate slots in a global view. Note: It's only used for streaming mode now. 
+ */ +public class GlobalViewDeclarativeSlotPoolBridge extends DeclarativeSlotPoolBridge +implements TimeoutListener { + +public static final Logger LOG = +LoggerFactory.getLogger(GlobalViewDeclarativeSlotPoolBridge.class); +private final Map increasedResourceRequirements; + +private final TimerService timerService; + +private final @Nonnull Set receivedNewSlots; + +private final @Nonnull Map preFulfilledFromAvailableSlots; +private final Time slotRequestMaxInterval; + +public GlobalViewDeclarativeSlotPoolBridge( +JobID jobId, +DeclarativeSlotPoolFactory declarativeSlotPoolFactory, +Clock clock, +Time rpcTimeout, +Time idleSlotTimeout, +Time batchSlotTimeout, +Time slotRequestMaxInterval, +RequestSlotMatchingStrategy requestSlotMatchingStrategy) { +super( +jobId, +declarativeSlotPoolFactory, +clock, +rpcTimeout, +idleSlotTimeout, +batchSlotTimeout, +requestSlotMatchingStrategy); +this.slotRequestMaxInterval = Preconditions.checkNotNull(slotRequestMaxInterval); +this.receivedNewSlots = new HashSet<>(); +this.preFulfilledFromAvailableSlots = new HashMap<>(); +this.increasedResourceRequirements = new HashMap<>(); +this.timerService = +new DefaultTimerService<>( +new ScheduledThreadPoolExecutor(1), +slotRequestMaxInterval.toMilliseconds()); +} + +@Override +protected void internalRequestNewAllocatedSlot(PendingRequest pendingRequest) { +pendingRequests.put(pendingRequest.getSlotRequestId(), pendingRequest); +increasedResourceRequirements.put(pendingRequest.getSlotRequestId(), false); + +timerService.registerTimeout( +this, slotRequestMaxInterval.getSize(), slotRequestMaxInterval.getUnit()); +} + +@Override +void newSlotsAreAvailable(Collection newSlots) { +receivedNewSlots.addAll(newSlots); +if (newSlots.isEmpty() && receivedNewSlots.isEmpty()) { +// TODO: Do the matching logic only for available slots. +}
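As a reading aid for the waiting mechanism in `GlobalViewDeclarativeSlotPoolBridge` above (each `internalRequestNewAllocatedSlot` call registers a timeout of `slotRequestMaxInterval`), here is a minimal plain-Java sketch, assuming the intent is to collect requests and hand them to matching as one batch only once no new request has arrived for the configured interval. `BatchingRequestCollector` and its methods are hypothetical and not part of Flink; time is passed in explicitly instead of going through a `TimerService`, so the behaviour is deterministic.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/**
 * Hypothetical sketch (not a Flink class): collects requests and releases them as one
 * batch once no new request has arrived for maxIntervalMillis.
 */
final class BatchingRequestCollector<T> {
    private final long maxIntervalMillis;
    private final List<T> pending = new ArrayList<>();
    private long lastRequestMillis;

    BatchingRequestCollector(long maxIntervalMillis) {
        if (maxIntervalMillis <= 0) {
            throw new IllegalArgumentException("maxIntervalMillis must be positive");
        }
        this.maxIntervalMillis = maxIntervalMillis;
    }

    /** Records a request and restarts the idle window. */
    void add(T request, long nowMillis) {
        pending.add(request);
        lastRequestMillis = nowMillis;
    }

    /**
     * Returns all pending requests if at least maxIntervalMillis have passed since the
     * last request; otherwise returns an empty list and keeps the requests pending.
     */
    List<T> pollBatchIfIdle(long nowMillis) {
        if (pending.isEmpty() || nowMillis - lastRequestMillis < maxIntervalMillis) {
            return Collections.emptyList();
        }
        List<T> batch = new ArrayList<>(pending);
        pending.clear();
        return batch;
    }
}
```

With a 100 ms interval, requests added at t=0 and t=50 are not released at t=120 (only 70 ms of silence), but are released together at t=150.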
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1413430330 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/GlobalViewPhysicalSlotProviderImpl.java: ## @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.runtime.jobmaster.SlotRequestId; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +/** + * Implementation with global view for {@link PhysicalSlotProvider}. Note: It's only used for + * streaming mode now. + */ +public class GlobalViewPhysicalSlotProviderImpl extends PhysicalSlotProviderImpl { + +public GlobalViewPhysicalSlotProviderImpl( +SlotSelectionStrategy slotSelectionStrategy, SlotPool slotPool) { +super(slotSelectionStrategy, slotPool); +} + +@Override +protected Map> tryAllocateFromAvailable( +Collection slotRequests) { +Map> availablePhysicalSlots = new HashMap<>(); +for (PhysicalSlotRequest request : slotRequests) { +availablePhysicalSlots.put( Review Comment: I don't understand why we need this new `tryAllocateFromAvailable`? 
The old `tryAllocateFromAvailable` goes through the `slotSelectionStrategy.selectBestSlotForProfile` logic. Don't we need that here? ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/GlobalViewDeclarativeSlotPoolBridge.java: ## @@ -0,0 +1,261 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.jobmaster.slotpool; + +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; +import org.apache.flink.runtime.jobmaster.JobMasterId; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService; +import org.apache.flink.runtime.taskexecutor.slot.TimeoutListener; +import org.apache.flink.runtime.taskexecutor.slot.TimerService; +import org.apache.flink.runtime.util.ResourceCounter; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; +import org.apache.flink.util.clock.Clock; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ScheduledThreadPoolExecutor; + +/** + * {@link SlotPool} implementation which could use the {@link GlobalViewDeclarativeSlotPoolBridge} + * to allocate slots in a global view. Note: It's only used for streaming mode now. + */ +public class GlobalViewDeclarativeSlotPoolBridge extends DeclarativeSlotPoolBridge +implements TimeoutListener { + +public static final Logger LOG = +LoggerFactory.getLogger(GlobalViewDeclarativeS
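For context on the `selectBestSlotForProfile` question above: per-request selection picks, for each request in turn, the best-fitting free slot and removes it before the next request is considered. The sketch below illustrates that greedy best-fit idea in plain Java. It is a hypothetical simplification: `BestFitSlotMatcher` is not a Flink class, and a slot profile is reduced to a single integer size, whereas Flink's `SlotSelectionStrategy` works on `SlotProfile`s and may weigh other criteria such as location preferences.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Hypothetical best-fit matcher; a "profile" is simplified to one integer size. */
final class BestFitSlotMatcher {

    /**
     * For each request (in iteration order), picks the smallest free slot that is at
     * least as large as the request and removes it from the free list. Requests that
     * cannot be served map to Optional.empty().
     */
    static Map<String, Optional<Integer>> match(
            Map<String, Integer> requests, List<Integer> freeSlotSizes) {
        List<Integer> free = new ArrayList<>(freeSlotSizes);
        Map<String, Optional<Integer>> result = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> request : requests.entrySet()) {
            int required = request.getValue();
            Optional<Integer> best =
                    free.stream()
                            .filter(size -> size >= required)
                            .min(Comparator.naturalOrder());
            if (best.isPresent()) {
                // Integer argument selects List.remove(Object), not remove(int index).
                free.remove(best.get());
            }
            result.put(request.getKey(), best);
        }
        return result;
    }
}
```

The result depends on request order, since each request only sees the slots left over by the requests before it.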
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on PR #23635: URL: https://github.com/apache/flink/pull/23635#issuecomment-1837758141 The waiting mechanism is ready for review. @KarmaGYZ @1996fanrui, could you help take a look when you have free time? Thank you very much~ The verification part of the tests will be refactored after the external JUnit 5 migration is done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
KarmaGYZ commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1405582679 ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java: ## @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; +import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex; + +import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; +import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; + +import org.assertj.core.data.Offset; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link TaskBalancedPreferredSlotSharingStrategy}. */ +class TaskBalancedPreferredSlotSharingStrategyTest extends AbstractSlotSharingStrategyTest { Review Comment: Why did we delete the other two tests?
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1405313903 ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java: ## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; +import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; + +import org.assertj.core.data.Offset; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link TaskBalancedPreferredSlotSharingStrategy}. */ +class TaskBalancedPreferredSlotSharingStrategyTest { Review Comment: > In addition to extracting common code, it is also necessary to consider the upgrade issue of Junit4 To simplify the review of https://github.com/apache/flink/pull/23635, I submitted a hotfix [PR](https://github.com/apache/flink/pull/23806) that upgrades `LocalInputPreferredSlotSharingStrategyTest` to JUnit 5. > If there are common parts, it may add some complexity to the review process and update process. If there is a common part, we only need to review the parts that differ, which simplifies the review. It also avoids introducing any duplicated code.
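The refactoring argued for here keeps shared checks in a base class (such as `AbstractSlotSharingStrategyTest`) and lets each subclass supply only its strategy, so a reviewer only has to read the differing part. The template-method shape of that idea is sketched below in plain Java so it runs without JUnit. All names (`SlotAssigner`, `AbstractSlotAssignerCheck`, and the two subclasses) are hypothetical. In a JUnit 5 test, the shared method would instead be a `@Test` method in the abstract class, which JUnit Jupiter runs for each concrete subclass.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical strategy: returns the slot index chosen for each of numTasks tasks. */
interface SlotAssigner {
    int[] assign(int numTasks, int numSlots);
}

/** Shared checks live once in the base class; subclasses only supply the strategy. */
abstract class AbstractSlotAssignerCheck {

    protected abstract SlotAssigner createAssigner();

    /** Common contract for every strategy; returns failure messages instead of using JUnit. */
    final List<String> runCommonChecks(int numTasks, int numSlots) {
        List<String> failures = new ArrayList<>();
        int[] slots = createAssigner().assign(numTasks, numSlots);
        if (slots.length != numTasks) {
            failures.add("expected " + numTasks + " assignments but got " + slots.length);
        }
        for (int slot : slots) {
            if (slot < 0 || slot >= numSlots) {
                failures.add("slot index out of range: " + slot);
            }
        }
        return failures;
    }
}

/** Only the strategy-specific part differs between concrete checks. */
final class RoundRobinAssignerCheck extends AbstractSlotAssignerCheck {
    @Override
    protected SlotAssigner createAssigner() {
        return (numTasks, numSlots) -> {
            int[] result = new int[numTasks];
            for (int i = 0; i < numTasks; i++) {
                result[i] = i % numSlots; // spread tasks across slots in turn
            }
            return result;
        };
    }
}

final class FirstSlotAssignerCheck extends AbstractSlotAssignerCheck {
    @Override
    protected SlotAssigner createAssigner() {
        return (numTasks, numSlots) -> new int[numTasks]; // every task in slot 0
    }
}
```

Strategy-specific expectations (for example, a balance check that only the round-robin variant should pass) would then live in the subclass that needs them.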
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1405352714 ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java: ## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; +import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; + +import org.assertj.core.data.Offset; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link TaskBalancedPreferredSlotSharingStrategy}. */ +class TaskBalancedPreferredSlotSharingStrategyTest { Review Comment: `LocalInputPreferredSlotSharingStrategyTest` has been upgraded to JUnit 5 in the master branch. Please rebase onto master and continue when you have time, thanks~
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1405310873 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java: ## @@ -0,0 +1,338 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nonnull; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to + * the same SlotSharingGroup, tend to be put evenly in each ExecutionSlotSharingGroup. Co-location + * constraints will be respected. 
+ */ +class TaskBalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy { + +public static final Logger LOG = + LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class); + +TaskBalancedPreferredSlotSharingStrategy( +final SchedulingTopology topology, +final Set slotSharingGroups, +final Set coLocationGroups) { +super(topology, slotSharingGroups, coLocationGroups); +} + +@Override +protected Map computeExecutionSlotSharingGroups( +SchedulingTopology schedulingTopology) { +return new TaskBalancedExecutionSlotSharingGroupBuilder( +schedulingTopology, this.logicalSlotSharingGroups, this.coLocationGroups) +.build(); +} + +static class Factory implements SlotSharingStrategy.Factory { + +public TaskBalancedPreferredSlotSharingStrategy create( +final SchedulingTopology topology, +final Set slotSharingGroups, +final Set coLocationGroups) { + +return new TaskBalancedPreferredSlotSharingStrategy( +topology, slotSharingGroups, coLocationGroups); +} +} + +/** The interface to compute the fittest slot index. */ +interface SlotIndexSupplier { Review Comment: ```suggestion private interface SlotIndexSupplier { ``` ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java: ## @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocat
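The Javadoc above says that execution vertices of the same slot sharing group tend to be spread evenly across `ExecutionSlotSharingGroup`s while co-location constraints are respected. One way to picture that is a greedy heuristic: give each vertex's subtasks to the currently least-loaded slots, and let vertices with the same co-location key reuse the slots chosen for the first of them. This is a hypothetical sketch, not the algorithm in `TaskBalancedPreferredSlotSharingStrategy`; it assumes the number of slots equals the maximum parallelism in the group and that co-located vertices have equal parallelism.

```java
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;

/** Hypothetical greedy sketch (not Flink's algorithm) of task-balanced slot assignment. */
final class BalancedSlotAssigner {

    /** Hypothetical vertex description; coLocationKey may be null. */
    static final class Vertex {
        final String name;
        final int parallelism;
        final String coLocationKey;

        Vertex(String name, int parallelism, String coLocationKey) {
            this.name = name;
            this.parallelism = parallelism;
            this.coLocationKey = coLocationKey;
        }
    }

    /** Returns, per vertex name, the slot index of each subtask (array index = subtask index). */
    static Map<String, int[]> assign(List<Vertex> vertices) {
        // Assumption: one slot per subtask of the widest vertex in the group.
        int numSlots = vertices.stream().mapToInt(v -> v.parallelism).max().orElse(0);
        int[] load = new int[numSlots];
        Map<String, int[]> placementByCoLocationKey = new HashMap<>();
        Map<String, int[]> result = new HashMap<>();

        for (Vertex vertex : vertices) {
            int[] placement =
                    vertex.coLocationKey == null
                            ? null
                            : placementByCoLocationKey.get(vertex.coLocationKey);
            if (placement == null) {
                // Choose the least-loaded slots; ties go to the lower slot index.
                placement =
                        IntStream.range(0, numSlots)
                                .boxed()
                                .sorted(Comparator.<Integer>comparingInt(slot -> load[slot])
                                        .thenComparingInt(slot -> slot))
                                .limit(vertex.parallelism)
                                .mapToInt(Integer::intValue)
                                .toArray();
                if (vertex.coLocationKey != null) {
                    placementByCoLocationKey.put(vertex.coLocationKey, placement);
                }
            } else if (placement.length != vertex.parallelism) {
                throw new IllegalArgumentException("co-located vertices need equal parallelism");
            }
            // Co-located vertices reuse the placement, so subtask i shares a slot.
            for (int slot : placement) {
                load[slot]++;
            }
            result.put(vertex.name, placement);
        }
        return result;
    }
}
```

For vertices A (parallelism 2), B and C (parallelism 4, same co-location key) and D (parallelism 3), the four slots end up with 4, 3, 3 and 3 tasks, and B and C get identical placements.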
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on PR #23635: URL: https://github.com/apache/flink/pull/23635#issuecomment-1826330445 Hi @KarmaGYZ @1996fanrui, thank you very much for your patient review comments. I have updated the PR based on them. PTAL in your free time. Have a nice weekend~
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1405028622 ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java: ## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; +import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; + +import org.assertj.core.data.Offset; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link TaskBalancedPreferredSlotSharingStrategy}. */ +class TaskBalancedPreferredSlotSharingStrategyTest { Review Comment: It sounds good to me~ Would you mind moving this part of the optimization into an independent ticket after this feature is merged? - In addition to extracting common code, it is also necessary to consider the upgrade issue of Junit4. - If there are common parts, it may add some complexity to the review process and update process. Anyway, I think both options are good. Please let me know what you think.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1405002911 ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java: ## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; +import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; + +import org.assertj.core.data.Offset; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link TaskBalancedPreferredSlotSharingStrategy}. 
*/ +class TaskBalancedPreferredSlotSharingStrategyTest { + +@Test +void testCoLocationConstraintIsRespected() { +TestingSchedulingTopology topology = new TestingSchedulingTopology(); +List>> jobVertexInfos = +new ArrayList<>(); +SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup(); +CoLocationGroup coLocationGroup1 = new CoLocationGroupImpl(); +CoLocationGroup coLocationGroup2 = new CoLocationGroupImpl(); +List mockedJobVertices = +getMockedJobVertices(slotSharingGroup1, coLocationGroup1, coLocationGroup2); +setupCase(mockedJobVertices, topology, jobVertexInfos); + +final SlotSharingStrategy strategy = +new TaskBalancedPreferredSlotSharingStrategy( +topology, +Sets.newHashSet(slotSharingGroup1), +Sets.newHashSet(coLocationGroup1, coLocationGroup2)); +List executionVertices1 = jobVertexInfos.get(1).f1; +List executionVertices2 = jobVertexInfos.get(2).f1; +TestingSchedulingExecutionVertex executionVertexOfJv0 = jobVertexInfos.get(0).f1.get(0); +ExecutionSlotSharingGroup executionSlotSharingGroupOfExecutionVertexOfJv0 = + strategy.getExecutionSlotSharingGroup(executionVertexOfJv0.getId()); + +assertThat(executionVertices1).hasSameSizeAs(executionVertices2); +for (int i = 0; i < executionVertices1.size(); i++) { +ExecutionSlotSharingGroup executionSlotSharingGroup = + strategy.getExecutionSlotSharingGroup(executionVertices1.get(i).getId()); +assertThat(executionSlotSharingGroup) + .isNotEqualTo(executionSlotSharingGroupOfExecutionVertexOfJv0); +assertThat(executionSlotSharingGroup) +.isEqualTo( +strategy.getExecutionSlotSharingGroup( +executionVertices2.get(i).getId())); +} + +List executionVertices3 = jobVertexInfos.get(3).f1; +List executionVertices4 = jobVertexInfos.get(4).f1; +for (int i = 0; i < executionVertices1.size(); i++) { + assertThat(strategy.getExecutionSlotSharingGroup(executionVertices3.get(i).getId())) +.isEqualTo( +strategy.getExecutionSlotSharingGroup( +executionVertices4.get(i).getId())); +} +} + +@Nonnull +private static ArrayList getMockedJ
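The test imports `org.assertj.core.data.Offset`, which may be used to assert balance with a tolerance. A minimal sketch of such a check, as a hypothetical helper independent of AssertJ: the task counts per slot sharing group count as balanced if the largest and smallest differ by at most a given skew. With AssertJ, a per-value assertion would look roughly like `assertThat(count).isCloseTo(expected, Offset.offset(1))`.

```java
import java.util.Collection;
import java.util.Collections;

/** Hypothetical helper: checks that per-group task counts are within maxSkew of each other. */
final class BalanceCheck {

    static boolean isBalanced(Collection<Integer> tasksPerGroup, int maxSkew) {
        if (tasksPerGroup.isEmpty()) {
            return true; // nothing to compare
        }
        int max = Collections.max(tasksPerGroup);
        int min = Collections.min(tasksPerGroup);
        return max - min <= maxSkew;
    }
}
```

Comparing only the extremes keeps the check independent of how many groups there are; groups that received no tasks must be included as zero counts for the check to be meaningful.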
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1405002804 ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java: ## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.guava31.com.google.common.collect.Lists; +import org.apache.flink.shaded.guava31.com.google.common.collect.Sets; + +import org.assertj.core.data.Offset; +import org.junit.jupiter.api.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Tests for {@link TaskBalancedPreferredSlotSharingStrategy}. 
*/ +class TaskBalancedPreferredSlotSharingStrategyTest { + +@Test +void testCoLocationConstraintIsRespected() { +TestingSchedulingTopology topology = new TestingSchedulingTopology(); +List>> jobVertexInfos = +new ArrayList<>(); +SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup(); +CoLocationGroup coLocationGroup1 = new CoLocationGroupImpl(); +CoLocationGroup coLocationGroup2 = new CoLocationGroupImpl(); +List mockedJobVertices = +getMockedJobVertices(slotSharingGroup1, coLocationGroup1, coLocationGroup2); +setupCase(mockedJobVertices, topology, jobVertexInfos); + +final SlotSharingStrategy strategy = +new TaskBalancedPreferredSlotSharingStrategy( +topology, +Sets.newHashSet(slotSharingGroup1), +Sets.newHashSet(coLocationGroup1, coLocationGroup2)); +List executionVertices1 = jobVertexInfos.get(1).f1; +List executionVertices2 = jobVertexInfos.get(2).f1; +TestingSchedulingExecutionVertex executionVertexOfJv0 = jobVertexInfos.get(0).f1.get(0); +ExecutionSlotSharingGroup executionSlotSharingGroupOfExecutionVertexOfJv0 = + strategy.getExecutionSlotSharingGroup(executionVertexOfJv0.getId()); + +assertThat(executionVertices1).hasSameSizeAs(executionVertices2); +for (int i = 0; i < executionVertices1.size(); i++) { +ExecutionSlotSharingGroup executionSlotSharingGroup = + strategy.getExecutionSlotSharingGroup(executionVertices1.get(i).getId()); +assertThat(executionSlotSharingGroup) + .isNotEqualTo(executionSlotSharingGroupOfExecutionVertexOfJv0); +assertThat(executionSlotSharingGroup) +.isEqualTo( +strategy.getExecutionSlotSharingGroup( +executionVertices2.get(i).getId())); +} + +List executionVertices3 = jobVertexInfos.get(3).f1; +List executionVertices4 = jobVertexInfos.get(4).f1; +for (int i = 0; i < executionVertices1.size(); i++) { Review Comment: nice catch -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.o
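The "nice catch" reply is anchored on the second loop. That loop indexes `executionVertices3` and `executionVertices4`, but its bound comes from `executionVertices1.size()`. Presumably the catch is that the bound should come from the lists actually being compared. The helper below is a hypothetical illustration, not code from the PR. It compares the two lists' sizes first and then iterates over its own inputs, so the bound cannot drift to an unrelated list. Vertex IDs are plain strings, and `groupOf` stands in for `strategy.getExecutionSlotSharingGroup`.

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper: checks that co-located subtasks share an execution slot sharing group. */
final class CoLocationCheck {

    private CoLocationCheck() {}

    /**
     * Returns true if, for every subtask index i, upstream.get(i) and downstream.get(i) map to
     * the same group. Both lists must have the same size, and the loop is bounded by the lists
     * it actually indexes.
     */
    static boolean isPairwiseCoLocated(
            List<String> upstream, List<String> downstream, Map<String, Integer> groupOf) {
        if (upstream.size() != downstream.size()) {
            return false;
        }
        for (int i = 0; i < upstream.size(); i++) {
            Integer a = groupOf.get(upstream.get(i));
            Integer b = groupOf.get(downstream.get(i));
            // An unassigned vertex (null) never counts as co-located.
            if (a == null || !a.equals(b)) {
                return false;
            }
        }
        return true;
    }
}
```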
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1405002722 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java: ## @@ -0,0 +1,361 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to + * the same SlotSharingGroup, tend to be put evenly in each ExecutionSlotSharingGroup. Co-location + * constraints will be respected. 
+ */ +class TaskBalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy { + +public static final Logger LOG = + LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class); + +TaskBalancedPreferredSlotSharingStrategy( +final SchedulingTopology topology, +final Set slotSharingGroups, +final Set coLocationGroups) { +super(topology, slotSharingGroups, coLocationGroups); +} + +@Override +protected Map computeExecutionSlotSharingGroups( +SchedulingTopology schedulingTopology) { +return new TaskBalancedExecutionSlotSharingGroupBuilder( +schedulingTopology, this.logicalSlotSharingGroups, this.coLocationGroups) +.build(); +} + +static class Factory implements SlotSharingStrategy.Factory { + +public TaskBalancedPreferredSlotSharingStrategy create( +final SchedulingTopology topology, +final Set slotSharingGroups, +final Set coLocationGroups) { + +return new TaskBalancedPreferredSlotSharingStrategy( +topology, slotSharingGroups, coLocationGroups); +} +} + +/** SlotSharingGroupBuilder class for balanced scheduling strategy. */ +private static class TaskBalancedExecutionSlotSharingGroupBuilder { + +private final SchedulingTopology topology; + +private final Map slotSharingGroupMap; + +/** Record the {@link ExecutionSlotSharingGroup}s for {@link SlotSharingGroup}s. */ +private final Map> +paralleledExecutionSlotSharingGroupsMap; + +/** + * Record the next round-robin {@link ExecutionSlotSharingGroup} index for {@link + * SlotSharingGroup}s. 
+ */ +private final Map slotSharingGroupIndexMap; + +private final Map +executionSlotSharingGroupMap; + +private final Map coLocationGroupMap; + +private final Map +constraintToExecutionSlotSharingGroupMap; + +private TaskBalancedExecutionSlotSharingGroupBuilder( +final SchedulingTopology topology, +final Set slotSharingGroups, +final Set coLocationGroups) { +this.topology = checkNotNull(topology); + +this.coLocationGroupMap = new HashMap<>(); +for (CoLocationGroup coLocationGroup : coLocationGroups) { +for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) { +coLocationGroupMap.put(jobVertexId, coLocationGroup); +} +} + +
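The quoted javadoc and fields describe the core idea. Execution vertices of one SlotSharingGroup are spread evenly across its ExecutionSlotSharingGroups using a per-group "next round-robin index", and co-location constraints are respected. Below is a minimal, simplified sketch of that idea, not Flink's implementation. It assumes one execution slot sharing group per subtask of the widest vertex, ignores input locality, and uses plain strings instead of Flink's ID types. `RoundRobinBalancer` and `Vertex` are hypothetical names.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of round-robin task balancing inside ONE slot sharing group. */
final class RoundRobinBalancer {

    /** A job vertex with its parallelism and an optional co-location group id (null if none). */
    static final class Vertex {
        final String name;
        final int parallelism;
        final String coLocationGroup;

        Vertex(String name, int parallelism, String coLocationGroup) {
            this.name = name;
            this.parallelism = parallelism;
            this.coLocationGroup = coLocationGroup;
        }
    }

    private RoundRobinBalancer() {}

    /** Returns a map from "vertex#subtask" to the index of its execution slot sharing group. */
    static Map<String, Integer> assign(List<Vertex> vertices) {
        // Assumption: one execution slot sharing group per subtask of the widest vertex.
        int numGroups = 0;
        for (Vertex v : vertices) {
            numGroups = Math.max(numGroups, v.parallelism);
        }
        // Next round-robin group index; it carries over from one vertex to the next.
        int nextIndex = 0;
        Map<String, Integer> assignment = new LinkedHashMap<>();
        // Co-location constraint ("coLocationGroup#subtask") -> group index already chosen for it.
        Map<String, Integer> constraintToGroup = new HashMap<>();

        for (Vertex v : vertices) {
            for (int i = 0; i < v.parallelism; i++) {
                String constraint =
                        v.coLocationGroup == null ? null : v.coLocationGroup + "#" + i;
                Integer group = constraint == null ? null : constraintToGroup.get(constraint);
                if (group == null) {
                    // Not pinned by an earlier co-located vertex: take the next group.
                    group = nextIndex;
                    nextIndex = (nextIndex + 1) % numGroups;
                    if (constraint != null) {
                        constraintToGroup.put(constraint, group);
                    }
                }
                assignment.put(v.name + "#" + i, group);
            }
        }
        return assignment;
    }
}
```

Consider three vertices with parallelism 4, 2 and 2. Each of the four groups ends up with exactly two subtasks, because the round-robin index continues where the previous vertex stopped instead of restarting at 0.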
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1405002564 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java: ## @@ -0,0 +1,361 @@
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1400131195 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java: ## @@ -0,0 +1,361 @@
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
KarmaGYZ commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1398939301 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java: ## @@ -0,0 +1,361 @@
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on PR #23635: URL: https://github.com/apache/flink/pull/23635#issuecomment-1807589290 Hi @KarmaGYZ, thanks for your thorough review! > I think this PR contains two components. First would be a supplement of [FLINK-33448](https://issues.apache.org/jira/browse/FLINK-33448). Second is part of the TASKS strategy. I think we may split it into two separate commits. Splitting it makes sense; it's clearer. > It would be better to include [FLINK-33388](https://issues.apache.org/jira/browse/FLINK-33388) and introduce TASKS strategy. Would you mind if we keep them in multiple PRs? I'm afraid a single PR with many commits and changes would be hard to review. Of course, a single PR is also acceptable to me.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1390238967 ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java: ## @@ -295,6 +294,19 @@ public static SlotManagerConfiguration fromConfiguration( redundantTaskManagerNum); } +@VisibleForTesting +public static SlotMatchingStrategy getSlotMatchingStrategy(TaskManagerLoadBalanceMode mode) { Review Comment: ```suggestion static SlotMatchingStrategy getSlotMatchingStrategy(TaskManagerLoadBalanceMode mode) { ``` Is the default (package-private) visibility enough here? As far as I can see, all callers are in the same package. ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java: ## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to + * the same SlotSharingGroup, tend to be put evenly in each ExecutionSlotSharingGroup. Co-location + * constraints will be respected. 
+ */ +class TaskBalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy { + +public static final Logger LOG = + LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class); + +TaskBalancedPreferredSlotSharingStrategy( +final SchedulingTopology topology, +final Set slotSharingGroups, +final Set coLocationGroups) { +super(topology, slotSharingGroups, coLocationGroups); +} + +@Override +protected Map computeExecutionToESsgMap( +SchedulingTopology schedulingTopology) { +return new TaskBalancedExecutionSlotSharingGroupBuilder( +schedulingTopology, this.logicalSlotSharingGroups, this.coLocationGroups) +.build(); +} + +static class Factory implements SlotSharingStrategy.Factory { + +public TaskBalancedPreferredSlotSharingStrategy create( +final SchedulingTopology topology, +final Set logicalSlotSharingGroups, +final Set coLocationGroups) { + +return new TaskBalancedPreferredSlotSharingStrategy( +topology, logicalSlotSharingGroups, coLocationGroups); +} +} + +/** SlotSharingGroupBuilder class for balanced scheduling strategy. */ +private static class TaskBalancedExecutionSlotSharingGroupBuilder { + +private final SchedulingTopology topology; + +private final Map slotSharingGroupMap; + +/** + * Record the number of {@link ExecutionSlotSharingGroup}s for {@link SlotSharingGroup}s. + */ +private final Map> ssgToExecutionSSGs; + +/** + * Record the next round-robin {@link ExecutionSlotSharingGroup} index for {@link + * SlotSharingGroup}s. + */ +private final Map slotRoundRobinIndex; + +private final Map +executionSlotSharingGroupMap; + +private final Map coLocationGroupMap; + +private final Map clcToES
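The suggestion above drops `public` so that `getSlotMatchingStrategy` becomes package-private. Same-package callers and tests can still reach it. Below is a hypothetical, self-contained sketch of such a mode-to-strategy mapping. The enums are stand-ins rather than Flink's types, and the `NONE` constant is an assumption. The rule that only `SLOTS` selects least-utilization matching is also an assumption, modeled on the `taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS` check in the `DefaultResourceAllocationStrategy` hunk of this PR.

```java
/** Hypothetical stand-ins for a load-balance mode and the slot matching strategy it selects. */
final class SlotMatchingSketch {

    /** Assumed mode names; SLOTS and TASKS are the ones discussed in this PR. */
    enum Mode { NONE, SLOTS, TASKS }

    enum Strategy { ANY_MATCHING, LEAST_UTILIZATION }

    private SlotMatchingSketch() {}

    // No access modifier: package-private, so only callers and tests in the same package can
    // use it, which is what the review suggestion aims for.
    static Strategy getSlotMatchingStrategy(Mode mode) {
        // Assumption: only SLOTS mode spreads slots evenly via least-utilization matching.
        return mode == Mode.SLOTS ? Strategy.LEAST_UTILIZATION : Strategy.ANY_MATCHING;
    }
}
```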
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
KarmaGYZ commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1390593913 ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java: ## @@ -76,6 +76,6 @@ public ResourceProfile getResourceProfile() { @Override public String toString() { -return "SlotSharingGroup " + this.ids.toString(); +return "SlotSharingGroup{" + "ids=" + ids + ", resourceProfile=" + resourceProfile + '}'; Review Comment: This should go into a separate hotfix commit. ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java: ## @@ -31,22 +35,26 @@ class ExecutionSlotSharingGroup { private final Set executionVertexIds; -private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN; +@Nonnull private final SlotSharingGroup slotSharingGroup; Review Comment: IIUC, we change it to `SlotSharingGroup` only for testing? I don't think that's a good practice. ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java: ## @@ -0,0 +1,270 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex; +import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.AbstractMap; +import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to + * the same SlotSharingGroup, tend to be put evenly in each ExecutionSlotSharingGroup. Co-location + * constraints will be respected. 
+ */ +class TaskBalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy { + +public static final Logger LOG = + LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class); + +TaskBalancedPreferredSlotSharingStrategy( +final SchedulingTopology topology, +final Set slotSharingGroups, +final Set coLocationGroups) { +super(topology, slotSharingGroups, coLocationGroups); +} + +@Override +protected Map computeExecutionToESsgMap( +SchedulingTopology schedulingTopology) { +return new TaskBalancedExecutionSlotSharingGroupBuilder( +schedulingTopology, this.logicalSlotSharingGroups, this.coLocationGroups) +.build(); +} + +static class Factory implements SlotSharingStrategy.Factory { + +public TaskBalancedPreferredSlotSharingStrategy create( +final SchedulingTopology topology, +final Set logicalSlotSharingGroups, +final Set coLocationGroups) { + +return new TaskBalancedPreferredSlotSharingStrategy( +topology, logicalSlotSharingGroups, coLocationGroups); +} +} + +/** SlotSharingGroupBuilder class for balanced scheduling strategy. */ +private static class TaskBalancedExecutionSlotSharingGroupBuilder { + +private final SchedulingTopology topology; + +private final Map slotSharingGroupMap; + +/** + * Record the number of {@link ExecutionSlotSharingGroup}s for {@link SlotSharingGroup}s. + */ +private final Map> ssgToExecutionSSGs; + +/** + * Record the
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on PR #23635: URL: https://github.com/apache/flink/pull/23635#issuecomment-1806832620 Thank you very much, @KarmaGYZ and @1996fanrui, for your comments. I have updated the PR based on them. Have a great weekend~ :) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r139022 ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java: ## @@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy( SlotManagerUtils.generateDefaultSlotResourceProfile( totalResourceProfile, numSlotsPerWorker); this.availableResourceMatchingStrategy = -evenlySpreadOutSlots +taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS Review Comment: It's fine with me.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1390231615 ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java: ## @@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy( SlotManagerUtils.generateDefaultSlotResourceProfile( totalResourceProfile, numSlotsPerWorker); this.availableResourceMatchingStrategy = -evenlySpreadOutSlots +taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS Review Comment: yes, that's right.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1390096801 ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java: ## @@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy( SlotManagerUtils.generateDefaultSlotResourceProfile( totalResourceProfile, numSlotsPerWorker); this.availableResourceMatchingStrategy = -evenlySpreadOutSlots +taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS Review Comment: It's fine with me. If so, this PR only introduces the `TaskBalancedPreferredSlotSharingStrategy`-related code and doesn't use it yet, right?
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1390096271 ## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ## @@ -708,6 +711,60 @@ public class TaskManagerOptions { "Time we wait for the timers in milliseconds to finish all pending timer threads" + " when the stream task is cancelled."); +@Documentation.Section({ +Documentation.Sections.EXPERT_SCHEDULING, +Documentation.Sections.ALL_TASK_MANAGER +}) +public static final ConfigOption TASK_MANAGER_LOAD_BALANCE_MODE = +ConfigOptions.key("taskmanager.load-balance.mode") +.enumType(TaskManagerLoadBalanceMode.class) +.defaultValue(TaskManagerLoadBalanceMode.NONE) +.withDescription( +Description.builder() +.text( +"Mode for the load-balance allocation strategy across all available %s.", +code("TaskManagers")) +.list( +text( +"The %s mode tries to spread out the slots evenly across all available %s.", + code(TaskManagerLoadBalanceMode.SLOTS.name()), +code("TaskManagers")), +text( +"The %s mode tries to ensure that the number of tasks is relative balanced across all available %s.", Review Comment: Hi @1996fanrui, thanks for your review. Based on the results of the [discussion](https://github.com/apache/flink/pull/23635#discussion_r1383243917), the `TASKS` enumeration and its description will be temporarily removed. Once the entire feature is complete, they will be reintroduced (with your comments addressed) to keep the feature and the documentation consistent. WDYT?
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
KarmaGYZ commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1388936955 ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java: ## @@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy( SlotManagerUtils.generateDefaultSlotResourceProfile( totalResourceProfile, numSlotsPerWorker); this.availableResourceMatchingStrategy = -evenlySpreadOutSlots +taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS Review Comment: I lean toward introducing `TASKS` after the functionality is complete.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1385855309 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java: ## @@ -31,22 +34,36 @@ class ExecutionSlotSharingGroup { private final Set executionVertexIds; -private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN; +private @Nonnull SlotSharingGroup slotSharingGroup; +/** @deprecated Only for test classes. */ +@Deprecated Review Comment: If this constructor is only used in tests, it should be marked as `@VisibleForTesting` instead of `@Deprecated`. ## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ## @@ -708,6 +711,60 @@ public class TaskManagerOptions { "Time we wait for the timers in milliseconds to finish all pending timer threads" + " when the stream task is cancelled."); +@Documentation.Section({ +Documentation.Sections.EXPERT_SCHEDULING, +Documentation.Sections.ALL_TASK_MANAGER +}) +public static final ConfigOption TASK_MANAGER_LOAD_BALANCE_MODE = +ConfigOptions.key("taskmanager.load-balance.mode") +.enumType(TaskManagerLoadBalanceMode.class) +.defaultValue(TaskManagerLoadBalanceMode.NONE) +.withDescription( +Description.builder() +.text( +"Mode for the load-balance allocation strategy across all available %s.", +code("TaskManagers")) +.list( +text( +"The %s mode tries to spread out the slots evenly across all available %s.", + code(TaskManagerLoadBalanceMode.SLOTS.name()), +code("TaskManagers")), +text( +"The %s mode tries to ensure that the number of tasks is relative balanced across all available %s.", Review Comment: ```suggestion "The %s mode tries to spread out the tasks evenly across all available %s.", ``` It's better to keep the wording consistent with the SLOTS description.
## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java: ## @@ -84,4 +98,23 @@ public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContex allocationTimeout, context::getResourceProfile); } + +private static SlotSharingStrategy.Factory getSlotSharingStrategyFactory( +@Nonnull Configuration configuration) { + +TaskManagerLoadBalanceMode taskManagerLoadBalanceMode = + TaskManagerLoadBalanceMode.loadFromConfiguration(configuration); +if (taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.TASKS) { +return new TaskBalancedPreferredSlotSharingStrategy.Factory(); +} +if (taskManagerLoadBalanceMode != TaskManagerLoadBalanceMode.NONE +&& taskManagerLoadBalanceMode != TaskManagerLoadBalanceMode.SLOTS) { +LOG.warn( +"The value '{}' of '{}' isn't supported now, it will rollback to default strategy '{}'.", +taskManagerLoadBalanceMode, +TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE.key(), + TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE.defaultValue()); +} +return new LocalInputPreferredSlotSharingStrategy.Factory(); Review Comment: ```suggestion if (taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.NONE || taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS) { return new LocalInputPreferredSlotSharingStrategy.Factory(); } throw new UnsupportedXXXException("Unknown TaskManagerLoadBalanceMode"); ``` How about throwing an exception instead of calling `LOG.warn`? If other developers add a new enum value in the future, the exception will tell them that they need to change this logic to support the new value properly. ## flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java: ## @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); yo
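The fail-fast alternative suggested in the review can be sketched in plain Java. The enum, the `SlotSharingStrategyFactory` interface, and the `select` method below are hypothetical stand-ins rather than Flink's classes, and `UnsupportedOperationException` is used only in place of the unspecified `UnsupportedXXXException` placeholder.

```java
/** Hypothetical sketch of fail-fast strategy selection; not Flink's actual classes. */
public class StrategyFactorySelectionSketch {

    // Mirror of the mode enum discussed in the review (illustrative only).
    enum TaskManagerLoadBalanceMode { NONE, SLOTS, TASKS }

    // Stand-in for SlotSharingStrategy.Factory; it only reports which strategy it would build.
    interface SlotSharingStrategyFactory {
        String name();
    }

    static SlotSharingStrategyFactory select(TaskManagerLoadBalanceMode mode) {
        switch (mode) {
            case TASKS:
                return () -> "TaskBalancedPreferredSlotSharingStrategy";
            case NONE:
            case SLOTS:
                return () -> "LocalInputPreferredSlotSharingStrategy";
            default:
                // Fail fast: a newly added enum value must be handled explicitly
                // instead of silently falling back to the default strategy.
                throw new UnsupportedOperationException(
                        "Unknown TaskManagerLoadBalanceMode: " + mode);
        }
    }

    public static void main(String[] args) {
        for (TaskManagerLoadBalanceMode mode : TaskManagerLoadBalanceMode.values()) {
            System.out.println(mode + " -> " + select(mode).name());
        }
    }
}
```

Listing every supported constant explicitly means that a constant added later reaches the `default` branch and surfaces as an exception, rather than being logged and quietly mapped to the local-input-preferred strategy.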
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1387973762 ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java: ## @@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy( SlotManagerUtils.generateDefaultSlotResourceProfile( totalResourceProfile, numSlotsPerWorker); this.availableResourceMatchingStrategy = -evenlySpreadOutSlots +taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS Review Comment: Hi @KarmaGYZ, thanks a lot for the review. This part is intended to be addressed in FLINK-33388. I have to admit that in the current PR, the balanced allocation option `TASKS` at the slot level has perhaps been enabled too early, for two reasons: - The `TASKS` option is exposed while its functionality is incomplete. If users enable `TASKS`, tasks will be balanced across slots, but slots will not be balanced across TMs, although this is still an improvement over the old strategy. - If the overall FLIP progresses slowly, it may affect the release. Given the above, could one of the following approaches be considered to move the current work forward? - Keep the exposed functionality complete: the PR currently only adds the relevant functional code, so we can temporarily disable the `TASKS` configuration entry and re-enable it once FLINK-33388 lands. - Alternatively, include the work of FLINK-33388 in the current PR. This may increase the reviewers' workload because of the larger change set. - Alternatively, keep the current state and go ahead with the original plan. I'm happy to hear any opinions and suggestions about this. Thank you~ :)
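The imbalance described in the first bullet can be made concrete with a small, hypothetical calculation. Four slots that each run three tasks are balanced at the slot level, yet the TaskManagers can still be unevenly loaded if the slots themselves are placed unevenly. The numbers and the `tasksPerTaskManager` helper below are purely illustrative.

```java
import java.util.Arrays;

/** Illustrative arithmetic: slot-level balance does not imply TaskManager-level balance. */
public class SlotVsTaskManagerBalanceSketch {

    /** Sums the task counts of all slots hosted on each TaskManager. */
    static int[] tasksPerTaskManager(int[] tasksPerSlot, int[] slotToTaskManager, int numTaskManagers) {
        int[] result = new int[numTaskManagers];
        for (int slot = 0; slot < tasksPerSlot.length; slot++) {
            result[slotToTaskManager[slot]] += tasksPerSlot[slot];
        }
        return result;
    }

    public static void main(String[] args) {
        // Balanced at slot level: every slot runs 3 tasks.
        int[] tasksPerSlot = {3, 3, 3, 3};
        // Uneven slot placement: TM 0 hosts three slots, TM 1 hosts one.
        int[] unevenPlacement = {0, 0, 0, 1};
        // Even slot placement: two slots per TM.
        int[] evenPlacement = {0, 0, 1, 1};
        System.out.println(Arrays.toString(tasksPerTaskManager(tasksPerSlot, unevenPlacement, 2))); // [9, 3]
        System.out.println(Arrays.toString(tasksPerTaskManager(tasksPerSlot, evenPlacement, 2))); // [6, 6]
    }
}
```

Only when both levels are balanced (tasks across slots, and slots across TaskManagers) does every TaskManager end up with the same number of tasks, which is why the slot-to-TM part is tracked separately.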
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
KarmaGYZ commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1383243917 ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java: ## @@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy( SlotManagerUtils.generateDefaultSlotResourceProfile( totalResourceProfile, numSlotsPerWorker); this.availableResourceMatchingStrategy = -evenlySpreadOutSlots +taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS Review Comment: What about tasks? ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java: ## @@ -31,22 +34,36 @@ class ExecutionSlotSharingGroup { private final Set executionVertexIds; -private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN; +private @Nonnull SlotSharingGroup slotSharingGroup; +/** @deprecated Only for test classes. */ +@Deprecated Review Comment: `@VisibleForTesting` ## flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java: ## @@ -260,10 +261,10 @@ public static SlotManagerConfiguration fromConfiguration( configuration.getBoolean( ResourceManagerOptions.TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED); -boolean evenlySpreadOutSlots = - configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY); +TaskManagerLoadBalanceMode taskManagerLoadBalanceMode = + TaskManagerLoadBalanceMode.loadFromConfiguration(configuration); final SlotMatchingStrategy slotMatchingStrategy = -evenlySpreadOutSlots +taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS Review Comment: ditto. ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java: ## @@ -42,69 +43,23 @@ import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * This strategy tries to reduce remote data exchanges. 
Execution vertices, which are connected and * belong to the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup. * Co-location constraints will be respected. */ -class LocalInputPreferredSlotSharingStrategy +class LocalInputPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy implements SlotSharingStrategy, SchedulingTopologyListener { Review Comment: No need to implement them again. ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java: ## @@ -42,69 +43,23 @@ import java.util.Set; import static org.apache.flink.util.Preconditions.checkNotNull; -import static org.apache.flink.util.Preconditions.checkState; /** * This strategy tries to reduce remote data exchanges. Execution vertices, which are connected and * belong to the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup. * Co-location constraints will be respected. */ -class LocalInputPreferredSlotSharingStrategy +class LocalInputPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy implements SlotSharingStrategy, SchedulingTopologyListener { -private final Map executionSlotSharingGroupMap; - -private final Set logicalSlotSharingGroups; - -private final Set coLocationGroups; +public static final Logger LOG = + LoggerFactory.getLogger(LocalInputPreferredSlotSharingStrategy.class); Review Comment: Where do we use it? ## flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java: ## @@ -46,6 +47,12 @@ public class SlotSharingGroup implements java.io.Serializable { // +public SlotSharingGroup() {} + +public SlotSharingGroup(ResourceProfile resourceProfile) { Review Comment: Seems we don't need to do that. Just instantiate a SlotSharingGroup and set the resource profile. 
## flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java: ## @@ -46,6 +47,12 @@ public class SlotSharingGroup implements java.io.Serializable { // +public SlotSharingGroup() {} + +public SlotSharingGroup(ResourceProfile resourceProfile) { Review Comment: Only visible for testing? ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java: ## @@ -31,22 +34,36 @@ class ExecutionSlotSh
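The "No need to implement them again" remark relies on ordinary Java inheritance: once `AbstractSlotSharingStrategy` implements the interfaces, every subclass already implements them too. Below is a minimal template-method sketch. All types and method names in it (`groupOf`, `onVerticesAdded`, `computeGroups`, `OneGroupPerVertexStrategy`) are simplified, hypothetical stand-ins, not Flink's actual signatures.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical template-method sketch; names and signatures are simplified stand-ins. */
public class TemplateMethodSketch {

    interface SlotSharingStrategy {
        String groupOf(String executionVertexId);
    }

    interface SchedulingTopologyListener {
        void onVerticesAdded(List<String> newVertices);
    }

    /** The base class implements both interfaces once and delegates the grouping step. */
    abstract static class AbstractSlotSharingStrategy
            implements SlotSharingStrategy, SchedulingTopologyListener {

        private final Map<String, String> vertexToGroup = new HashMap<>();

        @Override
        public String groupOf(String executionVertexId) {
            return vertexToGroup.get(executionVertexId);
        }

        @Override
        public void onVerticesAdded(List<String> newVertices) {
            vertexToGroup.putAll(computeGroups(newVertices));
        }

        /** The only step a concrete strategy has to provide. */
        protected abstract Map<String, String> computeGroups(List<String> vertices);
    }

    /** No "implements" clause needed: both interfaces are inherited from the base class. */
    static class OneGroupPerVertexStrategy extends AbstractSlotSharingStrategy {
        @Override
        protected Map<String, String> computeGroups(List<String> vertices) {
            Map<String, String> result = new HashMap<>();
            for (String vertex : vertices) {
                result.put(vertex, "group-" + vertex);
            }
            return result;
        }
    }

    public static void main(String[] args) {
        OneGroupPerVertexStrategy strategy = new OneGroupPerVertexStrategy();
        strategy.onVerticesAdded(List.of("a", "b"));
        System.out.println(strategy instanceof SchedulingTopologyListener); // true
        System.out.println(strategy.groupOf("a")); // group-a
    }
}
```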
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1382851909 ## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java: ## @@ -84,4 +96,25 @@ public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContex allocationTimeout, context::getResourceProfile); } + +private static SlotSharingStrategy.Factory getSlotSharingStrategyFactory( +@Nullable Configuration configuration) { Review Comment: I checked the call chain that passes this parameter. `configuration` is never null, so I will remove this null-configuration check.
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1381232496 ## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ## @@ -708,6 +708,20 @@ public class TaskManagerOptions { "Time we wait for the timers in milliseconds to finish all pending timer threads" + " when the stream task is cancelled."); +public static final ConfigOption TASK_MANAGER_LOAD_BALANCE_MODE = Review Comment: Please go ahead with the first JIRA, and cc me once the PR is ready, thanks~
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1381224693 ## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ## @@ -708,6 +708,20 @@ public class TaskManagerOptions { "Time we wait for the timers in milliseconds to finish all pending timer threads" + " when the stream task is cancelled."); +public static final ConfigOption TASK_MANAGER_LOAD_BALANCE_MODE = Review Comment: Hi @1996fanrui, thanks for the idea. That sounds great! > After this FLIP is merged, if a job with cluster.evenly-spread-out-slots: true, and didn't set the taskmanager.load-balance.mode. Should we change the default value from None to Slots? Yes, I'd prefer to stay compatible with the old configuration item. - This gives users a smooth transition. - Removing the old configuration item should follow a gradual deprecation process; for example, we would drop it completely after one or two releases. I'll create this JIRA ticket and move the related work forward if nothing is ambiguous. Looking forward to your reply~
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1381212518 ## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ## @@ -708,6 +708,20 @@ public class TaskManagerOptions { "Time we wait for the timers in milliseconds to finish all pending timer threads" + " when the stream task is cancelled."); +public static final ConfigOption TASK_MANAGER_LOAD_BALANCE_MODE = Review Comment: Thanks @RocMarshal for the analysis! How about first creating a JIRA to add `taskmanager.load-balance.mode` and deprecate `cluster.evenly-spread-out-slots`? - It doesn't introduce any feature; it just adds the new option and deprecates the old one. - It should be the first JIRA, and `TaskManagerLoadBalanceMode` only has 2 enum values: `None` and `Slots`. - In the second JIRA (maybe this PR), you can start your feature: adding `Tasks` and supporting it. I think this process is clearer and easier than the one you proposed, and it doesn't affect the user side. -- Besides how to implement this, I have a question about the compatibility between `taskmanager.load-balance.mode` and `cluster.evenly-spread-out-slots`. As I understand it: - `taskmanager.load-balance.mode : Slots` == `cluster.evenly-spread-out-slots: true` - `taskmanager.load-balance.mode : None` == `cluster.evenly-spread-out-slots: false` (by default). After this FLIP is merged, if a job sets `cluster.evenly-spread-out-slots: true` but doesn't set `taskmanager.load-balance.mode`, should we change the default value from `None` to `Slots`? - If yes, we can support existing users directly. - If no, users must set `taskmanager.load-balance.mode : Slots` manually if they want the `evenly-spread-out-slots` feature.
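If the answer to the compatibility question is "yes", the resolution order could look roughly like the sketch below: an explicitly set `taskmanager.load-balance.mode` wins, and otherwise the deprecated `cluster.evenly-spread-out-slots` flag is mapped onto the enum. This is a hypothetical illustration only. A plain `Map<String, String>` stands in for Flink's `Configuration`, and the `resolve` method is not a Flink API.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Hypothetical sketch of backward-compatible option resolution; not Flink's implementation. */
public class LoadBalanceModeCompatSketch {

    enum TaskManagerLoadBalanceMode { NONE, SLOTS }

    static final String NEW_KEY = "taskmanager.load-balance.mode";
    static final String LEGACY_KEY = "cluster.evenly-spread-out-slots";

    /**
     * Resolves the mode from a plain key/value map (a stand-in for a real configuration object).
     * An explicitly set new option wins; otherwise the deprecated boolean is mapped onto the enum.
     */
    static TaskManagerLoadBalanceMode resolve(Map<String, String> conf) {
        String mode = conf.get(NEW_KEY);
        if (mode != null) {
            return TaskManagerLoadBalanceMode.valueOf(mode.trim().toUpperCase(Locale.ROOT));
        }
        boolean evenlySpreadOutSlots = Boolean.parseBoolean(conf.getOrDefault(LEGACY_KEY, "false"));
        return evenlySpreadOutSlots ? TaskManagerLoadBalanceMode.SLOTS : TaskManagerLoadBalanceMode.NONE;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        System.out.println(resolve(conf)); // NONE (nothing set)
        conf.put(LEGACY_KEY, "true");
        System.out.println(resolve(conf)); // SLOTS (legacy flag only)
        conf.put(NEW_KEY, "none");
        System.out.println(resolve(conf)); // NONE (explicit new option wins)
    }
}
```

Letting the explicit new option take precedence keeps existing jobs that only set the legacy flag working, while users who migrate to `taskmanager.load-balance.mode` get exactly what they configured.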
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635: URL: https://github.com/apache/flink/pull/23635#discussion_r1378358266 ## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ## @@ -708,6 +708,20 @@ public class TaskManagerOptions { "Time we wait for the timers in milliseconds to finish all pending timer threads" + " when the stream task is cancelled."); +public static final ConfigOption TASK_MANAGER_LOAD_BALANCE_MODE = Review Comment: Hi @1996fanrui, thank you very much for the precise comments! I'd prefer the second option, with a few adjustments: 1. Add `@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)` 2. Temporarily comment out the 'SLOTS' mode in `TaskManagerLoadBalanceMode` 3. Add a description of the configuration item as follows: ``` The load balance mode of taskmanager when processing scheduling tasks. 'NONE' means that the scheduler does not consider any dimensional balance when scheduling tasks. 'TASKS' means that the scheduler prioritizes ensuring that the number of tasks on each TM is balanced when scheduling tasks. Please view FLIP-370 for more details. ``` 4. After all PRs are merged, we'll add the 'SLOTS' mode back, supplement the description, and deprecate slot-spread-out. The added description would be as follows: ``` 'SLOTS' indicates that the scheduler prioritizes and balances the use of each TM's slot when scheduling tasks. ``` Please let me know your opinion~
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1378352765

## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/BalancedPreferredSlotSharingStrategy.java: ##
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to
+ * the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup. Co-location
+ * constraints are ignored at present.
+ */
+class BalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy

Review Comment:
SGTM~

## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/BalancedPreferredSlotSharingStrategy.java: ##
@@ -0,0 +1,275 @@
[... license header, package declaration and imports identical to the hunk quoted above ...]
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to
+ * the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup. Co-location
+ * constraints are ignored at present.

Review Comment:
I'll update the header comments.

## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ##
@@ -708,6 +708,20 @@ public class TaskManagerOptions {
                             "Time we wait for the timers in mi
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1378332744

## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ##
@@ -708,6 +708,20 @@ public class TaskManagerOptions {
                             "Time we wait for the timers in milliseconds to finish all pending timer threads"
                                     + " when the stream task is cancelled.");

+    public static final ConfigOption<TaskManagerLoadBalanceMode> TASK_MANAGER_LOAD_BALANCE_MODE =

Review Comment:
Like the other options, should we add some comments and `@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)`? Or should we add `@Documentation.Section` and deprecate `slot-spread-out` after all the PRs are merged?
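The reviewer asks for comments on the new option, and a later comment in this thread asks for its values to be described in detail. One way to do both is to let every enum constant carry its own description and build the option text from the constants. The sketch below is plain Java without Flink on the classpath and only mimics that idea (Flink's `DescribedEnum` interface in `org.apache.flink.configuration` serves a similar purpose for real options). Only `NONE` appears in the quoted diff; the `TASKS` constant, all description strings, and the class name are hypothetical.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/** Dependency-free sketch: every mode documents itself. Names other than NONE are hypothetical. */
public class LoadBalanceModeDocSketch {

    /** Hypothetical stand-in for TaskManagerOptions.TaskManagerLoadBalanceMode. */
    enum TaskManagerLoadBalanceMode {
        NONE("No extra load balancing; tasks are assigned to slots as before."),
        // Hypothetical constant, not confirmed by the quoted diff.
        TASKS("Try to spread tasks evenly across the slots of a slot sharing group.");

        private final String description;

        TaskManagerLoadBalanceMode(String description) {
            this.description = description;
        }

        String getDescription() {
            return description;
        }
    }

    /** Builds the option description from the per-constant descriptions. */
    static String describeOption() {
        String values =
                Arrays.stream(TaskManagerLoadBalanceMode.values())
                        .map(mode -> "'" + mode.name() + "': " + mode.getDescription())
                        .collect(Collectors.joining(" "));
        return "The load balance mode of the TaskManager when scheduling tasks. "
                + "Possible values: "
                + values;
    }

    public static void main(String[] args) {
        System.out.println(describeOption());
    }
}
```

With this layout, adding a constant requires writing its description, so the generated option documentation cannot silently omit a value.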
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1378331133

## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ##
@@ -708,6 +708,20 @@ public class TaskManagerOptions {
                             "Time we wait for the timers in milliseconds to finish all pending timer threads"
                                     + " when the stream task is cancelled.");

+    public static final ConfigOption<TaskManagerLoadBalanceMode> TASK_MANAGER_LOAD_BALANCE_MODE =
+            ConfigOptions.key("taskmanager.load-balance.mode")
+                    .enumType(TaskManagerLoadBalanceMode.class)
+                    .defaultValue(TaskManagerLoadBalanceMode.NONE)
+                    .withDescription(
+                            "The load balance mode of taskmanager when processing scheduling tasks.");

Review Comment:
It would be better to describe these values in detail; that makes the option easier for users to understand.

## flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java: ##
@@ -708,6 +708,20 @@ public class TaskManagerOptions {
                             "Time we wait for the timers in milliseconds to finish all pending timer threads"
                                     + " when the stream task is cancelled.");

+    public static final ConfigOption<TaskManagerLoadBalanceMode> TASK_MANAGER_LOAD_BALANCE_MODE =
+            ConfigOptions.key("taskmanager.load-balance.mode")
+                    .enumType(TaskManagerLoadBalanceMode.class)
+                    .defaultValue(TaskManagerLoadBalanceMode.NONE)
+                    .withDescription(
+                            "The load balance mode of taskmanager when processing scheduling tasks.");
+
+    /** Type of taskmanager.load-balance.mode. */

Review Comment:
```suggestion
    /** Type of {@link #TASK_MANAGER_LOAD_BALANCE_MODE}. */
```

## flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/BalancedPreferredSlotSharingStrategy.java: ##
@@ -0,0 +1,275 @@
[... license header, package declaration and imports identical to the hunk quoted earlier in this thread ...]
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to
+ * the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup. Co-location
+ * constraints are ignored at present.
+ */
+class BalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy

Review Comment:
```suggestion
class TaskBalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy
```
How about `TaskBalancedPreferredSlotSharingStrategy`?
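The class Javadoc under review says the strategy aims for balanced task scheduling while keeping vertices of the same SlotSharingGroup together in ExecutionSlotSharingGroups. The following is a simplified, dependency-free sketch of one way to reach that balance, not the PR's implementation. It assumes a single slot sharing group, creates as many execution slot sharing groups as the maximum parallelism, and places each vertex's subtasks into the currently least-loaded groups. Like the Javadoc, it ignores co-location constraints; the class name, vertex names and `vertex#index` labels are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/** Simplified sketch of task-balanced slot sharing; hypothetical, not the PR's code. */
public class BalancedSlotSharingSketch {

    /**
     * Distributes the subtasks of the job vertices of one slot sharing group (in iteration order)
     * over max(parallelism) groups, always filling the least-loaded groups first. Each returned
     * list is one group holding "vertex#subtaskIndex" labels.
     */
    static List<List<String>> assign(LinkedHashMap<String, Integer> parallelismByVertex) {
        int groupCount =
                parallelismByVertex.values().stream().mapToInt(Integer::intValue).max().orElse(0);
        List<List<String>> groups = new ArrayList<>();
        for (int g = 0; g < groupCount; g++) {
            groups.add(new ArrayList<>());
        }
        for (Map.Entry<String, Integer> vertex : parallelismByVertex.entrySet()) {
            // Group indices ordered by current task count; the stable sort keeps lower
            // indices first on ties, so the result is deterministic.
            List<Integer> leastLoadedFirst =
                    IntStream.range(0, groupCount)
                            .boxed()
                            .sorted(Comparator.comparingInt(g -> groups.get(g).size()))
                            .collect(Collectors.toList());
            // Parallelism never exceeds groupCount, so the subtasks of one vertex always
            // land in distinct groups.
            for (int subtask = 0; subtask < vertex.getValue(); subtask++) {
                groups.get(leastLoadedFirst.get(subtask)).add(vertex.getKey() + "#" + subtask);
            }
        }
        return groups;
    }

    public static void main(String[] args) {
        LinkedHashMap<String, Integer> vertices = new LinkedHashMap<>();
        vertices.put("source", 2);
        vertices.put("map", 4);
        vertices.put("sink", 2);
        List<List<String>> groups = assign(vertices);
        for (int i = 0; i < groups.size(); i++) {
            System.out.println("group " + i + ": " + groups.get(i));
        }
    }
}
```

For the sample topology, `main` prints `group 0: [source#0, map#2]`, `group 1: [source#1, map#3]`, `group 2: [map#0, sink#0]` and `group 3: [map#1, sink#1]`, so every group holds two tasks. A naive assignment that puts subtask i of every vertex into group i would instead give groups 0 and 1 three tasks each and groups 2 and 3 one task each. Filling the least-loaded groups first keeps the per-group task counts within one of each other after every vertex.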
Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
flinkbot commented on PR #23635:
URL: https://github.com/apache/flink/pull/23635#issuecomment-1787488771

## CI report:

* 0816a1779cbf97c1e3526e18e70d3d2e09817a05 UNKNOWN

Bot commands
The @flinkbot bot supports the following commands:

- `@flinkbot run azure` re-run the last Azure build
[PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]
RocMarshal opened a new pull request, #23635:
URL: https://github.com/apache/flink/pull/23635

## What is the purpose of the change

- Support tasks balancing at slot level for Default Scheduler

## Brief change log

- *Introduce BalancedPreferredSlotSharingStrategy to support tasks balancing at slot level.*
- *Expose the configuration item to enable tasks balancing at slot level for Default Scheduler.*

## Verifying this change

This change added tests and can be verified as follows:

- *`org.apache.flink.runtime.scheduler.BalancedPreferredSlotSharingStrategyTest`*

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / no / **don't know**)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (**yes** / no)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
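The change log says the PR exposes a configuration item that enables slot-level task balancing for the Default Scheduler. A minimal sketch of that wiring, assuming the configured mode is mapped once to a slot sharing strategy: `NONE` keeps what is assumed here to be the scheduler's current `LocalInputPreferredSlotSharingStrategy`, while a hypothetical `TASKS` value selects `BalancedPreferredSlotSharingStrategy`. Strategies are represented by their names so the sketch runs without Flink; the enum, class and method are hypothetical.

```java
/** Hypothetical sketch of mapping the load-balance mode to a slot sharing strategy. */
public class StrategySelectionSketch {

    /** Hypothetical mode enum: NONE comes from the quoted diff, TASKS is assumed. */
    enum LoadBalanceMode {
        NONE,
        TASKS
    }

    /** Returns the name of the slot sharing strategy the scheduler would use for a mode. */
    static String slotSharingStrategyFor(LoadBalanceMode mode) {
        switch (mode) {
            case TASKS:
                // Balance the number of tasks across execution slot sharing groups.
                return "BalancedPreferredSlotSharingStrategy";
            case NONE:
            default:
                // Keep the existing behaviour, which prefers grouping a vertex with its producers.
                return "LocalInputPreferredSlotSharingStrategy";
        }
    }

    public static void main(String[] args) {
        for (LoadBalanceMode mode : LoadBalanceMode.values()) {
            System.out.println(mode + " -> " + slotSharingStrategyFor(mode));
        }
    }
}
```

An `EnumMap` of strategy factories would work equally well; the switch keeps the sketch short and makes the default explicit.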