Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2024-06-05 Thread via GitHub


RocMarshal commented on PR #23635:
URL: https://github.com/apache/flink/pull/23635#issuecomment-2151329162

   >  Hi, are there any updates or progress on this work as part of FLIP-370?
   
   Thanks for your attention. It's still a work in progress; I'll post an update in the next few days.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2024-06-05 Thread via GitHub


klam-shop commented on PR #23635:
URL: https://github.com/apache/flink/pull/23635#issuecomment-2150824717

    Hi, are there any updates or progress on this work as part of FLIP-370? 





Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2024-01-17 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1455029438


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.loading;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** The class is used to represent the loading weight abstraction of slots. */
+@Internal
+public interface LoadingWeight extends Comparable<LoadingWeight>, Serializable {
+
+LoadingWeight EMPTY = new DefaultLoadingWeight(0f);
+
+static LoadingWeight ofDefaultLoadingWeight(float loading) {
+return new DefaultLoadingWeight(loading);
+}
+
+static List<LoadingWeight> ofDefaultLoadingWeights(int... loadings) {
+List<LoadingWeight> loadingWeights = new ArrayList<>(loadings.length);
+for (int loading : loadings) {
+loadingWeights.add(ofDefaultLoadingWeight(loading));
+}
+return loadingWeights;
+}
+
+static List<LoadingWeight> supplyEmptyLoadWeights(int number) {
+Preconditions.checkArgument(number >= 0);
+LoadingWeight[] loadingWeights = new LoadingWeight[number];
+Arrays.parallelSetAll(loadingWeights, value -> EMPTY);
+return Arrays.stream(loadingWeights).collect(Collectors.toList());
+}
+
+/**
+ * Get the loading value.
+ *
+ * @return A float represented the loading.
+ */
+float getLoading();
+
+/**
+ * Merge the other loading weight into itself.

Review Comment:
   For example, see this comment: https://github.com/apache/flink/pull/23635#discussion_r1453243566
   
   If we allow resetting the `LoadingWeight`, it's easy to introduce bugs.






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2024-01-17 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1455025025


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.loading;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** The class is used to represent the loading weight abstraction of slots. */
+@Internal
+public interface LoadingWeight extends Comparable<LoadingWeight>, Serializable {
+
+LoadingWeight EMPTY = new DefaultLoadingWeight(0f);
+
+static LoadingWeight ofDefaultLoadingWeight(float loading) {
+return new DefaultLoadingWeight(loading);
+}
+
+static List<LoadingWeight> ofDefaultLoadingWeights(int... loadings) {
+List<LoadingWeight> loadingWeights = new ArrayList<>(loadings.length);
+for (int loading : loadings) {
+loadingWeights.add(ofDefaultLoadingWeight(loading));
+}
+return loadingWeights;
+}
+
+static List<LoadingWeight> supplyEmptyLoadWeights(int number) {
+Preconditions.checkArgument(number >= 0);
+LoadingWeight[] loadingWeights = new LoadingWeight[number];
+Arrays.parallelSetAll(loadingWeights, value -> EMPTY);
+return Arrays.stream(loadingWeights).collect(Collectors.toList());
+}
+
+/**
+ * Get the loading value.
+ *
+ * @return A float represented the loading.
+ */
+float getLoading();
+
+/**
+ * Merge the other loading weight into itself.

Review Comment:
   An immutable object is safe in itself. For example, creating a new `LoadingWeight` for every change keeps `LoadingWeight` itself very safe.
   
   But the callers are hard to maintain. For example, `WeightLoadable#setLoading` can reset the `LoadingWeight` of each slot, which is not safe for these slot-related classes.






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2024-01-17 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1455014315


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.loading;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** The class is used to represent the loading weight abstraction of slots. */
+@Internal
+public interface LoadingWeight extends Comparable<LoadingWeight>, Serializable {
+
+LoadingWeight EMPTY = new DefaultLoadingWeight(0f);
+
+static LoadingWeight ofDefaultLoadingWeight(float loading) {
+return new DefaultLoadingWeight(loading);
+}
+
+static List<LoadingWeight> ofDefaultLoadingWeights(int... loadings) {
+List<LoadingWeight> loadingWeights = new ArrayList<>(loadings.length);
+for (int loading : loadings) {
+loadingWeights.add(ofDefaultLoadingWeight(loading));
+}
+return loadingWeights;
+}
+
+static List<LoadingWeight> supplyEmptyLoadWeights(int number) {
+Preconditions.checkArgument(number >= 0);
+LoadingWeight[] loadingWeights = new LoadingWeight[number];
+Arrays.parallelSetAll(loadingWeights, value -> EMPTY);
+return Arrays.stream(loadingWeights).collect(Collectors.toList());
+}
+
+/**
+ * Get the loading value.
+ *
+ * @return A float represented the loading.
+ */
+float getLoading();
+
+/**
+ * Merge the other loading weight into itself.

Review Comment:
   Could we consider making this implementation immutable? That is, every time the internal value would change, a new object is created instead. Immutable objects are much safer.
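   A minimal sketch of what an immutable variant could look like. `SimpleLoadingWeight` and `ImmutableLoadingWeight` are hypothetical, simplified stand-ins, not the PR's classes. The value is `final`, and `merge` returns a new instance instead of mutating the receiver, so no `incLoading()`-style mutator is needed:

```java
/** Hypothetical, simplified stand-in for the LoadingWeight abstraction under review. */
interface SimpleLoadingWeight {
    float getLoading();

    /** Returns a weight holding the sum; neither operand is modified. */
    SimpleLoadingWeight merge(SimpleLoadingWeight other);
}

/** Immutable value class: every "change" produces a new object. */
final class ImmutableLoadingWeight implements SimpleLoadingWeight {

    static final ImmutableLoadingWeight EMPTY = new ImmutableLoadingWeight(0f);

    private final float tasks;

    ImmutableLoadingWeight(float tasks) {
        if (tasks < 0f) {
            throw new IllegalArgumentException("Loading must be non-negative: " + tasks);
        }
        this.tasks = tasks;
    }

    @Override
    public float getLoading() {
        return tasks;
    }

    @Override
    public SimpleLoadingWeight merge(SimpleLoadingWeight other) {
        // A null argument adds nothing; because instances never change, returning this is safe.
        return other == null ? this : new ImmutableLoadingWeight(tasks + other.getLoading());
    }
}
```

   Because instances cannot change, a shared constant such as `EMPTY` can be handed to many slots without risk. A caller that wants to add one task writes `w = w.merge(new ImmutableLoadingWeight(1f))`, which makes the reassignment explicit at the call site.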






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2024-01-17 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1454932673


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/LoadingWeight.java:
##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.loading;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** The class is used to represent the loading weight abstraction of slots. */
+@Internal
+public interface LoadingWeight extends Comparable<LoadingWeight>, Serializable {
+
+LoadingWeight EMPTY = new DefaultLoadingWeight(0f);
+
+static LoadingWeight ofDefaultLoadingWeight(float loading) {
+return new DefaultLoadingWeight(loading);
+}
+
+static List<LoadingWeight> ofDefaultLoadingWeights(int... loadings) {
+List<LoadingWeight> loadingWeights = new ArrayList<>(loadings.length);
+for (int loading : loadings) {
+loadingWeights.add(ofDefaultLoadingWeight(loading));
+}
+return loadingWeights;
+}
+
+static List<LoadingWeight> supplyEmptyLoadWeights(int number) {
+Preconditions.checkArgument(number >= 0);
+LoadingWeight[] loadingWeights = new LoadingWeight[number];
+Arrays.parallelSetAll(loadingWeights, value -> EMPTY);
+return Arrays.stream(loadingWeights).collect(Collectors.toList());
+}
+
+/**
+ * Get the loading value.
+ *
+ * @return A float represented the loading.
+ */
+float getLoading();
+
+/**
+ * Merge the other loading weight into itself.

Review Comment:
   Semantically, this method merges `other` into itself.
   
   But in the implementation of `DefaultLoadingWeight`, it doesn't update itself; it returns a new object instead.






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2024-01-16 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1454413854


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/WeightLoadable.java:
##
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.loading;
+
+import org.apache.flink.annotation.Internal;
+
+import javax.annotation.Nonnull;
+
+/**
+ * The interface that holds the {@link LoadingWeight} getter and setter is required for
+ * corresponding slot abstractions.
+ */
+@Internal
+public interface WeightLoadable {
+
+/**
+ * Get the loading weight.
+ *
+ * @return An implementation object of {@link LoadingWeight}.
+ */
+default LoadingWeight getLoading() {
+return LoadingWeight.EMPTY;
+}
+
+/**
+ * Set the loading weight.
+ *
+ * @param loadingWeight An implementation of {@link LoadingWeight}.
+ */
+void setLoading(@Nonnull LoadingWeight loadingWeight);

Review Comment:
   Alternatively, `ExecutionSlotSharingGroup` should not implement `WeightLoadable`.
   
   It only initializes and returns its loading.
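   A minimal sketch of the getter-only alternative. `LoadingReadable` and `SlotGroupSketch` are hypothetical names, and deriving the loading from the number of execution vertices is an assumption based on the task-count `DefaultLoadingWeight`. The loading is computed once in the constructor, and there is no setter through which a caller could reset it:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical getter-only counterpart of WeightLoadable. */
interface LoadingReadable {
    float getLoading();
}

/** Hypothetical stand-in for a slot sharing group whose loading is fixed at construction. */
final class SlotGroupSketch implements LoadingReadable {

    private final List<String> executionVertexIds;
    private final float loading;

    SlotGroupSketch(List<String> executionVertexIds) {
        // Defensive copy, so later changes to the caller's list cannot alter this group.
        this.executionVertexIds =
                Collections.unmodifiableList(new ArrayList<>(executionVertexIds));
        // Assumption: loading = number of tasks in the group.
        this.loading = this.executionVertexIds.size();
    }

    List<String> getExecutionVertexIds() {
        return executionVertexIds;
    }

    @Override
    public float getLoading() {
        return loading;
    }
}
```

   The design choice here is that the loading has exactly one writer, the constructor, so no slot-related class can drift out of sync with it.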






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2024-01-16 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1453256769


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -330,12 +334,32 @@ public void onUnknownDeploymentsOf(
 getMainThreadExecutor(),
 log);
 
+TaskManagerLoadBalanceMode mode =
+TaskManagerLoadBalanceMode.loadFromConfiguration(
+jobMasterConfiguration.getConfiguration());
+JobManagerOptions.SchedulerType schedulerType =
+slotPoolServiceSchedulerFactory.getSchedulerType();
+Time slotRequestMaxInterval = null;
+boolean slotBatchAllocatable = false;
+if (mode == TASKS && schedulerType == JobManagerOptions.SchedulerType.Adaptive) {

Review Comment:
   `Adaptive` should be `Default`.






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2024-01-16 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1453115386


##
flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java:
##
@@ -446,6 +446,13 @@ public enum JobStoreType {
 
.defaultValue(HeartbeatManagerOptions.HEARTBEAT_TIMEOUT.defaultValue())
 .withDescription("The timeout in milliseconds for a idle slot in Slot Pool.");
 
+@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
+public static final ConfigOption<Duration> SLOT_REQUEST_MAX_INTERVAL =
+key("slot.request.max-interval")
+.durationType()
+.defaultValue(Duration.ofMillis(50L))

Review Comment:
   The default value in FLIP-370 [1] is 20 ms.
   
   [1] https://cwiki.apache.org/confluence/display/FLINK/FLIP-370%3A+Support+Balanced+Tasks+Scheduling



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -330,12 +334,32 @@ public void onUnknownDeploymentsOf(
 getMainThreadExecutor(),
 log);
 
+TaskManagerLoadBalanceMode mode =
+TaskManagerLoadBalanceMode.loadFromConfiguration(
+jobMasterConfiguration.getConfiguration());
+JobManagerOptions.SchedulerType schedulerType =
+slotPoolServiceSchedulerFactory.getSchedulerType();
+Time slotRequestMaxInterval = null;
+boolean slotBatchAllocatable = false;
+if (mode == TASKS && schedulerType == JobManagerOptions.SchedulerType.Adaptive) {

Review Comment:
   IIUC, we should enable `slotBatchAllocatable` when mode == TASKS and jobType is STREAMING.
   
   
   
![image](https://github.com/apache/flink/assets/38427477/0e2db6c9-60d6-427f-b7cd-98c404217e89)
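   A minimal sketch of the enabling condition as suggested in this thread. It combines the `Adaptive` to `Default` scheduler-type correction, the STREAMING job-type check, and the 20 ms default from FLIP-370. The nested enums are simplified stand-ins for `TaskManagerLoadBalanceMode`, `JobManagerOptions.SchedulerType` and the job type. `SlotBatchAllocationSketch` and `slotRequestMaxInterval` are hypothetical names, and whether the final PR combines exactly these three checks is an assumption. As in the diff, where `slotRequestMaxInterval` starts as `null`, a `null` result means batching is disabled:

```java
import java.time.Duration;

/** Hypothetical sketch of the slot-batch-allocation switch discussed in this review. */
class SlotBatchAllocationSketch {

    // Simplified stand-ins for the Flink enums referenced in the diff.
    enum LoadBalanceMode { NONE, SLOTS, TASKS }

    enum SchedulerType { Default, Adaptive, AdaptiveBatch }

    enum JobType { BATCH, STREAMING }

    // Assumed default, taken from FLIP-370 (20 ms) rather than the 50 ms in the diff.
    static final Duration DEFAULT_SLOT_REQUEST_MAX_INTERVAL = Duration.ofMillis(20L);

    /** Returns the max wait interval for batched slot requests, or null if batching is off. */
    static Duration slotRequestMaxInterval(
            LoadBalanceMode mode,
            SchedulerType schedulerType,
            JobType jobType,
            Duration configured) {
        boolean slotBatchAllocatable =
                mode == LoadBalanceMode.TASKS
                        && schedulerType == SchedulerType.Default
                        && jobType == JobType.STREAMING;
        if (!slotBatchAllocatable) {
            return null;
        }
        return configured != null ? configured : DEFAULT_SLOT_REQUEST_MAX_INTERVAL;
    }
}
```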
   



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/loading/DefaultLoadingWeight.java:
##
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.loading;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nonnull;
+
+import java.util.Objects;
+
+/** The default implementation of {@link LoadingWeight} based on task count loading. */
+@Internal
+public class DefaultLoadingWeight implements LoadingWeight {
+
+private float tasks;
+
+DefaultLoadingWeight(float tasks) {
+Preconditions.checkArgument(tasks >= 0.0f);
+this.tasks = tasks;
+}
+
+public void incLoading() {
+this.tasks += 1.0f;
+}
+
+@Override
+public float getLoading() {
+return tasks;
+}
+
+@Override
+public LoadingWeight merge(LoadingWeight other) {
+if (other == null) {
+return LoadingWeight.ofDefaultLoadingWeight(this.tasks);
+}
+return LoadingWeight.ofDefaultLoadingWeight(tasks + other.getLoading());
+}
+
+@Override
+public int compareTo(@Nonnull LoadingWeight o) {
+return Float.compare(tasks, o.getLoading());
+}

Review Comment:
   Why not move it to the interface?
   
   ```
   @Override
   default int compareTo(@Nonnull LoadingWeight o) {
       return Float.compare(getLoading(), o.getLoading());
   }
   ```
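   A minimal, self-contained sketch of this suggestion, using a hypothetical stand-in interface `ComparableLoadingWeight` (the `@Nonnull` annotation is omitted to avoid a JSR-305 dependency). Once `compareTo` is a default method, every implementation compares consistently, and `getLoading()` is the only abstract method left. The `LeastLoadedSlotSketch` helper is hypothetical and only illustrates how the ordering could be used to pick the least-loaded slot:

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical stand-in: comparison is defined once, in the interface. */
interface ComparableLoadingWeight extends Comparable<ComparableLoadingWeight> {

    float getLoading();

    @Override
    default int compareTo(ComparableLoadingWeight o) {
        return Float.compare(getLoading(), o.getLoading());
    }

    static ComparableLoadingWeight of(float loading) {
        // A lambda works because getLoading() is the only remaining abstract method.
        return () -> loading;
    }
}

class LeastLoadedSlotSketch {

    /** Returns the index of the least-loaded slot (the first one on ties). */
    static int leastLoadedIndex(List<ComparableLoadingWeight> slots) {
        ComparableLoadingWeight min = Collections.min(slots);
        return slots.indexOf(min);
    }
}
```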



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -334,7 +334,9 @@ public void onUnknownDeploymentsOf(
 .createSlotPoolService(
 jid,
 createDeclarativeSlotPoolFactory(
-jobMasterConfiguration.getConfiguration()));
+jobMasterConfiguration.getConfiguration()),
+null,

Review Comment:
   Why not introduce `JobManagerOptions#SLOT_REQUEST_MAX_INTERVAL` in the third commit, ` Support resource request wait mechanism at DefaultDeclarativeSlotPool side for Default Scheduler`?
   
   You introduce it in the seventh commit `[FLINK-33388] Support tasks balancing at TM level for 

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2024-01-15 Thread via GitHub


KarmaGYZ commented on PR #23635:
URL: https://github.com/apache/flink/pull/23635#issuecomment-1892964860

   > @RocMarshal Just curious about the progress: is this PR still waiting for some comments to be addressed before it can be merged?
   
   This PR is in progress now. We plan to merge it after the complete Task Balancing feature is implemented.





Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2024-01-15 Thread via GitHub


Myasuka commented on PR #23635:
URL: https://github.com/apache/flink/pull/23635#issuecomment-1892504610

   @RocMarshal Just curious about the progress: is this PR still waiting for some comments to be addressed before it can be merged?





Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-26 Thread via GitHub


RocXing commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1436413745


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##
@@ -0,0 +1,344 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to
+ * the same SlotSharingGroup, tend to be put evenly in each ExecutionSlotSharingGroup. Co-location
+ * constraints will be respected.
+ */
+class TaskBalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy {
+
+public static final Logger LOG =
+LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class);
+
+TaskBalancedPreferredSlotSharingStrategy(
+final SchedulingTopology topology,
+final Set slotSharingGroups,
+final Set coLocationGroups) {
+super(topology, slotSharingGroups, coLocationGroups);
+}
+
+@Override
+protected Map computeExecutionSlotSharingGroups(
+SchedulingTopology schedulingTopology) {
+return new TaskBalancedExecutionSlotSharingGroupBuilder(
+schedulingTopology, this.logicalSlotSharingGroups, this.coLocationGroups)
+.build();
+}
+
+static class Factory implements SlotSharingStrategy.Factory {
+
+public TaskBalancedPreferredSlotSharingStrategy create(
+final SchedulingTopology topology,
+final Set slotSharingGroups,
+final Set coLocationGroups) {
+
+return new TaskBalancedPreferredSlotSharingStrategy(
+topology, slotSharingGroups, coLocationGroups);
+}
+}
+
+/** The interface to compute the fittest slot index. */
+private interface SlotIndexSupplier {
+
+int getFittestSlotIndex(
+final SlotSharingGroup slotSharingGroup,
+@Nullable final SchedulingExecutionVertex executionVertex);
+}
+
+/** SlotSharingGroupBuilder class for balanced scheduling strategy. */
+private static class TaskBalancedExecutionSlotSharingGroupBuilder {
+
+private final SchedulingTopology topology;
+
+private final Map slotSharingGroupMap;
+
+/** Record the {@link ExecutionSlotSharingGroup}s for {@link SlotSharingGroup}s. */
+private final Map>
+paralleledExecutionSlotSharingGroupsMap;
+
+/**
+ * Record the next round-robin {@link ExecutionSlotSharingGroup} index for {@link
+ * SlotSharingGroup}s.
+ */
+private final Map slotSharingGroupIndexMap;
+
+private final Map
+executionSlotSharingGroupMap;
+
+private final Map coLocationGroupMap;
+
+private final Map
+constraintToExecutionSlotSharingGroupMap;
+
+private TaskBalancedExecutionSlotSharingGroupBuilder(
+final SchedulingTopology topology,
+final Set slotSharingGroups,
+final Set coLocationGroups) {
+this.topology = checkNotNull(topology);
+
+
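   The Javadoc above describes spreading execution vertices of one SlotSharingGroup evenly across ExecutionSlotSharingGroups, with a recorded "next round-robin index" per SlotSharingGroup. A minimal sketch of that idea follows. `RoundRobinSlotSharingSketch` and `assign` are hypothetical names, vertices are plain strings, co-location constraints are ignored, and this is not the PR's builder. The key point is that the cursor is not reset between job vertices:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of round-robin placement of subtasks into execution slot sharing groups. */
class RoundRobinSlotSharingSketch {

    /**
     * @param parallelisms job vertex name -> parallelism, in topological order, for the vertices
     *     of a single slot sharing group
     * @return one list of subtask names per execution slot sharing group
     */
    static List<List<String>> assign(LinkedHashMap<String, Integer> parallelisms) {
        // One group per subtask of the widest vertex.
        int groupCount =
                parallelisms.values().stream().mapToInt(Integer::intValue).max().orElse(0);
        List<List<String>> groups = new ArrayList<>();
        for (int i = 0; i < groupCount; i++) {
            groups.add(new ArrayList<>());
        }
        // Shared cursor: the next vertex starts where the previous one stopped.
        int next = 0;
        for (Map.Entry<String, Integer> vertex : parallelisms.entrySet()) {
            for (int subtask = 0; subtask < vertex.getValue(); subtask++) {
                groups.get(next).add(vertex.getKey() + "#" + subtask);
                next = (next + 1) % groupCount;
            }
        }
        return groups;
    }
}
```

   With parallelisms A=4, B=2, C=2 this yields `[[A#0, B#0], [A#1, B#1], [A#2, C#0], [A#3, C#1]]`, two tasks per group. Restarting at index 0 for every vertex would instead give the first two groups three tasks each and the last two only one. Since no vertex has more subtasks than there are groups, each vertex's subtasks also land in distinct groups.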

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-20 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1432535390


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -334,7 +334,9 @@ public void onUnknownDeploymentsOf(
 .createSlotPoolService(
 jid,
 createDeclarativeSlotPoolFactory(
-jobMasterConfiguration.getConfiguration()));
+jobMasterConfiguration.getConfiguration()),
+null,

Review Comment:
   > you plan to modify this parameter in the commit introducing the configuration? 
   
   Yes.
   
   
   Thank you for your confirmation~






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-20 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1432535390


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -334,7 +334,9 @@ public void onUnknownDeploymentsOf(
 .createSlotPoolService(
 jid,
 createDeclarativeSlotPoolFactory(
-jobMasterConfiguration.getConfiguration()));
+jobMasterConfiguration.getConfiguration()),
+null,

Review Comment:
   > you plan to modify this parameter in the commit introducing the configuration? 
   
   Yes.
   
   
   Thank you for your confirmation~








Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-20 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1432535390


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -334,7 +334,9 @@ public void onUnknownDeploymentsOf(
 .createSlotPoolService(
 jid,
 createDeclarativeSlotPoolFactory(
-jobMasterConfiguration.getConfiguration()));
+jobMasterConfiguration.getConfiguration()),
+null,

Review Comment:
   Got it, thank you for your confirmation~






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-20 Thread via GitHub


KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1432482972


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -118,26 +129,47 @@ public DefaultDeclarativeSlotPool(
 this.totalResourceRequirements = ResourceCounter.empty();
 this.fulfilledResourceRequirements = ResourceCounter.empty();
 this.slotToRequirementProfileMappings = new HashMap<>();
+this.componentMainThreadExecutor = Preconditions.checkNotNull(componentMainThreadExecutor);
+this.slotRequestMaxInterval = slotRequestMaxInterval;
 }
 
 @Override
 public void increaseResourceRequirementsBy(ResourceCounter increment) {
-if (increment.isEmpty()) {
+updateResourceRequirementsBy(
+increment,
+() -> totalResourceRequirements = totalResourceRequirements.add(increment));

Review Comment:
   I lean toward this version.
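   A minimal sketch of the pattern in the diff above, where `increaseResourceRequirementsBy` delegates to a shared `updateResourceRequirementsBy(increment, Runnable)`. Several parts are assumptions:

   - A `Map<String, Integer>` stands in for `ResourceCounter`.
   - The class and method names are hypothetical.
   - The `declarations` counter is only a placeholder for whatever declaration (possibly delayed by `slotRequestMaxInterval`) the real pool performs.

   The early return on an empty delta mirrors the original `if (increment.isEmpty())` check:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: increase and decrease share one update helper. */
class ResourceRequirementsSketch {

    private final Map<String, Integer> totalRequirements = new HashMap<>();
    private int declarations; // placeholder for "requirements were (re)declared"

    void increaseBy(Map<String, Integer> increment) {
        updateBy(
                increment,
                () -> increment.forEach(
                        (profile, count) -> totalRequirements.merge(profile, count, Integer::sum)));
    }

    void decreaseBy(Map<String, Integer> decrement) {
        updateBy(
                decrement,
                () -> decrement.forEach(
                        // Returning null from the remapping function removes the entry at zero.
                        (profile, count) -> totalRequirements.computeIfPresent(
                                profile, (p, old) -> old - count > 0 ? old - count : null)));
    }

    private void updateBy(Map<String, Integer> delta, Runnable update) {
        if (delta.isEmpty()) {
            return; // nothing changed, so nothing to declare
        }
        update.run();
        declarations++;
    }

    Map<String, Integer> getTotalRequirements() {
        return totalRequirements;
    }

    int getDeclarations() {
        return declarations;
    }
}
```

   The benefit of this shape is that the empty check and the follow-up declaration live in one place, so the increase and decrease paths cannot diverge.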




Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-20 Thread via GitHub


KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1432481088


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -334,7 +334,9 @@ public void onUnknownDeploymentsOf(
 .createSlotPoolService(
 jid,
 createDeclarativeSlotPoolFactory(
-jobMasterConfiguration.getConfiguration()));
+jobMasterConfiguration.getConfiguration()),
+null,

Review Comment:
   IIUC, you plan to modify this parameter in the commit introducing the 
configuration? Sounds good from my side.




Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-19 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1432336723


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -118,26 +129,47 @@ public DefaultDeclarativeSlotPool(
 this.totalResourceRequirements = ResourceCounter.empty();
 this.fulfilledResourceRequirements = ResourceCounter.empty();
 this.slotToRequirementProfileMappings = new HashMap<>();
+this.componentMainThreadExecutor = Preconditions.checkNotNull(componentMainThreadExecutor);
+this.slotRequestMaxInterval = slotRequestMaxInterval;
 }
 
 @Override
 public void increaseResourceRequirementsBy(ResourceCounter increment) {
-if (increment.isEmpty()) {
+updateResourceRequirementsBy(
+increment,
+() -> totalResourceRequirements = totalResourceRequirements.add(increment));

Review Comment:
   @KarmaGYZ Thanks a lot for the comments.
   
   Let me try to explain it.
   
   My initial intention was to reduce code redundancy: there are currently two callers, and the only difference between them is whether they increase or decrease the resource requirements.
   
   If the extracted common part is too broad, I'm very willing to improve it, for example:
   ```
   @Override
   public void increaseResourceRequirementsBy(ResourceCounter increment) {
       if (increment.isEmpty()) {
           return;
       }
   
       totalResourceRequirements = totalResourceRequirements.add(increment);
       doDeclareResourceRequirements();
   }
   
   @Override
   public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
       if (decrement.isEmpty()) {
           return;
       }
   
       totalResourceRequirements = totalResourceRequirements.subtract(decrement);
       doDeclareResourceRequirements();
   }
   
   private void doDeclareResourceRequirements() {
       // Without a max interval, declare immediately (the original behaviour).
       if (slotRequestMaxInterval == null) {
           declareResourceRequirements();
           return;
       }
   
       // Cancel the pending declaration, if any, and restart the timer.
       if (slotRequestMaxIntervalTimeoutFuture != null
               && !slotRequestMaxIntervalTimeoutFuture.isDone()
               && !slotRequestMaxIntervalTimeoutFuture.isCancelled()) {
           slotRequestMaxIntervalTimeoutFuture.cancel(true);
       }
       slotRequestMaxIntervalTimeoutFuture =
               componentMainThreadExecutor.schedule(
                       this::declareResourceRequirements,
                       slotRequestMaxInterval.toMilliseconds(),
                       TimeUnit.MILLISECONDS);
   }
   ```
   
   Please let me know your opinion~ :)
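The cancel-and-reschedule logic in `doDeclareResourceRequirements()` behaves like a debounce: every requirement change restarts the timer, so a burst of changes results in a single declaration. Below is a minimal, self-contained sketch of that idea, assuming a plain `ScheduledExecutorService` in place of Flink's `ComponentMainThreadExecutor` and a counter in place of the real declaration; the class `DebouncedRequirementDeclarer` and its methods are hypothetical. Unlike the quoted code it calls `cancel(false)` and relies on `isDone()` alone, because `isDone()` already returns `true` for cancelled futures.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sketch: batches requirement changes by restarting a timer on every change. */
class DebouncedRequirementDeclarer {
    private final ScheduledExecutorService executor;
    // null means "declare immediately", i.e. the behaviour before batching was added.
    private final Long maxIntervalMillis;
    private final AtomicInteger declarations = new AtomicInteger();
    private ScheduledFuture<?> pendingDeclaration;

    DebouncedRequirementDeclarer(ScheduledExecutorService executor, Long maxIntervalMillis) {
        this.executor = executor;
        this.maxIntervalMillis = maxIntervalMillis;
    }

    /** Called after the total requirements have been increased or decreased. */
    synchronized void onRequirementsChanged() {
        if (maxIntervalMillis == null) {
            declare();
            return;
        }
        // isDone() is also true for cancelled futures, so this single check covers both cases.
        if (pendingDeclaration != null && !pendingDeclaration.isDone()) {
            pendingDeclaration.cancel(false);
        }
        // Keep the handle returned by schedule(); the scheduled task never resets it itself.
        pendingDeclaration =
                executor.schedule(this::declare, maxIntervalMillis, TimeUnit.MILLISECONDS);
    }

    private void declare() {
        // Stand-in for declareResourceRequirements(), which eventually notifies the ResourceManager.
        declarations.incrementAndGet();
    }

    int declarationCount() {
        return declarations.get();
    }
}
```

With a `null` interval each change is declared immediately, preserving the original behaviour; with an interval, several quick changes collapse into one declaration once the timer finally fires.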




Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-19 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1432351722


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -334,7 +334,9 @@ public void onUnknownDeploymentsOf(
 .createSlotPoolService(
 jid,
 createDeclarativeSlotPoolFactory(
-jobMasterConfiguration.getConfiguration()));
+jobMasterConfiguration.getConfiguration()),
+null,

Review Comment:
   Hi @KarmaGYZ, thank you very much for your comment.
   
   Do you mean: why don't we pass the values of `slotRequestMaxInterval` and `slotBatchAllocatable` here based on the configuration, instead of passing hard-coded values?
   
   If so (IIUC), the reasons for not doing so are:
   
   - We want to change the parameter-passing logic uniformly once the default scheduler fully supports balanced scheduling.
   - The current hard-coded default values do not change the original semantics.
   
   Please correct me if I'm wrong. 
   Any suggestion is appreciated.




Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-19 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1432336723


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -118,26 +129,47 @@ public DefaultDeclarativeSlotPool(
 this.totalResourceRequirements = ResourceCounter.empty();
 this.fulfilledResourceRequirements = ResourceCounter.empty();
 this.slotToRequirementProfileMappings = new HashMap<>();
+this.componentMainThreadExecutor = Preconditions.checkNotNull(componentMainThreadExecutor);
+this.slotRequestMaxInterval = slotRequestMaxInterval;
 }
 
 @Override
 public void increaseResourceRequirementsBy(ResourceCounter increment) {
-if (increment.isEmpty()) {
+updateResourceRequirementsBy(
+increment,
+() -> totalResourceRequirements = totalResourceRequirements.add(increment));

Review Comment:
   @KarmaGYZ Thanks a lot for the comments.
   
   Let me try to explain it.
   
   My initial intention was to reduce code redundancy: there are currently two callers, and the only difference between them is whether they increase or decrease the resource requirements.
   
   If the extracted common part is too broad, I'm very willing to improve it, for example:
   ```
   @Override
   public void increaseResourceRequirementsBy(ResourceCounter increment) {
       if (increment.isEmpty()) {
           return;
       }
   
       totalResourceRequirements = totalResourceRequirements.add(increment);
       updateResourceRequirementsBy();
   }
   
   @Override
   public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
       if (decrement.isEmpty()) {
           return;
       }
   
       totalResourceRequirements = totalResourceRequirements.subtract(decrement);
       updateResourceRequirementsBy();
   }
   
   private void updateResourceRequirementsBy() {
       // Without a max interval, declare immediately (the original behaviour).
       if (slotRequestMaxInterval == null) {
           declareResourceRequirements();
           return;
       }
   
       // Cancel the pending declaration, if any, and restart the timer.
       if (slotRequestMaxIntervalTimeoutFuture != null
               && !slotRequestMaxIntervalTimeoutFuture.isDone()
               && !slotRequestMaxIntervalTimeoutFuture.isCancelled()) {
           slotRequestMaxIntervalTimeoutFuture.cancel(true);
       }
       slotRequestMaxIntervalTimeoutFuture =
               componentMainThreadExecutor.schedule(
                       this::declareResourceRequirements,
                       slotRequestMaxInterval.toMilliseconds(),
                       TimeUnit.MILLISECONDS);
   }
   ```
   
   Please let me know your opinion~ :)




Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-19 Thread via GitHub


KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1432179061


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -334,7 +334,9 @@ public void onUnknownDeploymentsOf(
 .createSlotPoolService(
 jid,
 createDeclarativeSlotPoolFactory(
-jobMasterConfiguration.getConfiguration()));
+jobMasterConfiguration.getConfiguration()),
+null,

Review Comment:
   Why not set it according to the configuration?



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -118,26 +129,47 @@ public DefaultDeclarativeSlotPool(
 this.totalResourceRequirements = ResourceCounter.empty();
 this.fulfilledResourceRequirements = ResourceCounter.empty();
 this.slotToRequirementProfileMappings = new HashMap<>();
+this.componentMainThreadExecutor = Preconditions.checkNotNull(componentMainThreadExecutor);
+this.slotRequestMaxInterval = slotRequestMaxInterval;
 }
 
 @Override
 public void increaseResourceRequirementsBy(ResourceCounter increment) {
-if (increment.isEmpty()) {
+updateResourceRequirementsBy(
+increment,
+() -> totalResourceRequirements = totalResourceRequirements.add(increment));
+}
+
+private void updateResourceRequirementsBy(
+@Nonnull ResourceCounter deltaResourceCount, @Nonnull Runnable runnable) {
+if (deltaResourceCount.isEmpty()) {
 return;
 }
-totalResourceRequirements = totalResourceRequirements.add(increment);
 
-declareResourceRequirements();
-}
+runnable.run();
 
-@Override
-public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
-if (decrement.isEmpty()) {
+if (slotRequestMaxInterval == null) {
+declareResourceRequirements();
 return;
 }
-totalResourceRequirements = totalResourceRequirements.subtract(decrement);
 
-declareResourceRequirements();
+if (slotRequestMaxIntervalTimeoutFuture != null
+&& !slotRequestMaxIntervalTimeoutFuture.isDone()
+&& !slotRequestMaxIntervalTimeoutFuture.isCancelled()) {
+slotRequestMaxIntervalTimeoutFuture.cancel(true);
+}
+slotRequestMaxIntervalTimeoutFuture =
+componentMainThreadExecutor.schedule(
+this::declareResourceRequirements,
+slotRequestMaxInterval.toMilliseconds(),
+TimeUnit.MILLISECONDS);
+}
+
+@Override
+public void decreaseResourceRequirementsBy(ResourceCounter decrement) {
+updateResourceRequirementsBy(
+decrement,
+() -> totalResourceRequirements = totalResourceRequirements.subtract(decrement));

Review Comment:
   ditto



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java:
##
@@ -336,7 +336,8 @@ public void onUnknownDeploymentsOf(
 createDeclarativeSlotPoolFactory(
 jobMasterConfiguration.getConfiguration()),
 null,
-getMainThreadExecutor());
+getMainThreadExecutor(),
+false);

Review Comment:
   Why not set slotBatchAllocatable according to the configuration?



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -118,26 +129,47 @@ public DefaultDeclarativeSlotPool(
 this.totalResourceRequirements = ResourceCounter.empty();
 this.fulfilledResourceRequirements = ResourceCounter.empty();
 this.slotToRequirementProfileMappings = new HashMap<>();
+this.componentMainThreadExecutor = Preconditions.checkNotNull(componentMainThreadExecutor);
+this.slotRequestMaxInterval = slotRequestMaxInterval;
 }
 
 @Override
 public void increaseResourceRequirementsBy(ResourceCounter increment) {
-if (increment.isEmpty()) {
+updateResourceRequirementsBy(
+increment,
+() -> totalResourceRequirements = totalResourceRequirements.add(increment));

Review Comment:
   Why not check whether the `ResourceCounter` is empty and modify `totalResourceRequirements` right here?




Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-15 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1427717395


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##
@@ -200,10 +226,45 @@ protected void onReleaseTaskManager(ResourceCounter previouslyFulfilledRequireme
 
 @VisibleForTesting
 void newSlotsAreAvailable(Collection newSlots) {
+if (!globalSlotsViewable) {
+final Collection requestSlotMatches =
+requestSlotMatchingStrategy.matchRequestsAndSlots(
+newSlots, pendingRequests.values());
+reserveMatchedFreeSlots(requestSlotMatches);
+fulfillMatchedSlots(requestSlotMatches);
+return;
+}
+
+receivedSlots.addAll(newSlots);
+if (receivedSlots.size() < pendingRequests.size()) {
+return;
+}
 final Collection requestSlotMatches =
 requestSlotMatchingStrategy.matchRequestsAndSlots(
 newSlots, pendingRequests.values());
+if (requestSlotMatches.size() == pendingRequests.size()) {
+reserveMatchedFreeSlots(requestSlotMatches);
+fulfillMatchedSlots(requestSlotMatches);
+}
+receivedSlots.clear();

Review Comment:
   IIUC, batched resource requests are sent to the RM, and generally all slots on each TM are offered to the `DeclarativeSlotPoolBridge` as one batch, with each batch triggering `newAvailableSlots` once. So we need to cache them here, right? I'd appreciate your opinion or confirmation.




Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-14 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1427526869


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -103,12 +106,35 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
 
 private final RequirementMatcher requirementMatcher = new DefaultRequirementMatcher();
 
+// For batch slots requests
+@Nullable private final Time slotRequestMaxInterval;
+@Nullable private final ComponentMainThreadExecutor componentMainThreadExecutor;

Review Comment:
   If we introduce `ComponentMainThreadExecutor`, I'd prefer it to be `@Nonnull` even if `slotRequestMaxInterval` is null.
   
   This executor may be used in other scenarios, so it would be better not to bind it to `slotRequestMaxInterval`.



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -103,12 +106,35 @@ public class DefaultDeclarativeSlotPool implements DeclarativeSlotPool {
 
 private final RequirementMatcher requirementMatcher = new DefaultRequirementMatcher();
 
+// For batch slots requests
+@Nullable private final Time slotRequestMaxInterval;
+@Nullable private final ComponentMainThreadExecutor componentMainThreadExecutor;
+@Nullable private ScheduledFuture slotRequestMaxIntervalTimeoutFuture;
+
 public DefaultDeclarativeSlotPool(
 JobID jobId,
 AllocatedSlotPool slotPool,
 Consumer> notifyNewResourceRequirements,
 Time idleSlotTimeout,
 Time rpcTimeout) {
+this(
+jobId,
+slotPool,
+notifyNewResourceRequirements,
+idleSlotTimeout,
+rpcTimeout,
+null,
+null);
+}
+
+public DefaultDeclarativeSlotPool(
+JobID jobId,
+AllocatedSlotPool slotPool,
+Consumer> notifyNewResourceRequirements,
+Time idleSlotTimeout,
+Time rpcTimeout,
+@Nullable Time slotRequestMaxInterval,
+@Nullable ComponentMainThreadExecutor componentMainThreadExecutor) {

Review Comment:
   They have been marked `@Nullable`; do we still need two constructors?
   
   Maybe one constructor with the full parameter list is enough.
   
   Note: the same applies to `BlocklistDeclarativeSlotPool`.
   
   
   Also, this commit is part of FLINK-33388, right? Why doesn't this commit make use of `slotRequestMaxInterval`? I didn't see any caller pass `slotRequestMaxInterval` in this PR.
   
   It would be better to add the corresponding JIRA id to the commit message.




Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-14 Thread via GitHub


KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1427515808


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter increment) {
 }
 totalResourceRequirements = totalResourceRequirements.add(increment);
 
+if (slotRequestMaxInterval == null) {
+declareResourceRequirements();
+return;
+}
+
+Preconditions.checkNotNull(componentMainThreadExecutor);
+if (slotRequestMaxIntervalTimeoutCheckFuture != null) {
+slotRequestMaxIntervalTimeoutCheckFuture.cancel(true);
+}
+slotRequestMaxIntervalTimeoutCheckFuture =
+componentMainThreadExecutor.schedule(
+this::checkSlotRequestMaxIntervalTimeout,
+slotRequestMaxInterval.toMilliseconds(),
+TimeUnit.MILLISECONDS);
+}
+
+private void checkSlotRequestMaxIntervalTimeout() {
+if (componentMainThreadExecutor == null || slotRequestMaxInterval == null) {

Review Comment:
   `Preconditions.checkNotNull` on both variables would be good enough from my side.




Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-14 Thread via GitHub


KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1427515551


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter increment) {
 }
 totalResourceRequirements = totalResourceRequirements.add(increment);
 
+if (slotRequestMaxInterval == null) {
+declareResourceRequirements();
+return;
+}
+
+Preconditions.checkNotNull(componentMainThreadExecutor);
+if (slotRequestMaxIntervalTimeoutCheckFuture != null) {
+slotRequestMaxIntervalTimeoutCheckFuture.cancel(true);

Review Comment:
   The current approach minimizes the number of RPC calls. The other approach may benefit end-to-end latency.
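To make the trade-off concrete, the sketch below computes, for a sorted list of requirement-change timestamps (in ms), when declarations would be sent under two policies. It assumes that "the other approach" means not cancelling the pending timer, so the first change opens a fixed window; that reading is an assumption. `BatchingPolicies` and its methods are hypothetical helpers, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical comparison of two batching policies for requirement declarations. */
final class BatchingPolicies {
    private BatchingPolicies() {}

    /**
     * Debounce (cancel and reschedule on every change): a declaration fires only once no new
     * change has arrived for {@code interval} ms. Fewest RPCs, but a steady trickle of changes
     * keeps postponing the declaration.
     */
    static List<Long> debounce(long[] requestTimes, long interval) {
        List<Long> fireTimes = new ArrayList<>();
        for (int i = 0; i < requestTimes.length; i++) {
            boolean quietAfter =
                    i == requestTimes.length - 1
                            || requestTimes[i + 1] - requestTimes[i] >= interval;
            if (quietAfter) {
                fireTimes.add(requestTimes[i] + interval);
            }
        }
        return fireTimes;
    }

    /**
     * Fixed window (no rescheduling): the first change opens a window that fires after
     * {@code interval} ms; changes arriving before then join that declaration. Latency is
     * bounded by {@code interval}, at the cost of possibly more RPCs.
     */
    static List<Long> fixedWindow(long[] requestTimes, long interval) {
        List<Long> fireTimes = new ArrayList<>();
        long deadline = Long.MIN_VALUE;
        for (long t : requestTimes) {
            if (t >= deadline) { // no open window, so this change opens a new one
                deadline = t + interval;
                fireTimes.add(deadline);
            }
        }
        return fireTimes;
    }
}
```

With changes at 0, 40, 80 and 120 ms and a 50 ms interval, debouncing sends one declaration at 170 ms, whereas the fixed window sends two, at 50 ms and 130 ms.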




Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-14 Thread via GitHub


KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1427513961


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##
@@ -200,10 +226,45 @@ protected void onReleaseTaskManager(ResourceCounter previouslyFulfilledRequireme
 
 @VisibleForTesting
 void newSlotsAreAvailable(Collection newSlots) {
+if (!globalSlotsViewable) {
+final Collection requestSlotMatches =
+requestSlotMatchingStrategy.matchRequestsAndSlots(
+newSlots, pendingRequests.values());
+reserveMatchedFreeSlots(requestSlotMatches);
+fulfillMatchedSlots(requestSlotMatches);
+return;
+}
+
+receivedSlots.addAll(newSlots);
+if (receivedSlots.size() < pendingRequests.size()) {
+return;
+}
 final Collection requestSlotMatches =
 requestSlotMatchingStrategy.matchRequestsAndSlots(
 newSlots, pendingRequests.values());
+if (requestSlotMatches.size() == pendingRequests.size()) {
+reserveMatchedFreeSlots(requestSlotMatches);
+fulfillMatchedSlots(requestSlotMatches);
+}
+receivedSlots.clear();

Review Comment:
   `receivedSlots` will be cleared regardless of whether we got enough matching slots.
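A minimal sketch of accumulating slots across calls and clearing the buffer only after every pending request has been matched, assuming that an incomplete match should keep the buffered slots for a later call. `BatchedSlotBuffer` and the `matcher` function are hypothetical stand-ins for `receivedSlots` and `RequestSlotMatchingStrategy#matchRequestsAndSlots`; note that the quoted hunk passes `newSlots` to the matcher, while this sketch matches against everything buffered so far.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.BiFunction;

/** Hypothetical buffer: caches offered slots until all pending requests can be matched. */
final class BatchedSlotBuffer<S, R> {
    private final List<S> receivedSlots = new ArrayList<>();

    /**
     * @param matcher returns the matches between the buffered slots and the pending requests
     * @return the matches if every pending request was matched, otherwise an empty list
     */
    <M> List<M> onNewSlots(
            List<? extends S> newSlots,
            List<R> pendingRequests,
            BiFunction<List<S>, List<R>, List<M>> matcher) {
        receivedSlots.addAll(newSlots);
        if (receivedSlots.size() < pendingRequests.size()) {
            return Collections.emptyList(); // keep waiting for the rest of the batch
        }
        // Match against everything buffered so far, not only the slots from this call.
        List<M> matches = matcher.apply(new ArrayList<>(receivedSlots), pendingRequests);
        if (matches.size() < pendingRequests.size()) {
            return Collections.emptyList(); // keep the buffer; a later call may complete it
        }
        receivedSlots.clear(); // clear only after a complete match
        return matches;
    }

    int bufferedCount() {
        return receivedSlots.size();
    }
}
```

Keeping the buffer on an incomplete match avoids dropping slots that have already been offered; whether that is the desired behaviour for the bridge is a design decision this sketch does not settle.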




Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-14 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1426543430


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter increment) {
 }
 totalResourceRequirements = totalResourceRequirements.add(increment);
 
+if (slotRequestMaxInterval == null) {
+declareResourceRequirements();
+return;
+}
+
+Preconditions.checkNotNull(componentMainThreadExecutor);
+if (slotRequestMaxIntervalTimeoutCheckFuture != null) {
+slotRequestMaxIntervalTimeoutCheckFuture.cancel(true);

Review Comment:
   Thank you very much for the comments. 
   Both are acceptable to me.
   But from my limited reading I can't judge which one is better. Would you mind explaining the advantages of the other approach?
   Thank you very much~
   CC @1996fanrui 




Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-14 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1426524511


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##
@@ -200,10 +226,45 @@ protected void onReleaseTaskManager(ResourceCounter previouslyFulfilledRequireme
 
 @VisibleForTesting
 void newSlotsAreAvailable(Collection newSlots) {
+if (!globalSlotsViewable) {
+final Collection requestSlotMatches =
+requestSlotMatchingStrategy.matchRequestsAndSlots(
+newSlots, pendingRequests.values());
+reserveMatchedFreeSlots(requestSlotMatches);
+fulfillMatchedSlots(requestSlotMatches);
+return;
+}
+
+receivedSlots.addAll(newSlots);
+if (receivedSlots.size() < pendingRequests.size()) {
+return;
+}
 final Collection requestSlotMatches =
 requestSlotMatchingStrategy.matchRequestsAndSlots(
 newSlots, pendingRequests.values());
+if (requestSlotMatches.size() == pendingRequests.size()) {

Review Comment:
   This is caused by an outdated code version. I will push the latest commits later~




Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-14 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1426523374


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##
@@ -200,10 +226,45 @@ protected void onReleaseTaskManager(ResourceCounter previouslyFulfilledRequireme
 
 @VisibleForTesting
 void newSlotsAreAvailable(Collection newSlots) {
+if (!globalSlotsViewable) {
+final Collection requestSlotMatches =
+requestSlotMatchingStrategy.matchRequestsAndSlots(
+newSlots, pendingRequests.values());
+reserveMatchedFreeSlots(requestSlotMatches);
+fulfillMatchedSlots(requestSlotMatches);
+return;
+}
+
+receivedSlots.addAll(newSlots);
+if (receivedSlots.size() < pendingRequests.size()) {
+return;
+}
 final Collection requestSlotMatches =
 requestSlotMatchingStrategy.matchRequestsAndSlots(
 newSlots, pendingRequests.values());
+if (requestSlotMatches.size() == pendingRequests.size()) {
+reserveMatchedFreeSlots(requestSlotMatches);
+fulfillMatchedSlots(requestSlotMatches);
+}
+receivedSlots.clear();

Review Comment:
   Thanks @KarmaGYZ  for the comments.
   
   I'm sorry, I didn't quite understand your point.
   Do you mean that we should clean up after matching, or that we should not clean up after matching?




Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-14 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1426516597


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter increment) {
 }
 totalResourceRequirements = totalResourceRequirements.add(increment);
 
+if (slotRequestMaxInterval == null) {
+declareResourceRequirements();
+return;
+}
+
+Preconditions.checkNotNull(componentMainThreadExecutor);
+if (slotRequestMaxIntervalTimeoutCheckFuture != null) {
+slotRequestMaxIntervalTimeoutCheckFuture.cancel(true);
+}
+slotRequestMaxIntervalTimeoutCheckFuture =
+componentMainThreadExecutor.schedule(
+this::checkSlotRequestMaxIntervalTimeout,
+slotRequestMaxInterval.toMilliseconds(),
+TimeUnit.MILLISECONDS);
+}
+
+private void checkSlotRequestMaxIntervalTimeout() {
+if (componentMainThreadExecutor == null || slotRequestMaxInterval == null) {

Review Comment:
   Nice idea!
   
   How about checking these two parameters in the constructor and only allowing them to be either both null or both non-null? Or use `Preconditions.checkState`/`checkArgument` to check it?
   
   We only need `componentMainThreadExecutor` when batched resource requests are enabled.
   
   Looking forward to better suggestions.
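A minimal sketch of that constructor-time check, assuming a `java.time.Duration` interval and a `ScheduledExecutorService` in place of Flink's `Time` and `ComponentMainThreadExecutor`. `BatchingConfig` is a hypothetical name; in Flink itself the check would more likely be written with `Preconditions.checkArgument`.

```java
import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;

/** Hypothetical holder: the two batching parameters are either both set or both null. */
final class BatchingConfig {
    private final Duration slotRequestMaxInterval; // null disables batching
    private final ScheduledExecutorService mainThreadExecutor; // only needed for batching

    BatchingConfig(Duration slotRequestMaxInterval, ScheduledExecutorService mainThreadExecutor) {
        // Validate the invariant once, so later code only has to test one field.
        if ((slotRequestMaxInterval == null) != (mainThreadExecutor == null)) {
            throw new IllegalArgumentException(
                    "slotRequestMaxInterval and mainThreadExecutor must both be set or both be null");
        }
        this.slotRequestMaxInterval = slotRequestMaxInterval;
        this.mainThreadExecutor = mainThreadExecutor;
    }

    boolean isBatchingEnabled() {
        return slotRequestMaxInterval != null;
    }
}
```

Once the invariant holds, methods such as the timeout check no longer need their own null guards on both fields.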




Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-14 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1426510597


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter increment) {
 }
 totalResourceRequirements = totalResourceRequirements.add(increment);
 
+if (slotRequestMaxInterval == null) {
+declareResourceRequirements();
+return;
+}
+
+Preconditions.checkNotNull(componentMainThreadExecutor);
+if (slotRequestMaxIntervalTimeoutCheckFuture != null) {
+slotRequestMaxIntervalTimeoutCheckFuture.cancel(true);
+}
+slotRequestMaxIntervalTimeoutCheckFuture =
+componentMainThreadExecutor.schedule(
+this::checkSlotRequestMaxIntervalTimeout,
+slotRequestMaxInterval.toMilliseconds(),
+TimeUnit.MILLISECONDS);
+}
+
+private void checkSlotRequestMaxIntervalTimeout() {
+if (componentMainThreadExecutor == null || slotRequestMaxInterval == null) {
+return;
+}
 declareResourceRequirements();
+slotRequestMaxIntervalTimeoutCheckFuture = null;

Review Comment:
   Yes, this assignment is risky.
   IIUC, we only need to cancel the current `future` (while it is still cancelable) when the requirements increase. Otherwise, we can use the result of the scheduled task to make the assignment. Please correct me if I'm wrong.
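One way to make the reassignment safe, sketched under assumptions: the class, field and method names are hypothetical, and a plain `ScheduledExecutorService` replaces the main-thread executor (hence the `synchronized` methods, which a single-threaded main executor would not need). Each scheduled check carries a generation number, so a superseded check, even one that had already started when `cancel` was called, does nothing and never clears the newer future.

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical debounced declaration with a stale-check guard. */
final class DebouncedDeclarer {
    private final ScheduledExecutorService executor;
    private final long maxIntervalMillis;
    private final Runnable declareRequirements;

    // Guarded by "this".
    private ScheduledFuture<?> pendingCheck;
    private long generation;

    DebouncedDeclarer(
            ScheduledExecutorService executor, long maxIntervalMillis, Runnable declareRequirements) {
        this.executor = executor;
        this.maxIntervalMillis = maxIntervalMillis;
        this.declareRequirements = declareRequirements;
    }

    synchronized void onRequirementsIncreased() {
        if (pendingCheck != null && !pendingCheck.isDone()) {
            // Only a still-pending check needs cancelling.
            pendingCheck.cancel(false);
        }
        final long scheduledGeneration = ++generation;
        pendingCheck =
                executor.schedule(
                        () -> onTimeout(scheduledGeneration),
                        maxIntervalMillis,
                        TimeUnit.MILLISECONDS);
    }

    private synchronized void onTimeout(long scheduledGeneration) {
        if (scheduledGeneration != generation) {
            // Superseded by a newer request: leave the newer future untouched.
            return;
        }
        pendingCheck = null;
        declareRequirements.run();
    }
}
```

Because of the generation check, `cancel(false)` is sufficient here; interrupting with `cancel(true)` is not needed to stop a stale check.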






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-13 Thread via GitHub


KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1413318476


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java:
##
@@ -110,7 +110,7 @@ public  Optional castInto(Class clazz) {
 }
 
 @Override
-public final void start(

Review Comment:
   There is no need to do this. Just override `onStart` instead.



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java:
##
@@ -142,7 +142,7 @@ protected void assertHasBeenStarted() {
 }
 
 @Override
-public final void close() {

Review Comment:
   Ditto. Override `onClose` instead.
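The suggestion follows the template-method pattern: the lifecycle methods stay `final` in the base class, and subclasses plug in through protected hooks. A minimal sketch with hypothetical class names (not Flink's actual `DeclarativeSlotPoolService`):

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical base service: lifecycle entry points are final, subclasses use hooks. */
abstract class LifecycleService {
    private boolean started;

    public final void start() {
        if (started) {
            throw new IllegalStateException("already started");
        }
        started = true;
        onStart(); // subclass-specific startup
    }

    public final void close() {
        if (!started) {
            return;
        }
        onClose(); // subclass-specific cleanup
        started = false;
    }

    protected void onStart() {}

    protected void onClose() {}
}

/** Hypothetical subclass that only customizes the hooks. */
final class BatchingService extends LifecycleService {
    final List<String> events = new ArrayList<>();

    @Override
    protected void onStart() {
        events.add("schedule timeout check");
    }

    @Override
    protected void onClose() {
        events.add("cancel timeout check");
    }
}
```

Keeping `start`/`close` final means the base class keeps control of the shared bookkeeping, while the subclass cannot accidentally skip it.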






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-13 Thread via GitHub


KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1426214331


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolBridge.java:
##
@@ -200,10 +226,45 @@ protected void onReleaseTaskManager(ResourceCounter previouslyFulfilledRequireme
 
 @VisibleForTesting
 void newSlotsAreAvailable(Collection newSlots) {
+if (!globalSlotsViewable) {
+final Collection requestSlotMatches =
+requestSlotMatchingStrategy.matchRequestsAndSlots(
+newSlots, pendingRequests.values());
+reserveMatchedFreeSlots(requestSlotMatches);
+fulfillMatchedSlots(requestSlotMatches);
+return;
+}
+
+receivedSlots.addAll(newSlots);
+if (receivedSlots.size() < pendingRequests.size()) {
+return;
+}
 final Collection requestSlotMatches =
 requestSlotMatchingStrategy.matchRequestsAndSlots(
 newSlots, pendingRequests.values());
+if (requestSlotMatches.size() == pendingRequests.size()) {
+reserveMatchedFreeSlots(requestSlotMatches);
+fulfillMatchedSlots(requestSlotMatches);
+}
+receivedSlots.clear();

Review Comment:
   Should we only clear the pending slots after all the requirements are fulfilled?
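To illustrate the question, here is a hypothetical sketch in which received slots are buffered and the buffer is cleared only once every pending request has been matched. Slots and requests are reduced to resource-profile strings, and matching is a simple exact-profile lookup; neither reflects Flink's `RequestSlotMatchingStrategy`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/** Hypothetical matcher that keeps buffering slots until all requests can be matched. */
final class BufferedSlotMatcher {
    private final List<String> pendingRequests; // required profile per request
    private final List<String> receivedSlots = new ArrayList<>();

    BufferedSlotMatcher(List<String> pendingRequests) {
        this.pendingRequests = new ArrayList<>(pendingRequests);
    }

    /** Returns true once all pending requests were matched against the buffered slots. */
    boolean onNewSlots(Collection<String> newSlots) {
        receivedSlots.addAll(newSlots);
        if (receivedSlots.size() < pendingRequests.size()) {
            return false; // not enough slots yet
        }
        // Match against the whole buffer, not only the newest batch.
        List<String> unused = new ArrayList<>(receivedSlots);
        for (String required : pendingRequests) {
            if (!unused.remove(required)) {
                return false; // keep the buffer and wait for more slots
            }
        }
        // Surplus slots are simply dropped in this sketch; a real pool would keep them free.
        receivedSlots.clear();
        pendingRequests.clear();
        return true;
    }

    int bufferedSlotCount() {
        return receivedSlots.size();
    }
}
```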



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter increment) {
 }
 totalResourceRequirements = totalResourceRequirements.add(increment);
 
+if (slotRequestMaxInterval == null) {
+declareResourceRequirements();
+return;
+}
+
+Preconditions.checkNotNull(componentMainThreadExecutor);
+if (slotRequestMaxIntervalTimeoutCheckFuture != null) {
+slotRequestMaxIntervalTimeoutCheckFuture.cancel(true);

Review Comment:
   In the current implementation, the resource requirements are sent only if there is no further requirement change within `slotRequestMaxInterval`. Another option is to declare the requirements at most once every `slotRequestMaxInterval`. TBH I'm not sure which one would be better, but I think that deserves a discussion.
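The two options correspond to debouncing and throttling. The sketch below (a hypothetical helper with times in milliseconds and no real timers) computes when a declaration would be sent for a sorted sequence of requirement-change timestamps under each policy:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical comparison of two declaration policies on sorted change timestamps (ms). */
final class DeclarationPolicies {
    private DeclarationPolicies() {}

    /** Debounce: declare once no further change arrives within {@code interval} of the last one. */
    static List<Long> debounce(long[] changeTimes, long interval) {
        List<Long> sends = new ArrayList<>();
        for (int i = 0; i < changeTimes.length; i++) {
            boolean last = i == changeTimes.length - 1;
            // The timer armed at changeTimes[i] fires unless the next change comes before it.
            if (last || changeTimes[i + 1] - changeTimes[i] >= interval) {
                sends.add(changeTimes[i] + interval);
            }
        }
        return sends;
    }

    /** Throttle: the first change opens a window; all changes in it are declared at its end. */
    static List<Long> throttle(long[] changeTimes, long interval) {
        List<Long> sends = new ArrayList<>();
        long windowEnd = Long.MIN_VALUE;
        for (long t : changeTimes) {
            if (t >= windowEnd) {
                windowEnd = t + interval;
                sends.add(windowEnd);
            }
        }
        return sends;
    }
}
```

With changes at 0, 50, 100 and 150 ms and a 100 ms interval, debouncing sends a single declaration at 250 ms, while throttling sends at 100 ms and 200 ms. Under a steady stream of changes, debouncing can keep postponing the declaration, whereas throttling bounds the extra delay to one interval.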



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DeclarativeSlotPoolService.java:
##
@@ -110,7 +110,7 @@ public  Optional castInto(Class clazz) {
 }
 
 @Override
-public final void start(

Review Comment:
   There is no need to do this. Just override `onStart` instead.



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter increment) {
 }
 totalResourceRequirements = totalResourceRequirements.add(increment);
 
+if (slotRequestMaxInterval == null) {
+declareResourceRequirements();
+return;
+}
+
+Preconditions.checkNotNull(componentMainThreadExecutor);
+if (slotRequestMaxIntervalTimeoutCheckFuture != null) {
+slotRequestMaxIntervalTimeoutCheckFuture.cancel(true);
+}
+slotRequestMaxIntervalTimeoutCheckFuture =
+componentMainThreadExecutor.schedule(
+this::checkSlotRequestMaxIntervalTimeout,
+slotRequestMaxInterval.toMilliseconds(),
+TimeUnit.MILLISECONDS);
+}
+
+private void checkSlotRequestMaxIntervalTimeout() {
+if (componentMainThreadExecutor == null || slotRequestMaxInterval == null) {

Review Comment:
   I think we should assert that something went wrong here instead of silently ignoring this illegal state.



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/DefaultDeclarativeSlotPool.java:
##
@@ -127,7 +155,28 @@ public void increaseResourceRequirementsBy(ResourceCounter increment) {
 }
 totalResourceRequirements = totalResourceRequirements.add(increment);
 
+if (slotRequestMaxInterval == null) {
+declareResourceRequirements();
+return;
+}
+
+Preconditions.checkNotNull(componentMainThreadExecutor);
+if (slotRequestMaxIntervalTimeoutCheckFuture != null) {
+slotRequestMaxIntervalTimeoutCheckFuture.cancel(true);
+}
+slotRequestMaxIntervalTimeoutCheckFuture =
+componentMainThreadExecutor.schedule(
+this::checkSlotRequestMaxIntervalTimeout,
+slotRequestMaxInterval.toMilliseconds(),
+TimeUnit.MILLISECONDS);
+}
+
+private void checkSlotRequestMaxIntervalTimeout() {
+if (componentMainThreadExecutor == null || 

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-05 Thread via GitHub


RocMarshal commented on PR #23635:
URL: https://github.com/apache/flink/pull/23635#issuecomment-1840891716

   Thank you very much for the review, @1996fanrui @KarmaGYZ.
   
   I have re-evaluated where the waiting mechanisms should be implemented, based on @KarmaGYZ's offline suggestions.
   
   If both waiting mechanisms are placed in `DeclarativeSlotPool`, the information to maintain would be more precise and more concise:
   
   - The maintenance of reserved slots/resource profiles should be simpler and more intuitive.
   
   If we can reach an agreement on this, I would like to confirm again: should we still use `mainThreadExecutor` to implement the timeout check of the waiting mechanism? If so, this may require changing the `create` method of `DeclarativeSlotPoolFactory`.
   
   Please let me know your opinions.





Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-05 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1415668887


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/GlobalViewDeclarativeSlotPoolBridge.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService;
+import org.apache.flink.runtime.taskexecutor.slot.TimeoutListener;
+import org.apache.flink.runtime.taskexecutor.slot.TimerService;
+import org.apache.flink.runtime.util.ResourceCounter;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.Clock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+/**
+ * {@link SlotPool} implementation which could use the {@link GlobalViewDeclarativeSlotPoolBridge}
+ * to allocate slots in a global view. Note: It's only used for streaming mode now.
+ */
+public class GlobalViewDeclarativeSlotPoolBridge extends DeclarativeSlotPoolBridge
+implements TimeoutListener {
+
+public static final Logger LOG =
+LoggerFactory.getLogger(GlobalViewDeclarativeSlotPoolBridge.class);
+private final Map increasedResourceRequirements;
+
+private final TimerService timerService;
+
+private final @Nonnull Set receivedNewSlots;
+
+private final @Nonnull Map preFulfilledFromAvailableSlots;
+private final Time slotRequestMaxInterval;
+
+public GlobalViewDeclarativeSlotPoolBridge(
+JobID jobId,
+DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+Clock clock,
+Time rpcTimeout,
+Time idleSlotTimeout,
+Time batchSlotTimeout,
+Time slotRequestMaxInterval,
+RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
+super(
+jobId,
+declarativeSlotPoolFactory,
+clock,
+rpcTimeout,
+idleSlotTimeout,
+batchSlotTimeout,
+requestSlotMatchingStrategy);
+this.slotRequestMaxInterval = Preconditions.checkNotNull(slotRequestMaxInterval);
+this.receivedNewSlots = new HashSet<>();
+this.preFulfilledFromAvailableSlots = new HashMap<>();
+this.increasedResourceRequirements = new HashMap<>();
+this.timerService =
+new DefaultTimerService<>(
+new ScheduledThreadPoolExecutor(1),
+slotRequestMaxInterval.toMilliseconds());
+}
+
+@Override
+protected void internalRequestNewAllocatedSlot(PendingRequest pendingRequest) {
+pendingRequests.put(pendingRequest.getSlotRequestId(), pendingRequest);
+increasedResourceRequirements.put(pendingRequest.getSlotRequestId(), false);
+
+timerService.registerTimeout(
+this, slotRequestMaxInterval.getSize(), slotRequestMaxInterval.getUnit());
+}
+
+@Override
+void newSlotsAreAvailable(Collection newSlots) {
+receivedNewSlots.addAll(newSlots);
+if (newSlots.isEmpty() && receivedNewSlots.isEmpty()) {
+// TODO: Do the matching logic only for available slots.
+} 

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-05 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1415663600


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/GlobalViewDeclarativeSlotPoolBridge.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService;
+import org.apache.flink.runtime.taskexecutor.slot.TimeoutListener;
+import org.apache.flink.runtime.taskexecutor.slot.TimerService;
+import org.apache.flink.runtime.util.ResourceCounter;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.Clock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+/**
+ * {@link SlotPool} implementation which could use the {@link GlobalViewDeclarativeSlotPoolBridge}
+ * to allocate slots in a global view. Note: It's only used for streaming mode now.
+ */
+public class GlobalViewDeclarativeSlotPoolBridge extends DeclarativeSlotPoolBridge
+implements TimeoutListener {
+
+public static final Logger LOG =
+LoggerFactory.getLogger(GlobalViewDeclarativeSlotPoolBridge.class);
+private final Map increasedResourceRequirements;
+
+private final TimerService timerService;
+
+private final @Nonnull Set receivedNewSlots;
+
+private final @Nonnull Map preFulfilledFromAvailableSlots;
+private final Time slotRequestMaxInterval;
+
+public GlobalViewDeclarativeSlotPoolBridge(
+JobID jobId,
+DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+Clock clock,
+Time rpcTimeout,
+Time idleSlotTimeout,
+Time batchSlotTimeout,
+Time slotRequestMaxInterval,
+RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
+super(
+jobId,
+declarativeSlotPoolFactory,
+clock,
+rpcTimeout,
+idleSlotTimeout,
+batchSlotTimeout,
+requestSlotMatchingStrategy);
+this.slotRequestMaxInterval = Preconditions.checkNotNull(slotRequestMaxInterval);
+this.receivedNewSlots = new HashSet<>();
+this.preFulfilledFromAvailableSlots = new HashMap<>();
+this.increasedResourceRequirements = new HashMap<>();
+this.timerService =
+new DefaultTimerService<>(

Review Comment:
   Thanks for the comment.
   SGTM, +1.






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-05 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1415650282


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/GlobalViewDeclarativeSlotPoolBridge.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService;
+import org.apache.flink.runtime.taskexecutor.slot.TimeoutListener;
+import org.apache.flink.runtime.taskexecutor.slot.TimerService;
+import org.apache.flink.runtime.util.ResourceCounter;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.Clock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+/**
+ * {@link SlotPool} implementation which could use the {@link GlobalViewDeclarativeSlotPoolBridge}
+ * to allocate slots in a global view. Note: It's only used for streaming mode now.
+ */
+public class GlobalViewDeclarativeSlotPoolBridge extends DeclarativeSlotPoolBridge
+implements TimeoutListener {
+
+public static final Logger LOG =
+LoggerFactory.getLogger(GlobalViewDeclarativeSlotPoolBridge.class);
+private final Map increasedResourceRequirements;
+
+private final TimerService timerService;
+
+private final @Nonnull Set receivedNewSlots;
+
+private final @Nonnull Map preFulfilledFromAvailableSlots;
+private final Time slotRequestMaxInterval;
+
+public GlobalViewDeclarativeSlotPoolBridge(
+JobID jobId,
+DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+Clock clock,
+Time rpcTimeout,
+Time idleSlotTimeout,
+Time batchSlotTimeout,
+Time slotRequestMaxInterval,
+RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
+super(
+jobId,
+declarativeSlotPoolFactory,
+clock,
+rpcTimeout,
+idleSlotTimeout,
+batchSlotTimeout,
+requestSlotMatchingStrategy);
+this.slotRequestMaxInterval = Preconditions.checkNotNull(slotRequestMaxInterval);
+this.receivedNewSlots = new HashSet<>();
+this.preFulfilledFromAvailableSlots = new HashMap<>();
+this.increasedResourceRequirements = new HashMap<>();
+this.timerService =
+new DefaultTimerService<>(
+new ScheduledThreadPoolExecutor(1),

Review Comment:
   +1 for the proposal.






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-03 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1413436882


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/GlobalViewDeclarativeSlotPoolBridge.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService;
+import org.apache.flink.runtime.taskexecutor.slot.TimeoutListener;
+import org.apache.flink.runtime.taskexecutor.slot.TimerService;
+import org.apache.flink.runtime.util.ResourceCounter;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.Clock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+/**
+ * {@link SlotPool} implementation which could use the {@link GlobalViewDeclarativeSlotPoolBridge}
+ * to allocate slots in a global view. Note: It's only used for streaming mode now.
+ */
+public class GlobalViewDeclarativeSlotPoolBridge extends DeclarativeSlotPoolBridge
+implements TimeoutListener {
+
+public static final Logger LOG =
+LoggerFactory.getLogger(GlobalViewDeclarativeSlotPoolBridge.class);
+private final Map increasedResourceRequirements;
+
+private final TimerService timerService;
+
+private final @Nonnull Set receivedNewSlots;
+
+private final @Nonnull Map preFulfilledFromAvailableSlots;
+private final Time slotRequestMaxInterval;
+
+public GlobalViewDeclarativeSlotPoolBridge(
+JobID jobId,
+DeclarativeSlotPoolFactory declarativeSlotPoolFactory,
+Clock clock,
+Time rpcTimeout,
+Time idleSlotTimeout,
+Time batchSlotTimeout,
+Time slotRequestMaxInterval,
+RequestSlotMatchingStrategy requestSlotMatchingStrategy) {
+super(
+jobId,
+declarativeSlotPoolFactory,
+clock,
+rpcTimeout,
+idleSlotTimeout,
+batchSlotTimeout,
+requestSlotMatchingStrategy);
+this.slotRequestMaxInterval = Preconditions.checkNotNull(slotRequestMaxInterval);
+this.receivedNewSlots = new HashSet<>();
+this.preFulfilledFromAvailableSlots = new HashMap<>();
+this.increasedResourceRequirements = new HashMap<>();
+this.timerService =
+new DefaultTimerService<>(
+new ScheduledThreadPoolExecutor(1),
+slotRequestMaxInterval.toMilliseconds());
+}
+
+@Override
+protected void internalRequestNewAllocatedSlot(PendingRequest pendingRequest) {
+pendingRequests.put(pendingRequest.getSlotRequestId(), pendingRequest);
+increasedResourceRequirements.put(pendingRequest.getSlotRequestId(), false);
+
+timerService.registerTimeout(
+this, slotRequestMaxInterval.getSize(), slotRequestMaxInterval.getUnit());
+}
+
+@Override
+void newSlotsAreAvailable(Collection newSlots) {
+receivedNewSlots.addAll(newSlots);
+if (newSlots.isEmpty() && receivedNewSlots.isEmpty()) {
+// TODO: Do the matching logic only for available slots.
+} 

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-03 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1413430330


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/GlobalViewPhysicalSlotProviderImpl.java:
##
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+/**
+ * Implementation with global view for {@link PhysicalSlotProvider}. Note: It's only used for
+ * streaming mode now.
+ */
+public class GlobalViewPhysicalSlotProviderImpl extends PhysicalSlotProviderImpl {
+
+public GlobalViewPhysicalSlotProviderImpl(
+SlotSelectionStrategy slotSelectionStrategy, SlotPool slotPool) {
+super(slotSelectionStrategy, slotPool);
+}
+
+@Override
+protected Map> tryAllocateFromAvailable(
+Collection slotRequests) {
+Map> availablePhysicalSlots = new HashMap<>();
+for (PhysicalSlotRequest request : slotRequests) {
+availablePhysicalSlots.put(

Review Comment:
   I don't understand why we need this new `tryAllocateFromAvailable`. The old `tryAllocateFromAvailable` calls the `slotSelectionStrategy.selectBestSlotForProfile`-related logic; don't we need it here?
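To make the question concrete, here is a minimal sketch of what per-request, profile-based slot selection does. It rests on simplified, hypothetical assumptions: the `Slot` and `Request` records, their CPU/memory fields, and the "smallest slot that fits" rule are illustrative only and are not Flink's `SlotSelectionStrategy` or `PhysicalSlotProviderImpl` code. This matching step is the part the review comment asks about.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class BestSlotSelectionSketch {
    // Hypothetical, simplified stand-ins for an available slot and a slot request.
    public record Slot(String id, int cpuMillis, int memoryMb) {}

    public record Request(String id, int cpuMillis, int memoryMb) {}

    // Assumption: a slot "fits" a request if it offers at least the requested resources.
    static boolean fits(Slot slot, Request request) {
        return slot.cpuMillis() >= request.cpuMillis() && slot.memoryMb() >= request.memoryMb();
    }

    // Per-request selection: among the free slots that fit, pick the smallest one.
    static Optional<Slot> selectBestSlot(Request request, Collection<Slot> freeSlots) {
        return freeSlots.stream()
                .filter(slot -> fits(slot, request))
                .min(Comparator.comparingInt(Slot::cpuMillis).thenComparingInt(Slot::memoryMb));
    }

    // Serves requests in order; a chosen slot is removed from the free pool.
    public static Map<String, Optional<Slot>> allocate(List<Request> requests, List<Slot> available) {
        List<Slot> freeSlots = new ArrayList<>(available);
        Map<String, Optional<Slot>> result = new LinkedHashMap<>();
        for (Request request : requests) {
            Optional<Slot> chosen = selectBestSlot(request, freeSlots);
            chosen.ifPresent(slot -> freeSlots.remove(slot));
            result.put(request.id(), chosen);
        }
        return result;
    }
}
```

Serving requests one at a time like this is order-dependent, which is one reason an allocator might want a global view over all pending requests at once.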



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/GlobalViewDeclarativeSlotPoolBridge.java:
##
@@ -0,0 +1,261 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster.slotpool;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
+import org.apache.flink.runtime.jobmaster.JobMasterId;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.taskexecutor.slot.DefaultTimerService;
+import org.apache.flink.runtime.taskexecutor.slot.TimeoutListener;
+import org.apache.flink.runtime.taskexecutor.slot.TimerService;
+import org.apache.flink.runtime.util.ResourceCounter;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.clock.Clock;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+
+/**
+ * {@link SlotPool} implementation which could use the {@link GlobalViewDeclarativeSlotPoolBridge}
+ * to allocate slots in a global view. Note: It's only used for streaming mode now.
+ */
+public class GlobalViewDeclarativeSlotPoolBridge extends DeclarativeSlotPoolBridge
+implements TimeoutListener {
+
+public static final Logger LOG =
+

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-12-03 Thread via GitHub


RocMarshal commented on PR #23635:
URL: https://github.com/apache/flink/pull/23635#issuecomment-1837758141

   The waiting mechanism is ready for review.
   @KarmaGYZ @1996fanrui, would you help take a look when you have some free time? Thank you very much~
   The verification part of the tests will be refactored after the external JUnit 5 migration is done.
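The diff excerpt quoted at the top of this section registers a timeout via `timerService.registerTimeout(...)` with `slotRequestMaxInterval` and collects incoming slots in `receivedNewSlots`. Below is a minimal, deterministic sketch of one plausible reading of such a waiting mechanism: a debounce that hands the buffered slots to matching only after no new slots have arrived for a whole interval. The class and method names are hypothetical, and it uses explicit timestamps instead of Flink's `TimerService`; it is not the PR's implementation.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Hypothetical debounce-style batcher; not Flink's GlobalViewDeclarativeSlotPoolBridge. */
public class SlotBatchWaiter<T> {
    private final long maxIntervalMillis;
    private final List<T> receivedNewSlots = new ArrayList<>();
    private long lastArrivalMillis = -1;

    public SlotBatchWaiter(long maxIntervalMillis) {
        this.maxIntervalMillis = maxIntervalMillis;
    }

    /** Buffers newly available slots and restarts the waiting window. */
    public void newSlotsAreAvailable(Collection<T> newSlots, long nowMillis) {
        if (newSlots.isEmpty()) {
            return;
        }
        receivedNewSlots.addAll(newSlots);
        lastArrivalMillis = nowMillis;
    }

    /**
     * Intended to be called from a timer: returns the whole batch for matching once no slot
     * has arrived for maxIntervalMillis, otherwise an empty list (keep waiting).
     */
    public List<T> pollBatchIfQuiet(long nowMillis) {
        if (receivedNewSlots.isEmpty() || nowMillis - lastArrivalMillis < maxIntervalMillis) {
            return Collections.emptyList();
        }
        List<T> batch = new ArrayList<>(receivedNewSlots);
        receivedNewSlots.clear();
        return batch;
    }
}
```

Batching like this trades a bounded delay for the chance to match all slots that arrive close together in one pass.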
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-26 Thread via GitHub


KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1405582679


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java:
##
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
+
+import org.assertj.core.data.Offset;
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TaskBalancedPreferredSlotSharingStrategy}. */
+class TaskBalancedPreferredSlotSharingStrategyTest extends AbstractSlotSharingStrategyTest {

Review Comment:
   Why did we delete the other two tests?






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-26 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1405313903


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java:
##
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
+
+import org.assertj.core.data.Offset;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TaskBalancedPreferredSlotSharingStrategy}. */
+class TaskBalancedPreferredSlotSharingStrategyTest {

Review Comment:
   > In addition to extracting common code, it is also necessary to consider the upgrade issue of Junit4
   
   To simplify the review of https://github.com/apache/flink/pull/23635, I submitted a hotfix [PR](https://github.com/apache/flink/pull/23806) to upgrade `LocalInputPreferredSlotSharingStrategyTest` to JUnit 5.
   
   > If there are common parts, it may add some complexity to the review process and update process.
   
   If the tests share common parts, we only need to review the parts that differ. That simplifies the review process, and it doesn't introduce any duplicated code.
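The "review only the different part" approach is the abstract-base-test pattern (a later revision in this archive has `TaskBalancedPreferredSlotSharingStrategyTest extends AbstractSlotSharingStrategyTest`). A minimal sketch, assuming a hypothetical `GroupingStrategy` that maps a vertex index to a group index: shared checks are written once in the base class, and each concrete test only supplies its strategy plus its own specific checks. In JUnit 5, `@Test` methods declared in an abstract superclass are inherited by concrete test classes; the sketch omits JUnit so that it runs standalone.

```java
public class SharedStrategyTestSketch {
    /** Hypothetical stand-in for a slot sharing strategy: maps a vertex index to a group index. */
    interface GroupingStrategy {
        int groupOf(int vertex);
    }

    /** The common checks live here once; subclasses only supply the strategy under test. */
    abstract static class AbstractGroupingStrategyTest {
        static final int NUM_GROUPS = 3;
        static final int NUM_VERTICES = 10;

        protected abstract GroupingStrategy createStrategy(int numGroups);

        /** Shared check every strategy must pass: each vertex lands in a valid group. */
        boolean everyVertexHasValidGroup() {
            GroupingStrategy strategy = createStrategy(NUM_GROUPS);
            for (int v = 0; v < NUM_VERTICES; v++) {
                int group = strategy.groupOf(v);
                if (group < 0 || group >= NUM_GROUPS) {
                    return false;
                }
            }
            return true;
        }
    }

    /** Only the strategy-specific part needs reviewing in the concrete test class. */
    static class RoundRobinGroupingStrategyTest extends AbstractGroupingStrategyTest {
        @Override
        protected GroupingStrategy createStrategy(int numGroups) {
            return vertex -> vertex % numGroups;
        }

        /** Strategy-specific check: group sizes differ by at most one. */
        boolean isBalanced() {
            int[] counts = new int[NUM_GROUPS];
            GroupingStrategy strategy = createStrategy(NUM_GROUPS);
            for (int v = 0; v < NUM_VERTICES; v++) {
                counts[strategy.groupOf(v)]++;
            }
            int min = Integer.MAX_VALUE;
            int max = Integer.MIN_VALUE;
            for (int c : counts) {
                min = Math.min(min, c);
                max = Math.max(max, c);
            }
            return max - min <= 1;
        }
    }
}
```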






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-26 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1405352714


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java:
##
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
+
+import org.assertj.core.data.Offset;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TaskBalancedPreferredSlotSharingStrategy}. */
+class TaskBalancedPreferredSlotSharingStrategyTest {

Review Comment:
   `LocalInputPreferredSlotSharingStrategyTest` has been upgraded to JUnit 5 in the master branch. Please rebase onto master and go ahead when you have time, thanks~






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-25 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1405310873


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to achieve balanced task scheduling. Execution vertices that belong to the
+ * same SlotSharingGroup tend to be distributed evenly across ExecutionSlotSharingGroups.
+ * Co-location constraints are respected.
+ */
+class TaskBalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy {
+
+public static final Logger LOG =
+LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class);
+
+TaskBalancedPreferredSlotSharingStrategy(
+final SchedulingTopology topology,
+final Set slotSharingGroups,
+final Set coLocationGroups) {
+super(topology, slotSharingGroups, coLocationGroups);
+}
+
+@Override
+protected Map computeExecutionSlotSharingGroups(
+SchedulingTopology schedulingTopology) {
+return new TaskBalancedExecutionSlotSharingGroupBuilder(
+schedulingTopology, this.logicalSlotSharingGroups, this.coLocationGroups)
+.build();
+}
+
+static class Factory implements SlotSharingStrategy.Factory {
+
+public TaskBalancedPreferredSlotSharingStrategy create(
+final SchedulingTopology topology,
+final Set slotSharingGroups,
+final Set coLocationGroups) {
+
+return new TaskBalancedPreferredSlotSharingStrategy(
+topology, slotSharingGroups, coLocationGroups);
+}
+}
+
+/** The interface to compute the fittest slot index. */
+interface SlotIndexSupplier {

Review Comment:
   ```suggestion
   private interface SlotIndexSupplier {
   ```
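The quoted Javadoc says this strategy tends to spread the execution vertices of one SlotSharingGroup evenly across ExecutionSlotSharingGroups, and `SlotIndexSupplier` is documented as "the interface to compute the fittest slot index". A minimal sketch of that idea with the interface kept `private` as suggested, assuming (hypothetically) that "fittest" means "currently holding the fewest tasks". The method name `getFittestSlotIndex` and its `int[]` signature are invented for illustration and are not the PR's code.

```java
public class BalancedAssignmentSketch {
    /** Hypothetical shape of a "fittest slot index" supplier; the PR's real signature may differ. */
    private interface SlotIndexSupplier {
        int getFittestSlotIndex(int[] taskCountPerSlot);
    }

    /** Picks the slot currently holding the fewest tasks; the lowest index wins ties. */
    private static final SlotIndexSupplier LEAST_LOADED =
            counts -> {
                int best = 0;
                for (int i = 1; i < counts.length; i++) {
                    if (counts[i] < counts[best]) {
                        best = i;
                    }
                }
                return best;
            };

    /** Assigns numTasks tasks of one slot sharing group to numSlots slots; returns per-slot counts. */
    public static int[] assign(int numTasks, int numSlots) {
        int[] counts = new int[numSlots];
        for (int t = 0; t < numTasks; t++) {
            counts[LEAST_LOADED.getFittestSlotIndex(counts)]++;
        }
        return counts;
    }
}
```

For example, `assign(7, 3)` fills the slots round by round and yields per-slot counts of 3, 2 and 2.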



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import 

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-25 Thread via GitHub


RocMarshal commented on PR #23635:
URL: https://github.com/apache/flink/pull/23635#issuecomment-1826330445

   Hi @KarmaGYZ @1996fanrui, thank you very much for your patient review comments. I have updated the PR based on them.
   PTAL when you have time. Have a nice weekend~





Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-25 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1405028622


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java:
##
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
+
+import org.assertj.core.data.Offset;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TaskBalancedPreferredSlotSharingStrategy}. */
+class TaskBalancedPreferredSlotSharingStrategyTest {

Review Comment:
   Sounds good to me~
   Would you mind if we moved this part of the optimization to an independent ticket after this feature is merged?
   
   - In addition to extracting the common code, we also need to consider upgrading the tests from JUnit 4.
   - If there are common parts, it may add some complexity to the review and update process.
   
   Anyway, I think both options are fine. Please let me know your opinion.






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-25 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1405002911


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java:
##
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
+
+import org.assertj.core.data.Offset;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TaskBalancedPreferredSlotSharingStrategy}. */
+class TaskBalancedPreferredSlotSharingStrategyTest {
+
+@Test
+void testCoLocationConstraintIsRespected() {
+TestingSchedulingTopology topology = new TestingSchedulingTopology();
+List>> jobVertexInfos =
+new ArrayList<>();
+SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
+CoLocationGroup coLocationGroup1 = new CoLocationGroupImpl();
+CoLocationGroup coLocationGroup2 = new CoLocationGroupImpl();
+List mockedJobVertices =
+getMockedJobVertices(slotSharingGroup1, coLocationGroup1, coLocationGroup2);
+setupCase(mockedJobVertices, topology, jobVertexInfos);
+
+final SlotSharingStrategy strategy =
+new TaskBalancedPreferredSlotSharingStrategy(
+topology,
+Sets.newHashSet(slotSharingGroup1),
+Sets.newHashSet(coLocationGroup1, coLocationGroup2));
+List executionVertices1 = jobVertexInfos.get(1).f1;
+List executionVertices2 = jobVertexInfos.get(2).f1;
+TestingSchedulingExecutionVertex executionVertexOfJv0 = jobVertexInfos.get(0).f1.get(0);
+ExecutionSlotSharingGroup executionSlotSharingGroupOfExecutionVertexOfJv0 =
+strategy.getExecutionSlotSharingGroup(executionVertexOfJv0.getId());
+
+assertThat(executionVertices1).hasSameSizeAs(executionVertices2);
+for (int i = 0; i < executionVertices1.size(); i++) {
+ExecutionSlotSharingGroup executionSlotSharingGroup =
+strategy.getExecutionSlotSharingGroup(executionVertices1.get(i).getId());
+assertThat(executionSlotSharingGroup)
+.isNotEqualTo(executionSlotSharingGroupOfExecutionVertexOfJv0);
+assertThat(executionSlotSharingGroup)
+.isEqualTo(
+strategy.getExecutionSlotSharingGroup(
+executionVertices2.get(i).getId()));
+}
+
+List executionVertices3 = jobVertexInfos.get(3).f1;
+List executionVertices4 = jobVertexInfos.get(4).f1;
+for (int i = 0; i < executionVertices1.size(); i++) {
+assertThat(strategy.getExecutionSlotSharingGroup(executionVertices3.get(i).getId()))
+.isEqualTo(
+strategy.getExecutionSlotSharingGroup(
+executionVertices4.get(i).getId()));
+}
+}
+
+@Nonnull
+private static ArrayList 

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-25 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1405002804


##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java:
##
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroupImpl;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.TestingSchedulingTopology;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.flink.shaded.guava31.com.google.common.collect.Lists;
+import org.apache.flink.shaded.guava31.com.google.common.collect.Sets;
+
+import org.assertj.core.data.Offset;
+import org.junit.jupiter.api.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link TaskBalancedPreferredSlotSharingStrategy}. */
+class TaskBalancedPreferredSlotSharingStrategyTest {
+
+@Test
+void testCoLocationConstraintIsRespected() {
+TestingSchedulingTopology topology = new TestingSchedulingTopology();
+List>> 
jobVertexInfos =
+new ArrayList<>();
+SlotSharingGroup slotSharingGroup1 = new SlotSharingGroup();
+CoLocationGroup coLocationGroup1 = new CoLocationGroupImpl();
+CoLocationGroup coLocationGroup2 = new CoLocationGroupImpl();
+List mockedJobVertices =
+getMockedJobVertices(slotSharingGroup1, coLocationGroup1, 
coLocationGroup2);
+setupCase(mockedJobVertices, topology, jobVertexInfos);
+
+final SlotSharingStrategy strategy =
+new TaskBalancedPreferredSlotSharingStrategy(
+topology,
+Sets.newHashSet(slotSharingGroup1),
+Sets.newHashSet(coLocationGroup1, coLocationGroup2));
+List executionVertices1 = 
jobVertexInfos.get(1).f1;
+List executionVertices2 = 
jobVertexInfos.get(2).f1;
+TestingSchedulingExecutionVertex executionVertexOfJv0 = 
jobVertexInfos.get(0).f1.get(0);
+ExecutionSlotSharingGroup 
executionSlotSharingGroupOfExecutionVertexOfJv0 =
+
strategy.getExecutionSlotSharingGroup(executionVertexOfJv0.getId());
+
+assertThat(executionVertices1).hasSameSizeAs(executionVertices2);
+for (int i = 0; i < executionVertices1.size(); i++) {
+ExecutionSlotSharingGroup executionSlotSharingGroup =
+
strategy.getExecutionSlotSharingGroup(executionVertices1.get(i).getId());
+assertThat(executionSlotSharingGroup)
+
.isNotEqualTo(executionSlotSharingGroupOfExecutionVertexOfJv0);
+assertThat(executionSlotSharingGroup)
+.isEqualTo(
+strategy.getExecutionSlotSharingGroup(
+executionVertices2.get(i).getId()));
+}
+
+List executionVertices3 = 
jobVertexInfos.get(3).f1;
+List executionVertices4 = 
jobVertexInfos.get(4).f1;
+for (int i = 0; i < executionVertices1.size(); i++) {

Review Comment:
   nice catch




Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-25 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1405002722


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##
@@ -0,0 +1,361 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to
+ * the same SlotSharingGroup, tend to be put evenly in each ExecutionSlotSharingGroup. Co-location
+ * constraints will be respected.
+ */
+class TaskBalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy {
+
+    public static final Logger LOG =
+            LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class);
+
+    TaskBalancedPreferredSlotSharingStrategy(
+            final SchedulingTopology topology,
+            final Set<SlotSharingGroup> slotSharingGroups,
+            final Set<CoLocationGroup> coLocationGroups) {
+        super(topology, slotSharingGroups, coLocationGroups);
+    }
+
+    @Override
+    protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> computeExecutionSlotSharingGroups(
+            SchedulingTopology schedulingTopology) {
+        return new TaskBalancedExecutionSlotSharingGroupBuilder(
+                        schedulingTopology, this.logicalSlotSharingGroups, this.coLocationGroups)
+                .build();
+    }
+
+    static class Factory implements SlotSharingStrategy.Factory {
+
+        public TaskBalancedPreferredSlotSharingStrategy create(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> slotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+
+            return new TaskBalancedPreferredSlotSharingStrategy(
+                    topology, slotSharingGroups, coLocationGroups);
+        }
+    }
+
+    /** SlotSharingGroupBuilder class for balanced scheduling strategy. */
+    private static class TaskBalancedExecutionSlotSharingGroupBuilder {
+
+        private final SchedulingTopology topology;
+
+        private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
+
+        /** Record the {@link ExecutionSlotSharingGroup}s for {@link SlotSharingGroup}s. */
+        private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>>
+                paralleledExecutionSlotSharingGroupsMap;
+
+        /**
+         * Record the next round-robin {@link ExecutionSlotSharingGroup} index for {@link
+         * SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, Integer> slotSharingGroupIndexMap;
+
+        private final Map<ExecutionVertexID, ExecutionSlotSharingGroup>
+                executionSlotSharingGroupMap;
+
+        private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
+
+        private final Map<CoLocationConstraint, ExecutionSlotSharingGroup>
+                constraintToExecutionSlotSharingGroupMap;
+
+        private TaskBalancedExecutionSlotSharingGroupBuilder(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> slotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+            this.topology = checkNotNull(topology);
+
+            this.coLocationGroupMap = new HashMap<>();
+            for (CoLocationGroup coLocationGroup : coLocationGroups) {
+                for (JobVertexID jobVertexId : coLocationGroup.getVertexIds()) {
+                    coLocationGroupMap.put(jobVertexId, coLocationGroup);
+                }
+            }
+
+
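
The quoted builder tracks, per `SlotSharingGroup`, the next round-robin `ExecutionSlotSharingGroup` index, plus a map from `CoLocationConstraint` to `ExecutionSlotSharingGroup`. A minimal sketch of how those two pieces can combine follows. It is not the PR's `build()` logic (the hunk does not show it); it assumes hypothetical plain-Java stand-ins (`BalancedSlotSharingSketch`, `Vertex`, `assign`, and string keys of the form `coLocationGroup#subtaskIndex`) in place of Flink's topology classes, and it covers a single slot sharing group.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Hypothetical, simplified model of balanced slot sharing (NOT Flink's API). Vertices of one slot
 * sharing group are spread round-robin over a fixed number of groups; co-located vertices with
 * the same subtask index are pinned to the same group.
 */
final class BalancedSlotSharingSketch {

    /** Hypothetical execution vertex: job vertex name plus subtask index. */
    static final class Vertex {
        final String jobVertex;
        final int subtaskIndex;

        Vertex(String jobVertex, int subtaskIndex) {
            this.jobVertex = jobVertex;
            this.subtaskIndex = subtaskIndex;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Vertex)) {
                return false;
            }
            Vertex other = (Vertex) o;
            return subtaskIndex == other.subtaskIndex && jobVertex.equals(other.jobVertex);
        }

        @Override
        public int hashCode() {
            return Objects.hash(jobVertex, subtaskIndex);
        }
    }

    /**
     * @param vertices vertices of one slot sharing group, in scheduling order
     * @param coLocationOf job vertex name -> co-location group name (absent if none)
     * @param numGroups number of groups to fill, must be positive
     * @return vertex -> group index in [0, numGroups)
     */
    static Map<Vertex, Integer> assign(
            List<Vertex> vertices, Map<String, String> coLocationOf, int numGroups) {
        Map<Vertex, Integer> assignment = new LinkedHashMap<>();
        // Stand-in for a constraint -> group map, keyed by "coLocationGroup#subtaskIndex".
        Map<String, Integer> constraintToGroup = new HashMap<>();
        int nextIndex = 0; // next round-robin position
        for (Vertex v : vertices) {
            String coLocationGroup = coLocationOf.get(v.jobVertex);
            String constraint =
                    coLocationGroup == null ? null : coLocationGroup + "#" + v.subtaskIndex;
            Integer group = constraint == null ? null : constraintToGroup.get(constraint);
            if (group == null) {
                // Not pinned yet: take the next group round-robin.
                group = nextIndex;
                nextIndex = (nextIndex + 1) % numGroups;
                if (constraint != null) {
                    constraintToGroup.put(constraint, group);
                }
            }
            assignment.put(v, group);
        }
        return assignment;
    }
}
```

Pinned vertices do not advance the round-robin index, so in this sketch co-location takes precedence over a perfectly even spread.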

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-25 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1405002564


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##
@@ -0,0 +1,361 @@

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-20 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1400131195


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##
@@ -0,0 +1,361 @@

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-20 Thread via GitHub


KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1398939301


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##
@@ -0,0 +1,361 @@

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-12 Thread via GitHub


1996fanrui commented on PR #23635:
URL: https://github.com/apache/flink/pull/23635#issuecomment-1807589290

   Hi @KarmaGYZ, thanks for your thorough review!
   
   > I think this PR contains two components. The first would be a supplement to [FLINK-33448](https://issues.apache.org/jira/browse/FLINK-33448). The second is part of the TASKS strategy. I think we may split it into two separate commits.
   
   Splitting it makes sense; it's clearer.
   
   > It would be better to include [FLINK-33388](https://issues.apache.org/jira/browse/FLINK-33388) and introduce the TASKS strategy.
   
   Would you mind if we keep them in multiple PRs? I'm afraid a single PR with many commits and changes would be hard to review. Of course, a single PR is also acceptable to me.





Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-12 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1390238967


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java:
##
@@ -295,6 +294,19 @@ public static SlotManagerConfiguration fromConfiguration(
 redundantTaskManagerNum);
 }
 
+@VisibleForTesting
+public static SlotMatchingStrategy getSlotMatchingStrategy(TaskManagerLoadBalanceMode mode) {

Review Comment:
   ```suggestion
       static SlotMatchingStrategy getSlotMatchingStrategy(TaskManagerLoadBalanceMode mode) {
   ```
   
   Is the default (package-private) visibility enough here? It looks like all callers are in the same package.
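
The suggestion drops `public` because every caller lives in the same package. A minimal sketch of that pattern follows, assuming hypothetical types (`SlotMatchingSketch`, `LoadBalanceMode`, `TaskManager`, `MatchingStrategy`, `forMode`) that stand in for Flink's `TaskManagerLoadBalanceMode` and `SlotMatchingStrategy`. In the sketch, `SLOTS` spreads slots out by picking the least-utilized TaskManager with a free slot, and any other mode takes the first TaskManager that has one; these selection rules are illustrative, not Flink's implementation.

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch (NOT Flink's API): a package-private factory from mode to strategy. */
final class SlotMatchingSketch {

    /** Hypothetical stand-in for TaskManagerLoadBalanceMode. */
    enum LoadBalanceMode {
        NONE,
        SLOTS
    }

    /** Hypothetical view of a TaskManager's slot usage. */
    static final class TaskManager {
        final String id;
        final int usedSlots;
        final int totalSlots;

        TaskManager(String id, int usedSlots, int totalSlots) {
            this.id = id;
            this.usedSlots = usedSlots;
            this.totalSlots = totalSlots;
        }

        boolean hasFreeSlot() {
            return usedSlots < totalSlots;
        }

        double utilization() {
            return (double) usedSlots / totalSlots;
        }
    }

    /** Picks a TaskManager with a free slot, if any. */
    interface MatchingStrategy {
        Optional<TaskManager> pick(List<TaskManager> candidates);
    }

    // No 'public' modifier: package-private suffices when all callers share the package.
    static MatchingStrategy forMode(LoadBalanceMode mode) {
        if (mode == LoadBalanceMode.SLOTS) {
            // Spread out: the least-utilized TaskManager that still has a free slot.
            return candidates ->
                    candidates.stream()
                            .filter(TaskManager::hasFreeSlot)
                            .min(Comparator.comparingDouble(TaskManager::utilization));
        }
        // Otherwise: the first TaskManager that has a free slot.
        return candidates -> candidates.stream().filter(TaskManager::hasFreeSlot).findFirst();
    }
}
```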



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to
+ * the same SlotSharingGroup, tend to be put evenly in each ExecutionSlotSharingGroup. Co-location
+ * constraints will be respected.
+ */
+class TaskBalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy {
+
+    public static final Logger LOG =
+            LoggerFactory.getLogger(TaskBalancedPreferredSlotSharingStrategy.class);
+
+    TaskBalancedPreferredSlotSharingStrategy(
+            final SchedulingTopology topology,
+            final Set<SlotSharingGroup> slotSharingGroups,
+            final Set<CoLocationGroup> coLocationGroups) {
+        super(topology, slotSharingGroups, coLocationGroups);
+    }
+
+    @Override
+    protected Map<ExecutionVertexID, ExecutionSlotSharingGroup> computeExecutionToESsgMap(
+            SchedulingTopology schedulingTopology) {
+        return new TaskBalancedExecutionSlotSharingGroupBuilder(
+                        schedulingTopology, this.logicalSlotSharingGroups, this.coLocationGroups)
+                .build();
+    }
+
+    static class Factory implements SlotSharingStrategy.Factory {
+
+        public TaskBalancedPreferredSlotSharingStrategy create(
+                final SchedulingTopology topology,
+                final Set<SlotSharingGroup> logicalSlotSharingGroups,
+                final Set<CoLocationGroup> coLocationGroups) {
+
+            return new TaskBalancedPreferredSlotSharingStrategy(
+                    topology, logicalSlotSharingGroups, coLocationGroups);
+        }
+    }
+
+    /** SlotSharingGroupBuilder class for balanced scheduling strategy. */
+    private static class TaskBalancedExecutionSlotSharingGroupBuilder {
+
+        private final SchedulingTopology topology;
+
+        private final Map<JobVertexID, SlotSharingGroup> slotSharingGroupMap;
+
+        /**
+         * Record the number of {@link ExecutionSlotSharingGroup}s for {@link SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, List<ExecutionSlotSharingGroup>> ssgToExecutionSSGs;
+
+        /**
+         * Record the next round-robin {@link ExecutionSlotSharingGroup} index for {@link
+         * SlotSharingGroup}s.
+         */
+        private final Map<SlotSharingGroup, Integer> slotRoundRobinIndex;
+
+        private final Map<ExecutionVertexID, ExecutionSlotSharingGroup>
+                executionSlotSharingGroupMap;
+
+        private final Map<JobVertexID, CoLocationGroup> coLocationGroupMap;
+
+        private final Map

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-12 Thread via GitHub


KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1390593913


##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java:
##
@@ -76,6 +76,6 @@ public ResourceProfile getResourceProfile() {
 
 @Override
 public String toString() {
-return "SlotSharingGroup " + this.ids.toString();
+return "SlotSharingGroup{" + "ids=" + ids + ", resourceProfile=" + resourceProfile + '}';

Review Comment:
   This should go into a separate hotfix commit.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java:
##
@@ -31,22 +35,26 @@ class ExecutionSlotSharingGroup {
 
 private final Set executionVertexIds;
 
-private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
+@Nonnull private final SlotSharingGroup slotSharingGroup;

Review Comment:
   IIUC, we changed it to `SlotSharingGroup` only for testing? I don't think that's a good practice.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategy.java:
##
@@ -0,0 +1,270 @@

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-11 Thread via GitHub


RocMarshal commented on PR #23635:
URL: https://github.com/apache/flink/pull/23635#issuecomment-1806832620

   Thank you very much @KarmaGYZ @1996fanrui for your comments. I've updated the PR based on them.
   Have a great weekend~ :)





Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-11 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r139022


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##
@@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy(
 SlotManagerUtils.generateDefaultSlotResourceProfile(
 totalResourceProfile, numSlotsPerWorker);
 this.availableResourceMatchingStrategy =
-evenlySpreadOutSlots
+taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS

Review Comment:
   It's fine with me.






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-11 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1390231615


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##
@@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy(
 SlotManagerUtils.generateDefaultSlotResourceProfile(
 totalResourceProfile, numSlotsPerWorker);
 this.availableResourceMatchingStrategy =
-evenlySpreadOutSlots
+taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS

Review Comment:
   yes, that's right.






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-10 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1390096801


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##
@@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy(
 SlotManagerUtils.generateDefaultSlotResourceProfile(
 totalResourceProfile, numSlotsPerWorker);
 this.availableResourceMatchingStrategy =
-evenlySpreadOutSlots
+taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS

Review Comment:
   It's fine with me.
   
   If so, this PR only introduces the `TaskBalancedPreferredSlotSharingStrategy`-related code without using it yet, right?
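For context on the hunk above: the change replaces the old boolean `evenlySpreadOutSlots` with a comparison against `TaskManagerLoadBalanceMode.SLOTS`, so any other mode, including `TASKS`, keeps the default matching behaviour. A minimal sketch of that selection follows. `LoadBalanceMode`, `MatchingStrategy` and its constants are hypothetical stand-ins, not Flink's actual classes.

```java
// Hypothetical stand-in for the enum discussed in this PR.
enum LoadBalanceMode { NONE, SLOTS, TASKS }

// Hypothetical names for the two matching behaviours being chosen between.
enum MatchingStrategy { SPREAD_OUT, ANY_MATCHING }

final class MatchingStrategySelector {
    private MatchingStrategySelector() {}

    // Equivalent of `mode == SLOTS ? <spread-out strategy> : <default strategy>`:
    // only SLOTS opts into spreading slots evenly; NONE and TASKS keep the default.
    static MatchingStrategy select(LoadBalanceMode mode) {
        return mode == LoadBalanceMode.SLOTS
                ? MatchingStrategy.SPREAD_OUT
                : MatchingStrategy.ANY_MATCHING;
    }
}
```

Under these assumptions, `select(LoadBalanceMode.TASKS)` returns `ANY_MATCHING`, so a `TASKS` setting has no effect at this layer yet.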






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-10 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1390096271


##
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##
@@ -708,6 +711,60 @@ public class TaskManagerOptions {
 "Time we wait for the timers in milliseconds to finish all pending timer threads"
 + " when the stream task is cancelled.");
 
+@Documentation.Section({
+Documentation.Sections.EXPERT_SCHEDULING,
+Documentation.Sections.ALL_TASK_MANAGER
+})
+public static final ConfigOption<TaskManagerLoadBalanceMode> TASK_MANAGER_LOAD_BALANCE_MODE =
+ConfigOptions.key("taskmanager.load-balance.mode")
+.enumType(TaskManagerLoadBalanceMode.class)
+.defaultValue(TaskManagerLoadBalanceMode.NONE)
+.withDescription(
+Description.builder()
+.text(
+"Mode for the load-balance allocation strategy across all available %s.",
+code("TaskManagers"))
+.list(
+text(
+"The %s mode tries to spread out the slots evenly across all available %s.",
+code(TaskManagerLoadBalanceMode.SLOTS.name()),
+code("TaskManagers")),
+text(
+"The %s mode tries to ensure that the number of tasks is relative balanced across all available %s.",

Review Comment:
   Hi @1996fanrui,
   Thanks for your review.
   
   Based on the results of the [discussion](https://github.com/apache/flink/pull/23635#discussion_r1383243917), the `TASKS` enum value and its description will be temporarily removed. Once the entire feature is complete, they will be reintroduced (with your comments addressed) to keep the feature and its documentation consistent.
   WDYT?
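A rough sketch of what temporarily withholding `TASKS` means for users, assuming a simplified parser. `InterimLoadBalanceMode` and `InterimModeParser` are hypothetical and do not reflect how Flink's `ConfigOption` machinery parses enum values. The point is that a value that is not (yet) an enum constant gets rejected instead of silently enabling a half-finished feature.

```java
import java.util.Locale;

// Hypothetical interim enum while TASKS is withheld.
enum InterimLoadBalanceMode {
    NONE,
    SLOTS
    // TASKS would be re-added once the end-to-end feature is complete.
}

final class InterimModeParser {
    private InterimModeParser() {}

    // Parses a raw option value case-insensitively; unknown values (including
    // "tasks" for now) are rejected rather than mapped to a partial feature.
    static InterimLoadBalanceMode parse(String raw) {
        String normalized = raw.trim().toUpperCase(Locale.ROOT);
        for (InterimLoadBalanceMode mode : InterimLoadBalanceMode.values()) {
            if (mode.name().equals(normalized)) {
                return mode;
            }
        }
        throw new IllegalArgumentException("Unsupported load-balance mode: " + raw);
    }
}
```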






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-09 Thread via GitHub


KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1388936955


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##
@@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy(
 SlotManagerUtils.generateDefaultSlotResourceProfile(
 totalResourceProfile, numSlotsPerWorker);
 this.availableResourceMatchingStrategy =
-evenlySpreadOutSlots
+taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS

Review Comment:
   I lean towards introducing `TASKS` after the functionality is complete.






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-09 Thread via GitHub


KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1388936955


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##
@@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy(
 SlotManagerUtils.generateDefaultSlotResourceProfile(
 totalResourceProfile, numSlotsPerWorker);
 this.availableResourceMatchingStrategy =
-evenlySpreadOutSlots
+taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS

Review Comment:
   I lean towards introducing `TASKS` after the functionality is complete.






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-09 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1385855309


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java:
##
@@ -31,22 +34,36 @@ class ExecutionSlotSharingGroup {
 
 private final Set<ExecutionVertexID> executionVertexIds;
 
-private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
+private @Nonnull SlotSharingGroup slotSharingGroup;
 
+/** @deprecated Only for test classes. */
+@Deprecated

Review Comment:
   The constructor should be marked as `@VisibleForTesting` instead of `@Deprecated` if it's only used in tests.
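A minimal sketch of the suggested annotation choice. `VisibleForTesting` here is a local stand-in for Flink's `org.apache.flink.annotation.VisibleForTesting` so the snippet compiles on its own, and `GroupSketch` / `ExecutionGroupSketch` are hypothetical simplifications of the classes in the diff.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Set;

// Local stand-in for Flink's VisibleForTesting; it only documents intent.
@interface VisibleForTesting {}

// Hypothetical stand-in for the slot sharing group a production caller passes in.
final class GroupSketch {
    final String name;

    GroupSketch(String name) {
        this.name = name;
    }
}

// Hypothetical, simplified execution-level sharing group.
final class ExecutionGroupSketch {
    private final Set<String> vertexIds = new HashSet<>();
    private final GroupSketch group;

    // Production constructor: the owning group is mandatory.
    ExecutionGroupSketch(GroupSketch group) {
        this.group = Objects.requireNonNull(group, "group");
    }

    // Test-only convenience constructor. @VisibleForTesting says why it exists;
    // @Deprecated would instead suggest it is scheduled for removal.
    @VisibleForTesting
    ExecutionGroupSketch() {
        this(new GroupSketch("test-default"));
    }

    void addVertex(String vertexId) {
        vertexIds.add(vertexId);
    }

    Set<String> getVertexIds() {
        return Collections.unmodifiableSet(vertexIds);
    }

    GroupSketch getGroup() {
        return group;
    }
}
```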



##
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##
@@ -708,6 +711,60 @@ public class TaskManagerOptions {
 "Time we wait for the timers in milliseconds to finish all pending timer threads"
 + " when the stream task is cancelled.");
 
+@Documentation.Section({
+Documentation.Sections.EXPERT_SCHEDULING,
+Documentation.Sections.ALL_TASK_MANAGER
+})
+public static final ConfigOption<TaskManagerLoadBalanceMode> TASK_MANAGER_LOAD_BALANCE_MODE =
+ConfigOptions.key("taskmanager.load-balance.mode")
+.enumType(TaskManagerLoadBalanceMode.class)
+.defaultValue(TaskManagerLoadBalanceMode.NONE)
+.withDescription(
+Description.builder()
+.text(
+"Mode for the load-balance allocation strategy across all available %s.",
+code("TaskManagers"))
+.list(
+text(
+"The %s mode tries to spread out the slots evenly across all available %s.",
+code(TaskManagerLoadBalanceMode.SLOTS.name()),
+code("TaskManagers")),
+text(
+"The %s mode tries to ensure that the number of tasks is relative balanced across all available %s.",

Review Comment:
   ```suggestion
   "The %s mode tries to spread out the tasks evenly across all available %s.",
   ```
   
   It's better to keep the wording consistent with SLOTS.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java:
##
@@ -84,4 +98,23 @@ public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContex
 allocationTimeout,
 context::getResourceProfile);
 }
+
+private static SlotSharingStrategy.Factory getSlotSharingStrategyFactory(
+@Nonnull Configuration configuration) {
+
+TaskManagerLoadBalanceMode taskManagerLoadBalanceMode =
+TaskManagerLoadBalanceMode.loadFromConfiguration(configuration);
+if (taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.TASKS) {
+return new TaskBalancedPreferredSlotSharingStrategy.Factory();
+}
+if (taskManagerLoadBalanceMode != TaskManagerLoadBalanceMode.NONE
+&& taskManagerLoadBalanceMode != TaskManagerLoadBalanceMode.SLOTS) {
+LOG.warn(
+"The value '{}' of '{}' isn't supported now, it will rollback to default strategy '{}'.",
+taskManagerLoadBalanceMode,
+TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE.key(),
+TaskManagerOptions.TASK_MANAGER_LOAD_BALANCE_MODE.defaultValue());
+}
+return new LocalInputPreferredSlotSharingStrategy.Factory();

Review Comment:
   ```suggestion
   if (taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.NONE
   || taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS) {
   return new LocalInputPreferredSlotSharingStrategy.Factory();
   }
   throw new UnsupportedXXXException("Unknown TaskManagerLoadBalanceMode");
   ```
   
   How about throwing an exception instead of calling `LOG.warn`? If other developers add a new enum value in the future, the exception will tell them that they need to update this logic to support the new value properly.
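One way to express the suggestion, sketched with hypothetical stand-ins (`BalanceMode`, `SharingStrategyFactory`, `StrategyFactories`). It uses `UnsupportedOperationException` as one possible choice for the placeholder `UnsupportedXXXException`. The `switch` handles every known constant, and its `default` branch throws, so a newly added enum value fails loudly instead of falling back with a warning.

```java
// Hypothetical stand-ins; names mirror the discussion but are not Flink's types.
enum BalanceMode { NONE, SLOTS, TASKS }

interface SharingStrategyFactory {
    String name();
}

final class StrategyFactories {
    private StrategyFactories() {}

    static SharingStrategyFactory forMode(BalanceMode mode) {
        switch (mode) {
            case TASKS:
                return () -> "TaskBalancedPreferred";
            case NONE:
            case SLOTS:
                return () -> "LocalInputPreferred";
            default:
                // Reached only if a constant is added without updating this method,
                // so the omission surfaces immediately instead of silently falling back.
                throw new UnsupportedOperationException(
                        "Unknown TaskManagerLoadBalanceMode: " + mode);
        }
    }
}
```

If the target Java version allows it, a `switch` expression that lists every constant and has no `default` would turn the same mistake into a compile-time error instead.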



##
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/TaskBalancedPreferredSlotSharingStrategyTest.java:
##
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); 

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-09 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1387973762


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##
@@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy(
 SlotManagerUtils.generateDefaultSlotResourceProfile(
 totalResourceProfile, numSlotsPerWorker);
 this.availableResourceMatchingStrategy =
-evenlySpreadOutSlots
+taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS

Review Comment:
   Hi @KarmaGYZ, thanks a lot for the review.
   
   This part is intended to be addressed in FLINK-33388.
   I have to admit that this PR may have enabled the `TASKS` balanced allocation option at the slot level too early, which raises two issues:
   
   - The `TASKS` option exposes functionality that is still incomplete. If `TASKS` is enabled, the allocation at the slot level is balanced, but the allocation of slots to TMs is not, although this is still an improvement over the old strategy.
   - If the overall FLIP progresses slowly, it may affect the release.
   
   Given these issues, could we consider one of the following ways to move the current work forward?
   
   - Keep the exposed functionality complete: the PR currently only adds the relevant functional code, so we can temporarily disable the `TASKS` configuration entry and re-enable it once FLINK-33388 moves forward.
   - Alternatively, include the work of FLINK-33388 in this PR. This may increase the reviewers' workload because of the large number of changes.
   - Alternatively, keep the current state and go ahead with the original plan.
   
   I'm happy to hear any opinions and suggestions about this.
   Thank you~  :)
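To illustrate what balancing at the slot level means, here is a simplified sketch. It is not the PR's `TaskBalancedPreferredSlotSharingStrategy`. In it, each job vertex places its subtasks into the currently least-loaded slots, and this is compared with a naive placement where subtask i always lands in slot i. Slots are modelled as plain task counters (an assumption), and the slot-to-TaskManager step covered by FLINK-33388 is out of scope.

```java
import java.util.Comparator;
import java.util.PriorityQueue;

final class BalancedAssignmentSketch {
    private BalancedAssignmentSketch() {}

    // Naive placement: subtask i of every vertex lands in slot i.
    static int[] indexBased(int[] parallelisms, int numSlots) {
        int[] load = new int[numSlots];
        for (int parallelism : parallelisms) {
            checkFits(parallelism, numSlots);
            for (int i = 0; i < parallelism; i++) {
                load[i]++;
            }
        }
        return load;
    }

    // Balanced placement: each vertex's subtasks go to the currently least-loaded
    // slots (ties broken by slot index). Polling distinct slots keeps the rule that
    // a slot holds at most one subtask of a given vertex.
    static int[] leastLoaded(int[] parallelisms, int numSlots) {
        int[] load = new int[numSlots];
        for (int parallelism : parallelisms) {
            checkFits(parallelism, numSlots);
            PriorityQueue<Integer> slots = new PriorityQueue<>(
                    Comparator.<Integer>comparingInt(s -> load[s]).thenComparingInt(s -> s));
            for (int s = 0; s < numSlots; s++) {
                slots.add(s);
            }
            for (int i = 0; i < parallelism; i++) {
                load[slots.poll()]++;
            }
        }
        return load;
    }

    private static void checkFits(int parallelism, int numSlots) {
        if (parallelism > numSlots) {
            throw new IllegalArgumentException(
                    "parallelism " + parallelism + " exceeds " + numSlots + " slots");
        }
    }
}
```

With vertex parallelisms {4, 2, 2} on 4 slots, `indexBased` yields task counts {3, 3, 1, 1}, while `leastLoaded` yields {2, 2, 2, 2}. Balanced slots only pay off if the slots are also spread evenly across TaskManagers, which is the first issue listed above.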






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-09 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1387973762


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##
@@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy(
 SlotManagerUtils.generateDefaultSlotResourceProfile(
 totalResourceProfile, numSlotsPerWorker);
 this.availableResourceMatchingStrategy =
-evenlySpreadOutSlots
+taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS

Review Comment:
   Hi @KarmaGYZ, thanks a lot for the review.
   
   This part is intended to be addressed in FLINK-33388.
   I have to admit that this PR may have enabled the `TASKS` balanced allocation option at the slot level too early, which raises two issues:
   
   - The `TASKS` option exposes functionality that is still incomplete. If `TASKS` is enabled, the allocation at the slot level is balanced, but the allocation of slots to TMs is not, although this is still an improvement over the old strategy.
   - If the overall FLIP progresses slowly, it may affect the release.
   
   Given these issues, could we consider one of the following ways to move the current work forward?
   
   - Keep the exposed functionality complete: the PR currently only adds the relevant functional code, so we can temporarily disable the `TASKS` configuration entry and re-enable it once FLINK-33388 moves forward.
   - Alternatively, include the work of FLINK-33388 in this PR. This may increase the reviewers' workload because of the large number of changes.
   
   I'm happy to hear any opinions and suggestions about this.
   Thank you~  :)






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-08 Thread via GitHub


KarmaGYZ commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1383243917


##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/DefaultResourceAllocationStrategy.java:
##
@@ -95,7 +96,7 @@ public DefaultResourceAllocationStrategy(
 SlotManagerUtils.generateDefaultSlotResourceProfile(
 totalResourceProfile, numSlotsPerWorker);
 this.availableResourceMatchingStrategy =
-evenlySpreadOutSlots
+taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS

Review Comment:
   What about tasks?



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java:
##
@@ -31,22 +34,36 @@ class ExecutionSlotSharingGroup {
 
 private final Set<ExecutionVertexID> executionVertexIds;
 
-private ResourceProfile resourceProfile = ResourceProfile.UNKNOWN;
+private @Nonnull SlotSharingGroup slotSharingGroup;
 
+/** @deprecated Only for test classes. */
+@Deprecated

Review Comment:
   `@VisibleForTesting`



##
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java:
##
@@ -260,10 +261,10 @@ public static SlotManagerConfiguration fromConfiguration(
 configuration.getBoolean(
 ResourceManagerOptions.TASK_MANAGER_RELEASE_WHEN_RESULT_CONSUMED);
 
-boolean evenlySpreadOutSlots =
-configuration.getBoolean(ClusterOptions.EVENLY_SPREAD_OUT_SLOTS_STRATEGY);
+TaskManagerLoadBalanceMode taskManagerLoadBalanceMode =
+TaskManagerLoadBalanceMode.loadFromConfiguration(configuration);
 final SlotMatchingStrategy slotMatchingStrategy =
-evenlySpreadOutSlots
+taskManagerLoadBalanceMode == TaskManagerLoadBalanceMode.SLOTS

Review Comment:
   ditto.



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java:
##
@@ -42,69 +43,23 @@
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * This strategy tries to reduce remote data exchanges. Execution vertices, which are connected and
  * belong to the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup.
  * Co-location constraints will be respected.
  */
-class LocalInputPreferredSlotSharingStrategy
+class LocalInputPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy
 implements SlotSharingStrategy, SchedulingTopologyListener {

Review Comment:
   No need to implement them again.
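For illustration, this sketch uses hypothetical names that stand in for `AbstractSlotSharingStrategy`, `SlotSharingStrategy` and `SchedulingTopologyListener`. Once the abstract base class declares the interfaces, every subclass inherits them, so repeating the `implements` clause compiles but is redundant.

```java
// Hypothetical interfaces standing in for SlotSharingStrategy and SchedulingTopologyListener.
interface SharingStrategy {
    String describe();
}

interface TopologyListener {
    void onTopologyChanged();
}

// Hypothetical base class: the interfaces are declared once, here.
abstract class AbstractSharingStrategy implements SharingStrategy, TopologyListener {
    protected int topologyChanges;

    @Override
    public void onTopologyChanged() {
        topologyChanges++;
    }
}

// Repeating `implements SharingStrategy, TopologyListener` here would compile but
// add nothing: the subclass already inherits both interfaces from its superclass.
final class LocalInputSketchStrategy extends AbstractSharingStrategy {
    @Override
    public String describe() {
        return "local-input-preferred";
    }

    int changes() {
        return topologyChanges;
    }
}
```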



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LocalInputPreferredSlotSharingStrategy.java:
##
@@ -42,69 +43,23 @@
 import java.util.Set;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
-import static org.apache.flink.util.Preconditions.checkState;
 
 /**
  * This strategy tries to reduce remote data exchanges. Execution vertices, which are connected and
  * belong to the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup.
  * Co-location constraints will be respected.
  */
-class LocalInputPreferredSlotSharingStrategy
+class LocalInputPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy
 implements SlotSharingStrategy, SchedulingTopologyListener {
 
-private final Map<ExecutionVertexID, ExecutionSlotSharingGroup> executionSlotSharingGroupMap;
-
-private final Set<SlotSharingGroup> logicalSlotSharingGroups;
-
-private final Set<CoLocationGroup> coLocationGroups;
+public static final Logger LOG =
+LoggerFactory.getLogger(LocalInputPreferredSlotSharingStrategy.class);

Review Comment:
   Where do we use it?



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java:
##
@@ -46,6 +47,12 @@ public class SlotSharingGroup implements java.io.Serializable {
 
 // 

 
+public SlotSharingGroup() {}
+
+public SlotSharingGroup(ResourceProfile resourceProfile) {

Review Comment:
   It seems we don't need this constructor. Just instantiate a `SlotSharingGroup` and set the resource profile.
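A sketch of the alternative suggested here. It uses hypothetical stand-ins (`ProfileSketch`, `SharingGroupSketch`, `TestGroups`) rather than Flink's `SlotSharingGroup` and `ResourceProfile`. The idea is to keep only the no-arg constructor and configure the profile through a setter at the call site, for example in a small test helper.

```java
import java.util.Objects;

// Hypothetical stand-in for a resource profile.
final class ProfileSketch {
    static final ProfileSketch UNKNOWN = new ProfileSketch(-1.0, -1);

    final double cpuCores;
    final int memoryMb;

    ProfileSketch(double cpuCores, int memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }
}

// Hypothetical slot sharing group: no-arg constructor plus a setter, no extra constructor.
final class SharingGroupSketch {
    private ProfileSketch resourceProfile = ProfileSketch.UNKNOWN;

    void setResourceProfile(ProfileSketch resourceProfile) {
        this.resourceProfile = Objects.requireNonNull(resourceProfile);
    }

    ProfileSketch getResourceProfile() {
        return resourceProfile;
    }
}

// Hypothetical test-side helper: instantiate, then set the profile.
final class TestGroups {
    private TestGroups() {}

    static SharingGroupSketch withProfile(ProfileSketch profile) {
        SharingGroupSketch group = new SharingGroupSketch();
        group.setResourceProfile(profile);
        return group;
    }
}
```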



##
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/SlotSharingGroup.java:
##
@@ -46,6 +47,12 @@ public class SlotSharingGroup implements java.io.Serializable {
 
 // 

 
+public SlotSharingGroup() {}
+
+public SlotSharingGroup(ResourceProfile resourceProfile) {

Review Comment:
   Only visible for testing?



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionSlotSharingGroup.java:
##
@@ -31,22 +34,36 @@ class 

Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-05 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1382851909


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SlotSharingExecutionSlotAllocatorFactory.java:
##
@@ -84,4 +96,25 @@ public ExecutionSlotAllocator createInstance(final ExecutionSlotAllocationContex
 allocationTimeout,
 context::getResourceProfile);
 }
+
+private static SlotSharingStrategy.Factory getSlotSharingStrategyFactory(
+@Nullable Configuration configuration) {

Review Comment:
   I checked the call chain: `configuration` is never null, so I will remove this null check.
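A minimal sketch of the resulting shape. It assumes a `Map<String, String>` as a stand-in for Flink's `Configuration` and uses hypothetical factory names. Rather than branching on a null `configuration`, the method states the non-null contract and fails fast with `Objects.requireNonNull` if that contract is ever violated.

```java
import java.util.Map;
import java.util.Objects;

// Hypothetical selector; a Map<String, String> stands in for Flink's Configuration.
final class FactorySelectionSketch {
    private FactorySelectionSketch() {}

    // No `if (configuration == null)` branch: callers never pass null, so the
    // contract is stated once and a violation fails fast with a clear message.
    static String selectFactory(Map<String, String> configuration) {
        Objects.requireNonNull(configuration, "configuration must not be null");
        String mode = configuration.getOrDefault("taskmanager.load-balance.mode", "NONE");
        return "TASKS".equalsIgnoreCase(mode) ? "task-balanced" : "local-input-preferred";
    }
}
```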






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-03 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1381232496


##
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##
@@ -708,6 +708,20 @@ public class TaskManagerOptions {
 "Time we wait for the timers in milliseconds to finish all pending timer threads"
 + " when the stream task is cancelled.");
 
+public static final ConfigOption<TaskManagerLoadBalanceMode> TASK_MANAGER_LOAD_BALANCE_MODE =

Review Comment:
   Please go ahead with the first JIRA, and cc me once the PR is ready, thanks~






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-03 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1381224693


##
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##
@@ -708,6 +708,20 @@ public class TaskManagerOptions {
 "Time we wait for the timers in milliseconds to finish all pending timer threads"
 + " when the stream task is cancelled.");
 
+public static final ConfigOption<TaskManagerLoadBalanceMode> TASK_MANAGER_LOAD_BALANCE_MODE =

Review Comment:
   Hi @1996fanrui, thanks for the idea. There's nothing better!
   
   > After this FLIP is merged, if a job with cluster.evenly-spread-out-slots: true, and didn't set the taskmanager.load-balance.mode. Should we change the default value from None to Slots?
   
   Yes, I'd prefer to stay compatible with the old configuration option.
   
   - This gives users a smooth migration experience.
   - Removing the old configuration option requires a gradual deprecation process. For example, we could drop the old option completely after one or two releases.
   
   I'll create this JIRA ticket and push the related work forward if nothing is unclear.
   Looking forward to your reply~






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-03 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1381224693


##
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##
@@ -708,6 +708,20 @@ public class TaskManagerOptions {
 "Time we wait for the timers in milliseconds to finish all pending timer threads"
 + " when the stream task is cancelled.");
 
+public static final ConfigOption<TaskManagerLoadBalanceMode> TASK_MANAGER_LOAD_BALANCE_MODE =

Review Comment:
   Hi @1996fanrui, thanks for the idea. There's nothing better!
   
   > After this FLIP is merged, if a job with cluster.evenly-spread-out-slots: true, and didn't set the taskmanager.load-balance.mode. Should we change the default value from None to Slots?
   
   Yes, I'd prefer to stay compatible with the old configuration option.
   
   - This gives users a smooth migration experience.
   - Removing the old configuration option requires a gradual deprecation process. For example, we could drop the old option completely after one or two releases.
   
   I'll create this JIRA ticket and push the related work forward if nothing is unclear.






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-11-03 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1381212518


##
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##
@@ -708,6 +708,20 @@ public class TaskManagerOptions {
 "Time we wait for the timers in milliseconds to finish all pending timer threads"
 + " when the stream task is cancelled.");
 
+public static final ConfigOption<TaskManagerLoadBalanceMode> TASK_MANAGER_LOAD_BALANCE_MODE =

Review Comment:
   Thanks @RocMarshal for the analysis!
   
   How about first creating a JIRA to add `taskmanager.load-balance.mode` and deprecate `cluster.evenly-spread-out-slots`?
   - It doesn't introduce any feature; it just adds the new option and deprecates the old one.
   - It should be the first JIRA, and `TaskManagerLoadBalanceMode` would have only 2 enum values: `None` and `Slots`.
   - In the second JIRA (maybe this PR), you can start your feature: adding `Tasks` and supporting it.
   
   I think this process is clearer and simpler than the one you proposed, and it doesn't affect users.
   
   --
   
   Besides how to implement this, I have a question about the compatibility of `taskmanager.load-balance.mode` and `cluster.evenly-spread-out-slots`.
   
   As I understand it:
   - `taskmanager.load-balance.mode: Slots` == `cluster.evenly-spread-out-slots: true`
   - `taskmanager.load-balance.mode: None` == `cluster.evenly-spread-out-slots: false` (the default)
   
   After this FLIP is merged, if a job sets `cluster.evenly-spread-out-slots: true` but doesn't set `taskmanager.load-balance.mode`, should we change the default value from `None` to `Slots`?
   
   - If yes, we support existing users directly.
   - If no, users must set `taskmanager.load-balance.mode: Slots` manually if they want the `evenly-spread-out-slots` feature.
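The compatibility option described above could look roughly like this: fall back to the deprecated flag only when the new option is unset. It is a sketch under assumptions. A `Map<String, String>` stands in for Flink's `Configuration`, and `LbMode` / `LoadBalanceModeResolver` are hypothetical names. Only the two option keys come from this thread.

```java
import java.util.Locale;
import java.util.Map;

// Hypothetical stand-in for TaskManagerLoadBalanceMode.
enum LbMode { NONE, SLOTS, TASKS }

final class LoadBalanceModeResolver {
    static final String NEW_KEY = "taskmanager.load-balance.mode";
    static final String LEGACY_KEY = "cluster.evenly-spread-out-slots";

    private LoadBalanceModeResolver() {}

    static LbMode resolve(Map<String, String> conf) {
        String explicit = conf.get(NEW_KEY);
        if (explicit != null) {
            // An explicitly configured new option always wins.
            return LbMode.valueOf(explicit.trim().toUpperCase(Locale.ROOT));
        }
        // Otherwise honour the deprecated flag so existing jobs keep spreading slots.
        boolean legacySpreadOut = Boolean.parseBoolean(conf.getOrDefault(LEGACY_KEY, "false"));
        return legacySpreadOut ? LbMode.SLOTS : LbMode.NONE;
    }
}
```

With only `cluster.evenly-spread-out-slots: true` set, this resolves to `SLOTS`. An explicitly set `taskmanager.load-balance.mode` always takes precedence over the deprecated flag.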
   






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-10-31 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1378358266


##
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##
@@ -708,6 +708,20 @@ public class TaskManagerOptions {
 "Time we wait for the timers in milliseconds to finish all pending timer threads"
 + " when the stream task is cancelled.");
 
+public static final ConfigOption<TaskManagerLoadBalanceMode> TASK_MANAGER_LOAD_BALANCE_MODE =

Review Comment:
   Hi @1996fanrui, thank you very much for the precise comments!
   
   I'd prefer the second option, with a few adjustments:
   1. Add `@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)`.
   2. Temporarily comment out the 'SLOTS' mode in `TaskManagerLoadBalanceMode`.
   3. Add a description of the configuration option, like the following:
 ```
  The load balance mode of taskmanager when processing scheduling 
tasks.
   
  'NONE' means that the scheduler does not consider any dimensional 
balance when scheduling tasks.
  'TASKS' means that the scheduler prioritizes ensuring that the 
number of tasks on each TM is balanced when scheduling tasks.
   
  Please view FLIP-370 for more details.
 ```
   4. After all PRs are merged, we'll add the 'SLOTS' mode back, extend the description, and deprecate `cluster.evenly-spread-out-slots`. The added description would be as follows:
 ```
 'SLOTS' indicates that the scheduler prioritizes and balances the 
use of each TM's slot when scheduling tasks.
 ```
   
   Please let me know your opinion~
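A sketch of how the proposed interim description could be generated from per-constant text, so the documentation cannot list a mode that is commented out. `ProposedLoadBalanceMode` is hypothetical, and the description strings are adapted from the proposal above. Flink's real option builds its text with `Description.builder()` instead.

```java
import java.util.StringJoiner;

// Hypothetical enum mirroring the proposed interim option: SLOTS is left out for now.
enum ProposedLoadBalanceMode {
    NONE("the scheduler does not consider any dimensional balance when scheduling tasks."),
    TASKS("the scheduler prioritizes ensuring that the number of tasks on each TM is balanced when scheduling tasks.");
    // SLOTS would be added back once all related PRs are merged.

    private final String description;

    ProposedLoadBalanceMode(String description) {
        this.description = description;
    }

    // Builds the option documentation from the constants that actually exist,
    // so the text cannot drift from the enum.
    static String documentation() {
        StringJoiner joiner = new StringJoiner("\n");
        joiner.add("The load balance mode of the TaskManager when scheduling tasks.");
        for (ProposedLoadBalanceMode mode : values()) {
            joiner.add("'" + mode.name() + "' means that " + mode.description);
        }
        joiner.add("Please see FLIP-370 for more details.");
        return joiner.toString();
    }
}
```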






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-10-31 Thread via GitHub


RocMarshal commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1378352765


##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/BalancedPreferredSlotSharingStrategy.java:
##
@@ -0,0 +1,275 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingExecutionVertex;
+import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.AbstractMap;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to
+ * the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup. Co-location
+ * constraints are ignored at present.
+ */
+class BalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy

Review Comment:
   SGTM~
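   The Javadoc above says the strategy "tries to get a balanced tasks scheduling". Here is a minimal sketch of what that could mean inside one slot sharing group. It uses a simplified model that is not taken from the PR. The group gets as many slots as its most parallel vertex, and each vertex with parallelism p puts one subtask into each of its p least-loaded slots, so no slot ever holds two subtasks of the same vertex. `BalancedSlotAssignmentSketch` and `assign` are hypothetical names.

```java
import java.util.Arrays;
import java.util.Comparator;

/**
 * Hypothetical sketch, not the PR's implementation: spread the subtasks of every
 * job vertex in one slot sharing group over the group's slots, always choosing
 * the least-loaded slots.
 */
final class BalancedSlotAssignmentSketch {

    /**
     * @param parallelisms parallelism of each job vertex in the slot sharing group
     * @return number of subtasks placed in each slot
     */
    static int[] assign(int[] parallelisms) {
        // The group needs as many slots as its most parallel vertex.
        int slots = Arrays.stream(parallelisms).max().orElse(0);
        int[] load = new int[slots];
        for (int p : parallelisms) {
            // Order slots by current load, breaking ties by slot index.
            Integer[] order = new Integer[slots];
            for (int i = 0; i < slots; i++) {
                order[i] = i;
            }
            Arrays.sort(order, Comparator.<Integer>comparingInt(i -> load[i]).thenComparingInt(i -> i));
            // One subtask goes to each of the p least-loaded slots, so no slot
            // receives two subtasks of the same vertex.
            for (int k = 0; k < p; k++) {
                load[order[k]]++;
            }
        }
        return load;
    }

    public static void main(String[] args) {
        // Three vertices with parallelism 4, 2 and 2 share one group of 4 slots.
        System.out.println(Arrays.toString(assign(new int[] {4, 2, 2})));
    }
}
```

   For parallelisms 4, 2 and 2 this prints `[2, 2, 2, 2]`. Picking the least-loaded slots per vertex keeps every slot within one task of the others, which is the property a task-balanced strategy is after.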



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/BalancedPreferredSlotSharingStrategy.java:
##
@@ -0,0 +1,275 @@
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to
+ * the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup. Co-location
+ * constraints are ignored at present.

Review Comment:
   I'll update the header comments.
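   The header promises "balanced" scheduling, so it can help to quantify the imbalance being fixed. The hypothetical sketch below does that under an assumption. It models a naive placement in which subtask i of every vertex lands in slot i, a simplification of what grouping without a locality preference can produce. It then compares the busiest slot against the lower bound ceil(total subtasks / slots). The class and method names are illustrative only.

```java
import java.util.Arrays;

/**
 * Hypothetical sketch: compare a naive, index-aligned placement with the best
 * achievable per-slot load inside one slot sharing group.
 */
final class SlotLoadImbalanceSketch {

    /** Assumed naive placement: subtask i of every vertex goes to slot i. */
    static int[] naiveLoad(int[] parallelisms) {
        int slots = Arrays.stream(parallelisms).max().orElse(0);
        int[] load = new int[slots];
        for (int p : parallelisms) {
            for (int subtask = 0; subtask < p; subtask++) {
                load[subtask]++;
            }
        }
        return load;
    }

    /** Lower bound on the busiest slot: total subtasks over slots, rounded up. */
    static int idealMaxLoad(int[] parallelisms) {
        int slots = Arrays.stream(parallelisms).max().orElse(0);
        int total = Arrays.stream(parallelisms).sum();
        return slots == 0 ? 0 : (total + slots - 1) / slots;
    }

    public static void main(String[] args) {
        int[] parallelisms = {4, 2, 2};
        int[] naive = naiveLoad(parallelisms);
        System.out.println("naive per-slot tasks: " + Arrays.toString(naive));
        System.out.println("busiest naive slot: " + Arrays.stream(naive).max().getAsInt());
        System.out.println("ideal busiest slot: " + idealMaxLoad(parallelisms));
    }
}
```

   For parallelisms 4, 2 and 2 the naive placement yields `[3, 3, 1, 1]`, while the ideal maximum is 2. A least-loaded placement reaches that bound.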




Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-10-31 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1378332744


##
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##
@@ -708,6 +708,20 @@ public class TaskManagerOptions {
 "Time we wait for the timers in milliseconds to finish all pending timer threads"
 + " when the stream task is cancelled.");
 
+public static final ConfigOption<TaskManagerLoadBalanceMode> TASK_MANAGER_LOAD_BALANCE_MODE =

Review Comment:
   Like the other options, should we add some comments and `@Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER)` here?
   
   Or should we add `@Documentation.Section` and deprecate `slot-spread-out` after all PRs are merged?
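   One possible shape for the deprecation path, sketched under several assumptions. `slot-spread-out` is taken to mean the existing `cluster.evenly-spread-out-slots` option. The mode enum is assumed to gain `SLOTS` and `TASKS` constants, although only `NONE` appears in this diff. A plain `Map` stands in for Flink's `Configuration`. In this sketch an explicitly set new option wins over the legacy flag.

```java
import java.util.Locale;
import java.util.Map;

/**
 * Hypothetical sketch: resolve the effective load balance mode while the legacy
 * boolean option is still honored. Keys and constants are assumptions.
 */
final class LoadBalanceModeResolutionSketch {

    /** Stand-in for TaskManagerLoadBalanceMode; SLOTS and TASKS are assumed. */
    enum LoadBalanceMode { NONE, SLOTS, TASKS }

    static final String MODE_KEY = "taskmanager.load-balance.mode";
    // Assumed to be the option the review calls `slot-spread-out`.
    static final String LEGACY_KEY = "cluster.evenly-spread-out-slots";

    static LoadBalanceMode resolve(Map<String, String> conf) {
        String mode = conf.get(MODE_KEY);
        if (mode != null) {
            // An explicitly configured mode always wins; unknown values make
            // valueOf throw IllegalArgumentException.
            return LoadBalanceMode.valueOf(mode.trim().toUpperCase(Locale.ROOT));
        }
        // Otherwise fall back to the deprecated flag: spreading slots maps to SLOTS.
        boolean spreadOut = Boolean.parseBoolean(conf.getOrDefault(LEGACY_KEY, "false"));
        return spreadOut ? LoadBalanceMode.SLOTS : LoadBalanceMode.NONE;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of(LEGACY_KEY, "true")));
        System.out.println(resolve(Map.of(MODE_KEY, "tasks", LEGACY_KEY, "true")));
    }
}
```

   Keeping the fallback in one place would make it straightforward to drop once the legacy option is removed.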



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-10-31 Thread via GitHub


1996fanrui commented on code in PR #23635:
URL: https://github.com/apache/flink/pull/23635#discussion_r1378331133


##
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##
@@ -708,6 +708,20 @@ public class TaskManagerOptions {
 "Time we wait for the timers in milliseconds to finish all pending timer threads"
 + " when the stream task is cancelled.");
 
+public static final ConfigOption<TaskManagerLoadBalanceMode> TASK_MANAGER_LOAD_BALANCE_MODE =
+ConfigOptions.key("taskmanager.load-balance.mode")
+.enumType(TaskManagerLoadBalanceMode.class)
+.defaultValue(TaskManagerLoadBalanceMode.NONE)
+.withDescription(
+"The load balance mode of taskmanager when processing scheduling tasks.");

Review Comment:
   It's better to describe these values in detail; that helps users understand this option.
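   Here is a hedged sketch of the per-value documentation this comment asks for. The `SLOTS` and `TASKS` constants and all of the wording are assumptions, since the diff only shows `NONE`. The enum is a standalone stand-in for `TaskManagerLoadBalanceMode`, so it has no Flink dependencies. In Flink itself, having the enum implement `DescribedEnum` should let the generated docs render each value's description.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

/**
 * Hypothetical stand-in for TaskManagerLoadBalanceMode that carries a description
 * per value. SLOTS, TASKS and all wording are illustrative assumptions.
 */
enum TaskManagerLoadBalanceModeSketch {
    NONE("Keep the scheduler's default placement; no extra balancing."),
    SLOTS("Spread slots evenly across TaskManagers."),
    TASKS("Balance the number of tasks per slot.");

    private final String description;

    TaskManagerLoadBalanceModeSketch(String description) {
        this.description = description;
    }

    String description() {
        return description;
    }

    /** Builds an option description that documents every supported value. */
    static String optionDescription() {
        return "The load balance mode of the TaskManager when scheduling tasks. Supported values: "
                + Arrays.stream(values())
                        .map(mode -> "'" + mode.name() + "': " + mode.description())
                        .collect(Collectors.joining(" "));
    }

    public static void main(String[] args) {
        System.out.println(optionDescription());
    }
}
```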



##
flink-core/src/main/java/org/apache/flink/configuration/TaskManagerOptions.java:
##
@@ -708,6 +708,20 @@ public class TaskManagerOptions {
 "Time we wait for the timers in milliseconds to finish all pending timer threads"
 + " when the stream task is cancelled.");
 
+public static final ConfigOption<TaskManagerLoadBalanceMode> TASK_MANAGER_LOAD_BALANCE_MODE =
+ConfigOptions.key("taskmanager.load-balance.mode")
+.enumType(TaskManagerLoadBalanceMode.class)
+.defaultValue(TaskManagerLoadBalanceMode.NONE)
+.withDescription(
+"The load balance mode of taskmanager when processing scheduling tasks.");
+
+/** Type of taskmanager.load-balance.mode. */

Review Comment:
   ```suggestion
   /** Type of {@link #TASK_MANAGER_LOAD_BALANCE_MODE}. */
   ```



##
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/BalancedPreferredSlotSharingStrategy.java:
##
@@ -0,0 +1,275 @@
+/**
+ * This strategy tries to get a balanced tasks scheduling. Execution vertices, which are belong to
+ * the same SlotSharingGroup, tend to be put in the same ExecutionSlotSharingGroup. Co-location
+ * constraints are ignored at present.
+ */
+class BalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy

Review Comment:
   ```suggestion
   class TaskBalancedPreferredSlotSharingStrategy extends AbstractSlotSharingStrategy
   ```
   
   How about `TaskBalancedPreferredSlotSharingStrategy`?






Re: [PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-10-31 Thread via GitHub


flinkbot commented on PR #23635:
URL: https://github.com/apache/flink/pull/23635#issuecomment-1787488771

   
   ## CI report:
   
   * 0816a1779cbf97c1e3526e18e70d3d2e09817a05 UNKNOWN
   
   
   Bot commands
 The @flinkbot bot supports the following commands:
   
- `@flinkbot run azure` re-run the last Azure build
   





[PR] [FLINK-33386][runtime] Support tasks balancing at slot level for Default Scheduler [flink]

2023-10-31 Thread via GitHub


RocMarshal opened a new pull request, #23635:
URL: https://github.com/apache/flink/pull/23635

   
   
   ## What is the purpose of the change
   
   - Support tasks balancing at slot level for Default Scheduler
   
   
   ## Brief change log
   
 - *Introduce BalancedPreferredSlotSharingStrategy to support tasks balancing at slot level.*
 - *Expose the configuration item to enable tasks balancing at slot level for Default Scheduler.*
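   A minimal sketch of how the exposed option could select a strategy in the Default Scheduler. The `TASKS` constant and this mapping are assumptions, not code from the PR. `LocalInputPreferredSlotSharingStrategy` is the existing locality-based strategy. Strategy names are returned as strings to keep the sketch dependency-free.

```java
/**
 * Hypothetical sketch: choose a slot sharing strategy from the configured mode.
 * The TASKS constant and this mapping are assumptions, not code from the PR.
 */
final class SlotSharingStrategySelectionSketch {

    enum Mode { NONE, SLOTS, TASKS }

    static String strategyFor(Mode mode) {
        switch (mode) {
            case TASKS:
                // Task-level balancing uses the strategy introduced by this PR.
                return "BalancedPreferredSlotSharingStrategy";
            case NONE:
            case SLOTS:
            default:
                // Other modes keep the existing locality-based grouping.
                return "LocalInputPreferredSlotSharingStrategy";
        }
    }

    public static void main(String[] args) {
        for (Mode mode : Mode.values()) {
            System.out.println(mode + " -> " + strategyFor(mode));
        }
    }
}
```

   Keeping `NONE` mapped to the existing strategy means jobs that do not set the option would see no behavior change.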
   
   
   ## Verifying this change
   
   
   This change added tests and can be verified as follows:
   
 - *`org.apache.flink.runtime.scheduler.BalancedPreferredSlotSharingStrategyTest`*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / **don't know**)
 - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (**yes** / no)
 - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
   

