[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-08 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r405329343
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1054,10 +1054,10 @@ protected abstract void internalDeregisterApplication(
 * Allocates a resource using the resource profile.
 *
 * @param resourceProfile The resource description
-* @return Collection of {@link ResourceProfile} describing the launched slots
+* @return whether the resource can be allocated
 */
@VisibleForTesting
-   public abstract Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile);
+   public abstract boolean startNewWorker(ResourceProfile resourceProfile);
 
 Review comment:
   Looking more into this, I'm not sure whether returning the container size is the right approach.
   
   The problem is that the container resource consists of two parts that are decided by different components.
   - Those contained in `WorkerResourceSpec` are decided by `SlotManager`: cpu, task heap, task off-heap, network, managed.
   - Those contained in `TaskExecutorProcessSpec` but not in `WorkerResourceSpec` are decided by `ResourceManager`: framework heap, framework off-heap, jvm metaspace, jvm overhead.
   
   If we return the size of the entire started container to SM, it does not help SM decide whether resources are wasted, because SM does not know how much the framework and JVM need in addition to `WorkerResourceSpec`.
   
   An alternative is to exclude the framework and JVM resources and return the remainder to SM. The problem is that the excluded part is not constant and might depend on `WorkerResourceSpec`. Specifically, JVM overhead might be configured as a fraction of the total process memory size, which means that increasing the total size of `WorkerResourceSpec` may also increase the JVM overhead.
   
   I think what this API should return really depends on how we want to solve the resource wasting issue. TBH, I'm not sure when we would have a `SlotManager` implementation that deals with the Yarn resource wasting problem. I would suggest creating a follow-up ticket and leaving it until we do have such an SM implementation. WDYT?
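   To illustrate why the excluded part is not a constant, here is a minimal sketch. It assumes a simplified model loosely following Flink's `taskmanager.memory.jvm-overhead.{fraction,min,max}` options: overhead is a fraction of total process memory, clamped to [min, max]. The class and method names are hypothetical, and this is not Flink's actual `TaskExecutorProcessUtils` logic.

```java
/**
 * Hypothetical, simplified model of how the part of a container that SM does not
 * control (framework + metaspace + JVM overhead) can depend on WorkerResourceSpec.
 * All sizes are in MiB. Not Flink's actual TaskExecutorProcessUtils logic.
 */
final class JvmOverheadSketch {

    private JvmOverheadSketch() {}

    /**
     * Assumes overhead = fraction * totalProcess with totalProcess = base + overhead,
     * which solves to overhead = base * fraction / (1 - fraction), then clamps to [min, max].
     */
    static long deriveJvmOverheadMb(long baseMb, double fraction, long minMb, long maxMb) {
        if (fraction < 0 || fraction >= 1) {
            throw new IllegalArgumentException("fraction must be in [0, 1)");
        }
        long derivedMb = (long) (baseMb * fraction / (1 - fraction)); // truncates toward zero
        return Math.max(minMb, Math.min(maxMb, derivedMb));
    }

    /** Everything in the container beyond the WorkerResourceSpec memory decided by SM. */
    static long excludedPartMb(
            long workerSpecMb, long frameworkMb, long metaspaceMb,
            double overheadFraction, long overheadMinMb, long overheadMaxMb) {
        long baseMb = workerSpecMb + frameworkMb + metaspaceMb;
        long overheadMb = deriveJvmOverheadMb(baseMb, overheadFraction, overheadMinMb, overheadMaxMb);
        return frameworkMb + metaspaceMb + overheadMb;
    }
}
```

   With framework = 256, metaspace = 256, fraction = 0.1, min = 192 and max = 1024 (illustrative numbers only), the excluded part is 704 MiB for a 1024 MiB worker spec, 1479 MiB for 8192 MiB and 1536 MiB for 16384 MiB, so SM cannot simply subtract a fixed amount.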


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-06 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r404473798
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1054,10 +1054,10 @@ protected abstract void internalDeregisterApplication(
 * Allocates a resource using the resource profile.
 *
 * @param resourceProfile The resource description
-* @return Collection of {@link ResourceProfile} describing the launched slots
+* @return whether the resource can be allocated
 */
@VisibleForTesting
-   public abstract Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile);
+   public abstract boolean startNewWorker(ResourceProfile resourceProfile);
 
 Review comment:
   True, it is possible that RM won't be able to start a container for a given `WorkerResourceSpec`.
   
   I think if the requested resource, to be specific the `TaskExecutorProcessSpec` derived from `WorkerResourceSpec`, is smaller than the Yarn min-allocation, we can simply log a warning. The same applies when the requested resource is not an integer multiple of the min-allocation. The critical case is when the requested resource is larger than the max-allocation; we should return false in such cases.
   
   I think these checks should be performed by the RM implementations. I'll add the checks for Yarn in #11353. For K8s, I think there are no such min/max-allocation limits, but I'll double-check.
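   A minimal sketch of the checks described above, assuming a simplified container size with only memory (MiB) and vcores. `ContainerSize` and `AllocationLimitCheck` are hypothetical names, not Yarn's `Resource` API or Flink code, and warnings are collected in a list where a real RM would log them.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified container size; not Yarn's Resource class. */
final class ContainerSize {
    final long memoryMb;
    final int vcores;

    ContainerSize(long memoryMb, int vcores) {
        this.memoryMb = memoryMb;
        this.vcores = vcores;
    }
}

/** Sketch of checks an RM implementation could run before starting a worker. */
final class AllocationLimitCheck {
    private final ContainerSize minAllocation;
    private final ContainerSize maxAllocation;
    /** Collected warnings; a real RM would log them instead. */
    final List<String> warnings = new ArrayList<>();

    AllocationLimitCheck(ContainerSize minAllocation, ContainerSize maxAllocation) {
        if (minAllocation.memoryMb <= 0) {
            throw new IllegalArgumentException("min-allocation memory must be positive");
        }
        this.minAllocation = minAllocation;
        this.maxAllocation = maxAllocation;
    }

    /** Returns false only if the request can never be satisfied (larger than max-allocation). */
    boolean canStart(ContainerSize requested) {
        if (requested.memoryMb > maxAllocation.memoryMb || requested.vcores > maxAllocation.vcores) {
            return false;
        }
        if (requested.memoryMb < minAllocation.memoryMb || requested.vcores < minAllocation.vcores) {
            // Yarn typically rounds such requests up, so part of the container may be wasted.
            warnings.add("requested resource is smaller than the min-allocation");
        } else if (requested.memoryMb % minAllocation.memoryMb != 0) {
            // Only memory is checked for the multiple here, for brevity.
            warnings.add("requested memory is not an integer multiple of the min-allocation");
        }
        return true;
    }
}
```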




[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-05 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403823418
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
 ##
 @@ -81,4 +83,14 @@ private static Configuration createActiveResourceManagerConfiguration(Configurat
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception;
+
+   protected WorkerResourceSpec createDefaultWorkerResourceSpec(Configuration configuration) {
+   final TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils
+   .newProcessSpecBuilder(configuration)
+   .withCpuCores(getDefaultCpus(configuration))
+   .build();
+   return WorkerResourceSpec.fromTaskExecutorProcessSpec(taskExecutorProcessSpec);
+   }
+
+   protected abstract CPUResource getDefaultCpus(Configuration configuration);
 
 Review comment:
   If deriving the cpu cores and the default worker spec is too detailed for `ActiveResourceManager`, I would suggest adding an abstract method `getDefaultWorkerSpecFactory` that returns different `WorkerResourceSpecFactory` instances for different `ActiveResourceManager` implementations.
   
   My concern with passing `WorkerResourceSpecFactory` or `WorkerResourceSpec` to `ActiveResourceManager` is that neither the constructor nor the `createResourceManager` method seems to be a good place to pass them in, IMO.
   - For the constructor, `ActiveResourceManager` is abstract and its implementations are either singletons (K8s/Yarn) or created by the cluster entry point (Mesos). I think it is too much detail for the cluster entry point to pass different `WorkerResourceSpecFactory`/`WorkerResourceSpec` instances to the resource manager factory, not to mention that it would break the singleton property on K8s/Yarn.
   - `createResourceManager` is called by `DefaultDispatcherResourceManagerComponentFactory`, which is not aware of the different deployments either.
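   A minimal sketch of the `getDefaultWorkerSpecFactory` idea, under heavy simplification: configuration is a plain `Map<String, String>`, the spec carries only CPU cores, and all class names except `WorkerResourceSpecFactory` and `getDefaultWorkerSpecFactory` (taken from the comment) are hypothetical. The keys `yarn.containers.vcores` and `kubernetes.taskmanager.cpu` are the real Flink options, but the fallback values here are simplified.

```java
import java.util.Map;

/** Hypothetical stand-in for WorkerResourceSpec; only CPU cores, memory omitted for brevity. */
final class SimpleWorkerSpec {
    final double cpuCores;

    SimpleWorkerSpec(double cpuCores) {
        this.cpuCores = cpuCores;
    }
}

/** Derives the default worker spec from (simplified, string-based) configuration. */
@FunctionalInterface
interface WorkerResourceSpecFactory {
    SimpleWorkerSpec createDefaultWorkerResourceSpec(Map<String, String> config);
}

/** Each deployment plugs in its own factory, so no constructor or caller has to pass a spec in. */
abstract class ActiveResourceManagerFactorySketch {

    protected abstract WorkerResourceSpecFactory getDefaultWorkerSpecFactory();

    final SimpleWorkerSpec defaultWorkerSpec(Map<String, String> config) {
        return getDefaultWorkerSpecFactory().createDefaultWorkerResourceSpec(config);
    }
}

final class YarnFactorySketch extends ActiveResourceManagerFactorySketch {
    @Override
    protected WorkerResourceSpecFactory getDefaultWorkerSpecFactory() {
        // Simplified fallback of "1"; Flink derives the real default differently.
        return config -> new SimpleWorkerSpec(
                Double.parseDouble(config.getOrDefault("yarn.containers.vcores", "1")));
    }
}

final class KubernetesFactorySketch extends ActiveResourceManagerFactorySketch {
    @Override
    protected WorkerResourceSpecFactory getDefaultWorkerSpecFactory() {
        return config -> new SimpleWorkerSpec(
                Double.parseDouble(config.getOrDefault("kubernetes.taskmanager.cpu", "1.0")));
    }
}
```

   The deployment-specific knowledge stays inside each factory subclass, which keeps both the singleton factories and `DefaultDispatcherResourceManagerComponentFactory` unaware of it.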




[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-05 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403806981
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/ResourceActions.java
 ##
 @@ -42,9 +40,9 @@
 * Requests to allocate a resource with the given {@link ResourceProfile}.
 *
 * @param resourceProfile for the to be allocated resource
-* @return Collection of {@link ResourceProfile} describing the allocated slots
+* @return whether the resource can be allocated
 */
-   Collection<ResourceProfile> allocateResource(ResourceProfile resourceProfile);
+   boolean allocateResource(ResourceProfile resourceProfile);
 
 Review comment:
   I'm not sure whether I have understood your question correctly.
   Telling `allocateResource` how big the worker is falls within the scope of making slot allocation pluggable. In this PR, the SM tells `allocateResource` the `WorkerResourceSpec`, but in a subsequent commit rather than this one. This PR does not make the RM implementations respect the `WorkerResourceSpec` they are told; we make those changes in subsequent PRs.




[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-05 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403805525
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -807,23 +811,32 @@ private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile)
return false;
}
 
-   private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile resourceProfile) {
-   final Collection<ResourceProfile> requestedSlots = resourceActions.allocateResource(resourceProfile);
+   private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile requestedSlotResourceProfile) {
+   if (defaultWorkerResourceSpec == null) {
+   // standalone mode, cannot allocate resource
+   return Optional.empty();
+   }
 
-   if (requestedSlots.isEmpty()) {
+   if (!Preconditions.checkNotNull(defaultSlotResourceProfile,
+   "defaultSlotResourceProfile should be null iff taskExecutorProcessSpec is null, which means standalone mode.")
+   .isMatching(requestedSlotResourceProfile)) {
+   // requested resource profile is unfulfillable
return Optional.empty();
-   } else {
-   final Iterator<ResourceProfile> slotIterator = requestedSlots.iterator();
-   final PendingTaskManagerSlot pendingTaskManagerSlot = new PendingTaskManagerSlot(slotIterator.next());
-   pendingSlots.put(pendingTaskManagerSlot.getTaskManagerSlotId(), pendingTaskManagerSlot);
+   }
 
 Review comment:
   I'm not sure whether this should be an invalid state exception.
   
   It is true that all slot requests should have an unknown resource profile ATM. However, I think `SlotManagerImpl` does not necessarily need to know whether there are custom profiles or not. As long as the requested resource profile can be matched by the default slot profile, it should be able to allocate the slot. And if the requested slot profile is larger than the default slot, an `UnfulfillableSlotRequestException` should be thrown.
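   A minimal sketch of the behaviour argued for above, assuming a simplified profile with only CPU and memory where `UNKNOWN` matches any known slot. `SimpleResourceProfile`, `PendingSlotAllocator` and `UnfulfillableRequestException` are hypothetical; the exception is unchecked for brevity, unlike Flink's `UnfulfillableSlotRequestException`, and `isMatching` only approximates the real `ResourceProfile#isMatching` semantics.

```java
import java.util.Optional;

/** Hypothetical, simplified resource profile; UNKNOWN models an unspecified requirement. */
final class SimpleResourceProfile {
    static final SimpleResourceProfile UNKNOWN = new SimpleResourceProfile(0.0, 0L);

    final double cpuCores;
    final long memoryMb;

    SimpleResourceProfile(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    /** Simplified: true if a slot with this profile can fulfill {@code required}. */
    boolean isMatching(SimpleResourceProfile required) {
        if (required == UNKNOWN) {
            return true;
        }
        return cpuCores >= required.cpuCores && memoryMb >= required.memoryMb;
    }
}

/** Unchecked stand-in for Flink's UnfulfillableSlotRequestException. */
final class UnfulfillableRequestException extends RuntimeException {
    UnfulfillableRequestException(String message) {
        super(message);
    }
}

final class PendingSlotAllocator {
    /** {@code null} means standalone mode. */
    private final SimpleResourceProfile defaultSlotResourceProfile;

    PendingSlotAllocator(SimpleResourceProfile defaultSlotResourceProfile) {
        this.defaultSlotResourceProfile = defaultSlotResourceProfile;
    }

    Optional<SimpleResourceProfile> allocate(SimpleResourceProfile requestedSlotResourceProfile) {
        if (defaultSlotResourceProfile == null) {
            return Optional.empty(); // standalone: cannot allocate new resources
        }
        if (!defaultSlotResourceProfile.isMatching(requestedSlotResourceProfile)) {
            throw new UnfulfillableRequestException("requested slot is larger than the default slot");
        }
        return Optional.of(defaultSlotResourceProfile); // the new slot gets the default profile
    }
}
```

   Note that the allocator never asks whether the request is "custom"; it only checks whether the default slot can match it.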




[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-05 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403799016
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1206,8 +1206,12 @@ public Void retrievePayload(ResourceID resourceID) {
//  Resource Management
// 

 
-   protected int getNumberRequiredTaskManagerSlots() {
-   return slotManager.getNumberPendingTaskManagerSlots();
+   protected int getNumberRequiredTaskManagers() {
+   return getPendingWorkerNums().values().stream().reduce(0, Integer::sum);
 
 Review comment:
   IIUC, this is because the RM implementations are ignoring the 
`WorkerResourceSpec` passed to `startNewWorker`? Then would it be better to add 
this check in `startNewWorker`?




[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-05 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403793315
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -1054,10 +1054,10 @@ protected abstract void internalDeregisterApplication(
 * Allocates a resource using the resource profile.
 *
 * @param resourceProfile The resource description
-* @return Collection of {@link ResourceProfile} describing the launched slots
+* @return whether the resource can be allocated
 */
@VisibleForTesting
-   public abstract Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile);
+   public abstract boolean startNewWorker(ResourceProfile resourceProfile);
 
 Review comment:
   In this commit, we have moved the logic for calculating the number of new pending slots into `SlotManagerImpl`. Therefore, `SlotManagerImpl` only needs to know whether the new worker is started or not, and should then be able to correctly calculate the number of available slots.
   
   In the final state, RM accepts a `WorkerResourceSpec` instead of a slot `ResourceProfile` for starting new workers. It then becomes the SM's decision how many multiples of the slot `ResourceProfile` a worker should be started with.
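   A minimal sketch of the split described above, assuming SM sizes each worker as `numSlotsPerWorker` default slots and RM only answers yes or no. `WorkerStarter` stands in for the boolean-returning `allocateResource`; it and `PendingSlotCalculator` are hypothetical names, and memory is the only resource modelled.

```java
/** Hypothetical stand-in for the boolean-returning ResourceActions#allocateResource. */
@FunctionalInterface
interface WorkerStarter {
    boolean startNewWorker(long workerMemoryMb);
}

/** SM-side bookkeeping sketch: SM decides the worker size, RM only says yes or no. */
final class PendingSlotCalculator {
    private final long slotMemoryMb;
    private final int numSlotsPerWorker;
    private final WorkerStarter starter;
    private int numPendingSlots;

    PendingSlotCalculator(long slotMemoryMb, int numSlotsPerWorker, WorkerStarter starter) {
        this.slotMemoryMb = slotMemoryMb;
        this.numSlotsPerWorker = numSlotsPerWorker;
        this.starter = starter;
    }

    /** Requests one worker of numSlotsPerWorker default slots; returns the newly pending slots. */
    int requestWorker() {
        long workerMemoryMb = slotMemoryMb * numSlotsPerWorker;
        if (!starter.startNewWorker(workerMemoryMb)) {
            return 0; // worker not started, so no new pending slots
        }
        numPendingSlots += numSlotsPerWorker;
        return numSlotsPerWorker;
    }

    int getNumPendingSlots() {
        return numPendingSlots;
    }
}
```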




[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-04-05 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r403789023
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
 ##
 @@ -81,4 +83,14 @@ private static Configuration createActiveResourceManagerConfiguration(Configurat
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception;
+
+   protected WorkerResourceSpec createDefaultWorkerResourceSpec(Configuration configuration) {
 
 Review comment:
   It calls the abstract method `getDefaultCpus`, which is currently overridden differently by different `ActiveResourceManagerFactory` implementations.
   
   Alternatively, we could make this a static method in either `ActiveResourceManagerFactory` or `TaskExecutorProcessUtils`, and make the cpu cores an argument of this method. I'm just not sure how necessary that is.
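   A minimal sketch contrasting the two options, assuming a simplified spec (CPU cores plus task heap in MiB) and a string map instead of Flink's `Configuration`. `DefaultWorkerSpec`, `WorkerSpecUtils`, `TemplateMethodFactory` and the `task-heap-mb` key are all hypothetical.

```java
import java.util.Map;

/** Hypothetical, simplified default worker spec: CPU cores and task heap in MiB. */
final class DefaultWorkerSpec {
    final double cpuCores;
    final long taskHeapMb;

    DefaultWorkerSpec(double cpuCores, long taskHeapMb) {
        this.cpuCores = cpuCores;
        this.taskHeapMb = taskHeapMb;
    }
}

/** Alternative: a static helper that takes the CPU cores as an explicit argument. */
final class WorkerSpecUtils {
    private WorkerSpecUtils() {}

    static DefaultWorkerSpec createDefaultWorkerResourceSpec(Map<String, String> config, double cpuCores) {
        // "task-heap-mb" is a hypothetical key used only for this sketch.
        long taskHeapMb = Long.parseLong(config.getOrDefault("task-heap-mb", "512"));
        return new DefaultWorkerSpec(cpuCores, taskHeapMb);
    }
}

/** Current approach: a template method whose CPU cores come from an abstract hook. */
abstract class TemplateMethodFactory {
    protected abstract double getDefaultCpus(Map<String, String> config);

    protected DefaultWorkerSpec createDefaultWorkerResourceSpec(Map<String, String> config) {
        // Both approaches share the same derivation; only the source of the CPU cores differs.
        return WorkerSpecUtils.createDefaultWorkerResourceSpec(config, getDefaultCpus(config));
    }
}
```

   The static helper is easier to test in isolation, while the template method keeps each factory responsible for its own CPU option; either way the derivation itself lives in one place.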





[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-08 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r389455647
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java
 ##
 @@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * Resource specification of a worker, mainly used by SlotManager requesting from ResourceManager.
 
 Review comment:
   I'm not sure why the `XxxConfiguration` is used in the first place; there are probably some good reasons. From what I see, the `XxxConfiguration` clearly shows which information is needed for creating the `Xxx`.
   I think we can discuss the pros and cons of `XxxConfiguration` in another thread, maybe as a code style discussion on the ML? For this PR, I would also like to keep it out of scope.




[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-08 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r389454700
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -804,22 +808,31 @@ private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile)
}
 
private Optional<PendingTaskManagerSlot> allocateResource(ResourceProfile resourceProfile) {
-   final Collection<ResourceProfile> requestedSlots = resourceActions.allocateResource(resourceProfile);
+   if (workerResourceSpec == null) {
+   // standalone mode, cannot allocate resource
 
 Review comment:
   @wangyang0918
   True. That's why we annotate it as `@Nullable` and, as suggested by Tison and now updated in the PR, clearly document that a `null` value means standalone mode.


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-07 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r389333966
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java
 ##
 @@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * Resource specification of a worker, mainly used by SlotManager requesting from ResourceManager.
 
 Review comment:
   True, we have the `@Nullable WorkerResourceSpec` along the code path for creating the `SlotManagerImpl`, specifically in the following classes:
   - `ResourceManagerRuntimeServiceConfiguration`
   - `SlotManagerConfiguration`
   - `SlotManagerImpl`
   - `SlotManagerBuilder` (test-only)
   
   I guess the root cause is that we decide `WorkerResourceSpec` differently across different RMs (whether it is `null`, and the `cpu` config options), so we have to pass it from the `ResourceManagerFactory`. Unfortunately, ATM I don't see a good way to resolve that. Do you have any suggestions?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-07 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r389332200
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java
 ##
 @@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * Resource specification of a worker, mainly used by SlotManager requesting from ResourceManager.
 
 Review comment:
   I tend to think about it differently.
   I think the semantics of a `WorkerResourceSpec` being `null` belong to the context where it is used, not to the class itself. E.g., when we create the `SlotManagerImpl`, a `null` `WorkerResourceSpec` means it is a standalone setup and SM should not request any worker from RM. On the other hand, when we create a `TaskExecutorProcessSpec` from the `WorkerResourceSpec`, it must not be `null`.
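   A minimal sketch of "null semantics belong to the context", assuming simplified memory-only specs. `WorkerSpecValue`, `SlotManagerSketch` and `ProcessSpecSketch` are hypothetical; in Flink the field would carry `@Nullable`, which is omitted here to keep the sketch dependency-free.

```java
import java.util.Objects;
import java.util.Optional;

/** Hypothetical, simplified worker spec. */
final class WorkerSpecValue {
    final double cpuCores;
    final long memoryMb;

    WorkerSpecValue(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }
}

/** In this context, null carries meaning: a standalone setup that never requests workers. */
final class SlotManagerSketch {
    /** {@code null} means standalone mode (would be annotated @Nullable in Flink). */
    private final WorkerSpecValue defaultWorkerResourceSpec;

    SlotManagerSketch(WorkerSpecValue defaultWorkerResourceSpec) {
        this.defaultWorkerResourceSpec = defaultWorkerResourceSpec;
    }

    Optional<WorkerSpecValue> workerToRequest() {
        return Optional.ofNullable(defaultWorkerResourceSpec);
    }
}

/** In this context, null is simply invalid. */
final class ProcessSpecSketch {
    final long totalProcessMemoryMb;

    private ProcessSpecSketch(long totalProcessMemoryMb) {
        this.totalProcessMemoryMb = totalProcessMemoryMb;
    }

    static ProcessSpecSketch fromWorkerSpec(WorkerSpecValue spec, long frameworkAndJvmMb) {
        Objects.requireNonNull(spec, "WorkerResourceSpec must not be null here");
        return new ProcessSpecSketch(spec.memoryMb + frameworkAndJvmMb);
    }
}
```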


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-07 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r389331707
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java
 ##
 @@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * Resource specification of a worker, mainly used by SlotManager requesting from ResourceManager.
 
 Review comment:
   Good point.
   I think it might be better to document it in `SlotManagerImpl` rather than 
`WorkerResourceSpec`. WDYT?


This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-06 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r388878181
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -804,22 +808,31 @@ private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile)
}
 
private Optional allocateResource(ResourceProfile resourceProfile) {
-   final Collection requestedSlots = resourceActions.allocateResource(resourceProfile);
+   if (workerResourceSpec == null) {
+   // standalone mode, cannot allocate resource
 
 Review comment:
   @TisonKun
   Please see my reply to Yang for why we skip calling `allocateResource` on standalone setups. For the same reason, we cannot merge this into the `if (!resourceActions.allocateResource(workerResourceSpec))` branch.
   
   Regarding `slotResourceProfile`, `resourceProfile` and `workerResourceSpec`:
   - `slotResourceProfile` describes the resource capacity of a slot.
   - `resourceProfile` describes the resource requirement of a slot request.
   - `workerResourceSpec` describes the resource requirement of a worker (TM) to request.
   
   I think you're right, these names are indeed hard to understand. What do you think about renaming `resourceProfile` to `requestedSlotResourceProfile`, and `slotResourceProfile` to `defaultSlotResourceProfile`?
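   To make the three notions concrete, here is a minimal sketch under simplifying assumptions: only CPU cores and a single memory figure are modeled, `SlotProfileSketch` and `WorkerSpecSketch` are hypothetical stand-ins rather than Flink's `ResourceProfile` and `WorkerResourceSpec`, and the default slot profile is assumed to be an even share of the worker. It uses the proposed names `defaultSlotResourceProfile` and `requestedSlotResourceProfile`.

```java
// Hypothetical, simplified stand-ins for illustration only: real Flink
// profiles track more resource dimensions than CPU and one memory figure.
final class SlotProfileSketch {
    final double cpuCores;
    final long memoryMb;

    SlotProfileSketch(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    /** Whether a slot with this capacity can host the given requested profile. */
    boolean canFulfill(SlotProfileSketch requestedSlotResourceProfile) {
        return cpuCores >= requestedSlotResourceProfile.cpuCores
                && memoryMb >= requestedSlotResourceProfile.memoryMb;
    }
}

final class WorkerSpecSketch {
    final double cpuCores;
    final long memoryMb;

    WorkerSpecSketch(double cpuCores, long memoryMb) {
        this.cpuCores = cpuCores;
        this.memoryMb = memoryMb;
    }

    /** Assumption: the default slot capacity is an even share of the worker. */
    SlotProfileSketch defaultSlotResourceProfile(int numSlotsPerWorker) {
        return new SlotProfileSketch(cpuCores / numSlotsPerWorker, memoryMb / numSlotsPerWorker);
    }
}
```

   For example, a 4-core, 4096 MB worker with 4 slots yields a 1-core, 1024 MB default slot, which can fulfill a 0.5-core, 512 MB request but not a 2-core one.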


[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-06 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r388872808
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
 ##
 @@ -81,4 +91,27 @@ public static YarnResourceManagerFactory getInstance() {
webInterfaceUrl,
resourceManagerMetricGroup);
}
+
+   @Override
+   protected CPUResource getDefaultCpus(final Configuration configuration) {
+   int fallback = configuration.getInteger(YarnConfigOptions.VCORES);
+   double cpuCoresDouble = TaskExecutorProcessUtils.getCpuCoresWithFallback(configuration, fallback).getValue().doubleValue();
+   @SuppressWarnings("NumericCastThatLosesPrecision")
+   long cpuCoresLong = Math.max((long) Math.ceil(cpuCoresDouble), 1L);
+   //noinspection FloatingPointEquality
+   if (cpuCoresLong != cpuCoresDouble) {
+   LOG.info(
+   "The amount of cpu cores must be a positive integer on Yarn. Rounding {} up to the closest positive integer {}.",
+   cpuCoresDouble,
+   cpuCoresLong);
+   }
+   if (cpuCoresLong > Integer.MAX_VALUE) {
 
 Review comment:
   @TisonKun Not sure what you mean. Are you suggesting simplifying this code with `Preconditions.checkState()`?
   I remember the reason we implemented it like this in `YarnResourceManager` in the first place was to get an `IllegalConfigurationException` instead of an `IllegalStateException`, which is more readable for users.
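   For illustration, a minimal sketch of the two variants, assuming a simplified bound check on the cpu cores. `checkState` only mimics the shape of `Preconditions.checkState()`, and `IllegalConfigurationException` is a local stand-in for Flink's class so the snippet compiles on its own; the class and method names are hypothetical.

```java
final class CpuCoresCheckSketch {
    /** Local stand-in for Flink's IllegalConfigurationException (hypothetical here). */
    static final class IllegalConfigurationException extends RuntimeException {
        IllegalConfigurationException(String message) {
            super(message);
        }
    }

    /** A checkState-style guard: the user only sees a generic IllegalStateException. */
    static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }

    static int toIntViaCheckState(long cpuCores) {
        checkState(cpuCores <= Integer.MAX_VALUE, "cpu cores out of range");
        return (int) cpuCores;
    }

    /** The explicit guard: the exception type points the user at the configuration. */
    static int toIntViaConfigException(long cpuCores) {
        if (cpuCores > Integer.MAX_VALUE) {
            throw new IllegalConfigurationException(
                    "The amount of cpu cores " + cpuCores + " cannot exceed " + Integer.MAX_VALUE + ".");
        }
        return (int) cpuCores;
    }
}
```

   Both variants reject the same values; the difference is what the user sees. An `IllegalStateException` reads like a framework bug, while a configuration exception points at the setting to fix.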


[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-06 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r388870649
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -527,10 +524,9 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
 * Request new container if pending containers cannot satisfy pending slot requests.
 */
private void requestYarnContainerIfRequired() {
-   int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots();
-   int pendingTaskManagerSlots = numPendingContainerRequests * numSlotsPerTaskManager;
+   int requiredTaskManagers = getNumberRequiredTaskManagers();
 
-   if (requiredTaskManagerSlots > pendingTaskManagerSlots) {
+   if (requiredTaskManagers > numPendingContainerRequests) {
 
 Review comment:
   Same here.


[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-06 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r388870561
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java
 ##
 @@ -272,10 +267,9 @@ private void requestKubernetesPod() {
 * Request new pod if pending pods cannot satisfy pending slot requests.
 */
private void requestKubernetesPodIfRequired() {
-   final int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots();
-   final int pendingTaskManagerSlots = numPendingPodRequests * numSlotsPerTaskManager;
+   final int requiredTaskManagers = getNumberRequiredTaskManagers();
 
-   if (requiredTaskManagerSlots > pendingTaskManagerSlots) {
+   if (requiredTaskManagers > numPendingPodRequests) {
 
 Review comment:
   `startNewWorker` is a public interface method, and its return value is checked by `SlotManager` for creating pending slots.
   For `requestKubernetesPodIfRequired`, I don't see how its return value would be needed, at least for now. And since it's an internal private method, I think we can always add that return value later if needed.
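   A minimal sketch of why the return value matters on the public path, with hypothetical names and the `WorkerResourceSpec` argument omitted for brevity: `WorkerStarterSketch#startNewWorker` stands in for the public method, and the caller records pending slots only when it returns `true`. No such caller exists for a private helper like `requestKubernetesPodIfRequired`, which is why it can stay `void` for now.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the public entry point (cf. startNewWorker). */
interface WorkerStarterSketch {
    /** Returns whether a new worker can be requested at all. */
    boolean startNewWorker();
}

final class PendingSlotsSketch {
    private final WorkerStarterSketch resourceManager;
    private final int numSlotsPerWorker;
    final List<Integer> pendingSlots = new ArrayList<>();

    PendingSlotsSketch(WorkerStarterSketch resourceManager, int numSlotsPerWorker) {
        this.resourceManager = resourceManager;
        this.numSlotsPerWorker = numSlotsPerWorker;
    }

    /** The boolean matters here: pending slots are only recorded on success. */
    boolean allocateWorker() {
        if (!resourceManager.startNewWorker()) {
            return false;
        }
        for (int i = 0; i < numSlotsPerWorker; i++) {
            pendingSlots.add(pendingSlots.size()); // id of the new pending slot
        }
        return true;
    }
}
```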


[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-06 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r388865070
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -804,22 +808,31 @@ private boolean isFulfillableByRegisteredSlots(ResourceProfile resourceProfile)
}
 
private Optional allocateResource(ResourceProfile resourceProfile) {
-   final Collection requestedSlots = resourceActions.allocateResource(resourceProfile);
+   if (workerResourceSpec == null) {
+   // standalone mode, cannot allocate resource
 
 Review comment:
   The problem is not about calling `StandaloneResourceManager#startNewWorker`. It is about not having a `WorkerResourceSpec` on standalone setups.
   
   For a standalone cluster, users may have TM resource configurations only on the TM machines. That means trying to create a `WorkerResourceSpec` from the configuration on the JM might fail due to missing memory configurations. I don't think it makes sense to force users to set TM configurations on the JM machine, because they are not actually used there.
   
   We ran into such a problem during the 1.9 release testing, where a standalone JM failed because it tried to calculate a default TM managed memory size.
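   A minimal sketch of the resulting design, assuming hypothetical names and modeling only the TM memory size: the spec is derived only on active setups (where a missing memory option is a real error), stays `null` on standalone setups, and `allocateResource` short-circuits on `null` instead of consulting the resource manager.

```java
import java.util.Optional;

final class StandaloneSlotManagerSketch {
    /** Hypothetical worker spec; only the TM memory size is modeled. */
    static final class WorkerSpec {
        final long totalMemoryMb;

        WorkerSpec(long totalMemoryMb) {
            this.totalMemoryMb = totalMemoryMb;
        }
    }

    /**
     * Hypothetical factory: only an active setup derives a spec, and it fails
     * if the TM memory size is missing from the configuration it sees.
     */
    static WorkerSpec createSpecOrNull(boolean activeSetup, Long configuredTmMemoryMb) {
        if (!activeSetup) {
            return null; // standalone: never read TM options on the JM
        }
        if (configuredTmMemoryMb == null) {
            throw new IllegalArgumentException("TM memory size is not configured");
        }
        return new WorkerSpec(configuredTmMemoryMb);
    }

    private final WorkerSpec workerResourceSpec; // null on standalone setups

    StandaloneSlotManagerSketch(WorkerSpec workerResourceSpec) {
        this.workerResourceSpec = workerResourceSpec;
    }

    Optional<WorkerSpec> allocateResource() {
        if (workerResourceSpec == null) {
            // standalone mode, cannot allocate resource
            return Optional.empty();
        }
        return Optional.of(workerResourceSpec);
    }
}
```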


[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-06 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r388862718
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
 ##
 @@ -81,4 +91,27 @@ public static YarnResourceManagerFactory getInstance() {
webInterfaceUrl,
resourceManagerMetricGroup);
}
+
+   @Override
+   protected CPUResource getDefaultCpus(final Configuration configuration) {
+   int fallback = configuration.getInteger(YarnConfigOptions.VCORES);
+   double cpuCoresDouble = TaskExecutorProcessUtils.getCpuCoresWithFallback(configuration, fallback).getValue().doubleValue();
+   @SuppressWarnings("NumericCastThatLosesPrecision")
+   long cpuCoresLong = Math.max((long) Math.ceil(cpuCoresDouble), 1L);
+   //noinspection FloatingPointEquality
+   if (cpuCoresLong != cpuCoresDouble) {
 
 Review comment:
   True, we have not exposed `taskmanager.cpu.cores`. But we might in the future, and if we do, adding this check could easily be overlooked. Again, I believe there's no harm in over-checking it.
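   Extracted into a standalone helper (the class and method names are hypothetical), the rounding in the hunk above behaves as follows: any fractional or non-positive value, however it reaches the factory, is rounded up to a positive integer rather than rejected, and `wasRounded` marks the cases that would trigger the info log.

```java
final class VcoresRoundingSketch {
    /** Rounds up to the closest positive integer, mirroring the ceil/max(1) logic above. */
    static long roundUpToPositiveInteger(double cpuCores) {
        return Math.max((long) Math.ceil(cpuCores), 1L);
    }

    /** True when rounding changed the value, i.e. when the hunk would log an info message. */
    static boolean wasRounded(double cpuCores) {
        // The long is widened to double for the comparison, as in the original check.
        return roundUpToPositiveInteger(cpuCores) != cpuCores;
    }
}
```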


[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-06 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r388861967
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
 ##
 @@ -81,4 +91,27 @@ public static YarnResourceManagerFactory getInstance() {
webInterfaceUrl,
resourceManagerMetricGroup);
}
+
+   @Override
+   protected CPUResource getDefaultCpus(final Configuration configuration) {
+   int fallback = configuration.getInteger(YarnConfigOptions.VCORES);
+   double cpuCoresDouble = TaskExecutorProcessUtils.getCpuCoresWithFallback(configuration, fallback).getValue().doubleValue();
+   @SuppressWarnings("NumericCastThatLosesPrecision")
+   long cpuCoresLong = Math.max((long) Math.ceil(cpuCoresDouble), 1L);
+   //noinspection FloatingPointEquality
+   if (cpuCoresLong != cpuCoresDouble) {
+   LOG.info(
+   "The amount of cpu cores must be a positive integer on Yarn. Rounding {} up to the closest positive integer {}.",
+   cpuCoresDouble,
+   cpuCoresLong);
+   }
+   if (cpuCoresLong > Integer.MAX_VALUE) {
 
 Review comment:
   I think it is not robust to make the correctness of `YarnResourceManagerFactory` depend on another class, `YarnClusterDescriptor`. There is no harm in double-checking it, right?
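   A minimal sketch of the self-contained check, assuming the factory receives the cpu cores as a `double` (the class and method names are hypothetical). The factory rounds and bounds the value itself, so its result stays valid even if an upstream check, such as the one in `YarnClusterDescriptor`, changes or is bypassed. `IllegalArgumentException` stands in for the `IllegalConfigurationException` used in the real code, to keep the snippet dependency-free.

```java
final class DefaultCpusSketch {
    /**
     * Hypothetical end-to-end conversion: round up, then bound-check locally
     * instead of assuming an upstream component already validated the value.
     */
    static int toYarnVcores(double cpuCores) {
        long rounded = Math.max((long) Math.ceil(cpuCores), 1L);
        if (rounded > Integer.MAX_VALUE) {
            throw new IllegalArgumentException(
                    "The amount of cpu cores " + rounded + " cannot exceed " + Integer.MAX_VALUE + ".");
        }
        return (int) rounded;
    }
}
```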


[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-06 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r388861404
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpec.java
 ##
 @@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Objects;
+
+/**
+ * Resource specification of a worker, mainly used by SlotManager requesting from ResourceManager.
+ */
+public class WorkerResourceSpec {
 
 Review comment:
   There was a discussion in the design Google doc about whether to reuse `TaskExecutorResourceSpec`. We decided not to reuse it, and I'll quote Till's comment from the doc here.
   > TaskExecutorResourceSpec defines for me the resources of a TaskExecutor. 
The SlotManager on the other hand is only interested in a certain set of 
resources which happen to be the exact same set as TaskExecutorResourceSpec 
offers. However, the latter might evolve adding more resource specifications. 
Hence, by reusing TaskExecutorResourceSpec we would also couple the SlotManager 
to the TaskExecutor's resource specification which does not necessarily need to 
be the same.
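   A minimal sketch of the decoupling described in the quote, with hypothetical stand-in classes and field sets reduced for brevity. Following the split discussed in this PR, the SlotManager-side spec keeps only cpu, task heap, task off-heap, network and managed memory, while framework and JVM parts stay in the process spec; `fromProcessSpec` is a hypothetical conversion, not necessarily an existing Flink method.

```java
import java.util.Objects;

/** Hypothetical stand-in for a full TaskExecutor process spec (reduced field set). */
final class TaskExecutorProcessSpecSketch {
    // Decided by the SlotManager.
    final double cpuCores;
    final long taskHeapMb;
    final long taskOffHeapMb;
    final long networkMb;
    final long managedMb;
    // Decided by the ResourceManager (framework and JVM parts).
    final long frameworkHeapMb;
    final long jvmOverheadMb;

    TaskExecutorProcessSpecSketch(double cpuCores, long taskHeapMb, long taskOffHeapMb,
            long networkMb, long managedMb, long frameworkHeapMb, long jvmOverheadMb) {
        this.cpuCores = cpuCores;
        this.taskHeapMb = taskHeapMb;
        this.taskOffHeapMb = taskOffHeapMb;
        this.networkMb = networkMb;
        this.managedMb = managedMb;
        this.frameworkHeapMb = frameworkHeapMb;
        this.jvmOverheadMb = jvmOverheadMb;
    }
}

/** Hypothetical SlotManager-side spec: only the dimensions the SlotManager decides. */
final class WorkerResourceSpecSketch {
    final double cpuCores;
    final long taskHeapMb;
    final long taskOffHeapMb;
    final long networkMb;
    final long managedMb;

    WorkerResourceSpecSketch(double cpuCores, long taskHeapMb, long taskOffHeapMb,
            long networkMb, long managedMb) {
        this.cpuCores = cpuCores;
        this.taskHeapMb = taskHeapMb;
        this.taskOffHeapMb = taskOffHeapMb;
        this.networkMb = networkMb;
        this.managedMb = managedMb;
    }

    /** Copies only the SlotManager-relevant fields; other process-spec fields never leak in. */
    static WorkerResourceSpecSketch fromProcessSpec(TaskExecutorProcessSpecSketch spec) {
        return new WorkerResourceSpecSketch(
                spec.cpuCores, spec.taskHeapMb, spec.taskOffHeapMb, spec.networkMb, spec.managedMb);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof WorkerResourceSpecSketch)) {
            return false;
        }
        WorkerResourceSpecSketch that = (WorkerResourceSpecSketch) o;
        return Double.compare(cpuCores, that.cpuCores) == 0
                && taskHeapMb == that.taskHeapMb
                && taskOffHeapMb == that.taskOffHeapMb
                && networkMb == that.networkMb
                && managedMb == that.managedMb;
    }

    @Override
    public int hashCode() {
        return Objects.hash(cpuCores, taskHeapMb, taskOffHeapMb, networkMb, managedMb);
    }
}
```

   Two process specs that differ only in framework heap or JVM overhead map to the same worker spec, so adding fields to the process spec does not change what the SlotManager compares.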


[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-05 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r388723786
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerImpl.java
 ##
 @@ -125,20 +126,25 @@
 * */
private boolean failUnfulfillableRequest = true;
 
+   @Nullable
+   private final WorkerResourceSpec workerResourceSpec;
+
public SlotManagerImpl(
SlotMatchingStrategy slotMatchingStrategy,
ScheduledExecutor scheduledExecutor,
Time taskManagerRequestTimeout,
Time slotRequestTimeout,
Time taskManagerTimeout,
-   boolean waitResultConsumedBeforeRelease) {
+   boolean waitResultConsumedBeforeRelease,
+   @Nullable WorkerResourceSpec workerResourceSpec) {
 
this.slotMatchingStrategy = Preconditions.checkNotNull(slotMatchingStrategy);
this.scheduledExecutor = Preconditions.checkNotNull(scheduledExecutor);
this.taskManagerRequestTimeout = Preconditions.checkNotNull(taskManagerRequestTimeout);
this.slotRequestTimeout = Preconditions.checkNotNull(slotRequestTimeout);
this.taskManagerTimeout = Preconditions.checkNotNull(taskManagerTimeout);
this.waitResultConsumedBeforeRelease = waitResultConsumedBeforeRelease;
+   this.workerResourceSpec = workerResourceSpec;
 
 Review comment:
   See my reply above.


[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-05 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r388723699
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManagerConfiguration.java
 ##
 @@ -43,19 +46,23 @@
private final Time taskManagerTimeout;
private final boolean waitResultConsumedBeforeRelease;
private final boolean evenlySpreadOutSlots;
+   @Nullable
+   private final WorkerResourceSpec defaultWorkerResourceSpec;
 
public SlotManagerConfiguration(
Time taskManagerRequestTimeout,
Time slotRequestTimeout,
Time taskManagerTimeout,
boolean waitResultConsumedBeforeRelease,
-   boolean evenlySpreadOutSlots) {
+   boolean evenlySpreadOutSlots,
+   @Nullable WorkerResourceSpec defaultWorkerResourceSpec) {
 
 Review comment:
   The default `WorkerResourceSpec` is needed for both static and dynamic slot allocation. For dynamic allocation, we need it for scenarios where some jobs do not provide fine-grained resource requirements.
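   A minimal sketch of the fallback, assuming hypothetical names and modeling a missing fine-grained requirement as `null`: static allocation always requests the default worker, while dynamic allocation uses a job's requirement when it has one and falls back to the default otherwise.

```java
final class DefaultWorkerSpecFallbackSketch {
    /** Hypothetical worker spec with only CPU and memory modeled. */
    static final class Spec {
        final double cpuCores;
        final long memoryMb;

        Spec(double cpuCores, long memoryMb) {
            this.cpuCores = cpuCores;
            this.memoryMb = memoryMb;
        }
    }

    private final Spec defaultWorkerResourceSpec;
    private final boolean dynamicSlotAllocation;

    DefaultWorkerSpecFallbackSketch(Spec defaultWorkerResourceSpec, boolean dynamicSlotAllocation) {
        this.defaultWorkerResourceSpec = defaultWorkerResourceSpec;
        this.dynamicSlotAllocation = dynamicSlotAllocation;
    }

    /**
     * Static allocation always requests the default worker. Dynamic allocation
     * uses the job's fine-grained requirement when present (non-null) and
     * falls back to the default otherwise.
     */
    Spec workerToRequest(Spec fineGrainedRequirementOrNull) {
        if (dynamicSlotAllocation && fineGrainedRequirementOrNull != null) {
            return fineGrainedRequirementOrNull;
        }
        return defaultWorkerResourceSpec;
    }
}
```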


[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-05 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r388723609
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ActiveResourceManagerFactory.java
 ##
 @@ -81,4 +83,14 @@ private static Configuration createActiveResourceManagerConfiguration(Configurat
ClusterInformation clusterInformation,
@Nullable String webInterfaceUrl,
ResourceManagerMetricGroup resourceManagerMetricGroup) throws Exception;
+
+   protected WorkerResourceSpec createDefaultWorkerResourceSpec(Configuration configuration) {
 
 Review comment:
   I tend to disagree, for the following reasons.
   - The default `WorkerResourceSpec` is needed for both static and dynamic slot allocation. For dynamic allocation, we need it for scenarios where some jobs do not provide fine-grained resource requirements.
   - We need the default `WorkerResourceSpec` only for active RMs, not for the standalone RM, and SM does not know which RM it runs on.
   - We need the default CPU cores to create the default `WorkerResourceSpec`, and they depend on a different configuration option for each active RM. SM does not have that information.
   - Following the dependency injection principle, we should avoid creating dependencies inside the component where possible.
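   A minimal sketch of the injection point, using `Map<String, String>` in place of Flink's `Configuration`, reducing the default spec to its CPU value, and using hypothetical class names: each RM factory knows which option carries its CPU setting and hands the result to the SlotManager configuration, which never derives it. The `yarn.containers.vcores` key mirrors `YarnConfigOptions.VCORES`; the fallback value of `1` is an assumption for illustration.

```java
import java.util.Map;

/** Hypothetical SM configuration: it receives the default CPU value, never derives it. */
final class InjectedSmConfigSketch {
    final Double defaultWorkerCpuCores; // null when running on a standalone RM

    InjectedSmConfigSketch(Double defaultWorkerCpuCores) {
        this.defaultWorkerCpuCores = defaultWorkerCpuCores;
    }
}

/** Each RM factory knows which option carries its CPU setting. */
abstract class RmFactorySketch {
    abstract Double getDefaultCpus(Map<String, String> configuration);

    final InjectedSmConfigSketch createSlotManagerConfiguration(Map<String, String> configuration) {
        // The dependency is built here and injected, not created inside the SM.
        return new InjectedSmConfigSketch(getDefaultCpus(configuration));
    }
}

final class YarnRmFactorySketch extends RmFactorySketch {
    @Override
    Double getDefaultCpus(Map<String, String> configuration) {
        // Key mirrors YarnConfigOptions.VCORES; the "1" fallback is an assumption.
        return Double.valueOf(configuration.getOrDefault("yarn.containers.vcores", "1"));
    }
}

final class StandaloneRmFactorySketch extends RmFactorySketch {
    @Override
    Double getDefaultCpus(Map<String, String> configuration) {
        return null; // no default worker spec on standalone setups
    }
}
```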


[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-05 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r388721739
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
 ##
 @@ -98,4 +101,10 @@ public MesosResourceManagerFactory(@Nonnull MesosServices mesosServices, @Nonnul
webInterfaceUrl,
resourceManagerMetricGroup);
}
+
+   @Override
+   protected CPUResource getDefaultCpus(Configuration configuration) {
 
 Review comment:
   Same here.


[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-05 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r388721585
 
 

 ##
 File path: 
flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java
 ##
 @@ -80,4 +84,10 @@ public static KubernetesResourceManagerFactory getInstance() {
fatalErrorHandler,
resourceManagerMetricGroup);
}
+
+   @Override
+   protected CPUResource getDefaultCpus(Configuration configuration) {
 
 Review comment:
   Same here.


[GitHub] [flink] xintongsong commented on a change in pull request #11320: [FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager at the worker granularity.

2020-03-05 Thread GitBox
xintongsong commented on a change in pull request #11320: 
[FLINK-16437][runtime] Make SlotManager allocate resource from ResourceManager 
at the worker granularity.
URL: https://github.com/apache/flink/pull/11320#discussion_r388721558
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
 ##
 @@ -81,4 +91,27 @@ public static YarnResourceManagerFactory getInstance() {
webInterfaceUrl,
resourceManagerMetricGroup);
}
+
+   @Override
+   protected CPUResource getDefaultCpus(final Configuration configuration) {
 
 Review comment:
   Yes, I'm aware of this.
   It's just that we currently have both a default `TaskExecutorProcessSpec` and a default `WorkerResourceSpec`, and will migrate the active RMs from the former to the latter one by one. `ActiveResourceManager#getCpuCores` can be removed soon after all active RMs are migrated. That's why I didn't bother to de-duplicate them.

