[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258519473
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/failurerate/FailureRater.java
 ##
 @@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.api.common.time.Time;
+
+/**
+ * A rater to record the failure rate within a time interval.
+ */
+public interface FailureRater {
 
 Review comment:
   Maybe the `FailureRater` could still be useful if we use it to wrap a 
`Meter` and the maximum failure rate. Then the interface would offer
   ```
   public interface FailureRater {
       boolean exceedsFailureRate();
       void markFailure();
   }
   ```
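
A minimal sketch of how such a wrapper might look. It assumes a hand-rolled `SlidingWindowMeter` as a stand-in for Flink's `Meter` and an injected millisecond time source; `SlidingWindowMeter` and `ThresholdFailureRater` are hypothetical names used only for illustration and are not part of the PR.

```java
import java.util.ArrayDeque;
import java.util.function.LongSupplier;

/** The two-method interface proposed in the review comment. */
interface FailureRater {
    boolean exceedsFailureRate();
    void markFailure();
}

/** Hypothetical stand-in for a meter: counts events inside a sliding window. */
final class SlidingWindowMeter {
    private final long windowMillis;
    private final LongSupplier clock; // injected time source, in milliseconds
    private final ArrayDeque<Long> timestamps = new ArrayDeque<>();

    SlidingWindowMeter(long windowMillis, LongSupplier clock) {
        this.windowMillis = windowMillis;
        this.clock = clock;
    }

    void markEvent() {
        timestamps.add(clock.getAsLong());
    }

    /** Number of events recorded within the last windowMillis. */
    int getCount() {
        long cutoff = clock.getAsLong() - windowMillis;
        // Drop timestamps that have fallen out of the window.
        while (!timestamps.isEmpty() && timestamps.peek() <= cutoff) {
            timestamps.remove();
        }
        return timestamps.size();
    }
}

/** Wraps the meter together with the maximum tolerated failures per window. */
final class ThresholdFailureRater implements FailureRater {
    private final SlidingWindowMeter meter;
    private final int maximumFailuresPerWindow;

    ThresholdFailureRater(SlidingWindowMeter meter, int maximumFailuresPerWindow) {
        this.meter = meter;
        this.maximumFailuresPerWindow = maximumFailuresPerWindow;
    }

    @Override
    public void markFailure() {
        meter.markEvent();
    }

    @Override
    public boolean exceedsFailureRate() {
        return meter.getCount() > maximumFailuresPerWindow;
    }
}
```

With this split, callers such as the `ResourceManager` would only see `markFailure()` and `exceedsFailureRate()`, while the threshold and the window stay inside the wrapper.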


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258488042
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ##
 @@ -300,6 +305,25 @@ public boolean registerSlotRequest(SlotRequest 
slotRequest) throws SlotManagerEx
}
}
 
+   /**
+* Rejects all pending slot requests.
+* @param cause the exception caused the rejection
+*/
+   public void rejectAllPendingSlotRequests(Exception cause) {
+   for (PendingSlotRequest pendingSlotRequest : 
pendingSlotRequests.values()) {
+   rejectPendingSlotRequest(pendingSlotRequest, cause);
 
 Review comment:
   Here we should call `cancelSlotRequest`.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258488103
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ##
 @@ -300,6 +305,25 @@ public boolean registerSlotRequest(SlotRequest 
slotRequest) throws SlotManagerEx
}
}
 
+   /**
+* Rejects all pending slot requests.
+* @param cause the exception caused the rejection
+*/
+   public void rejectAllPendingSlotRequests(Exception cause) {
+   for (PendingSlotRequest pendingSlotRequest : 
pendingSlotRequests.values()) {
+   rejectPendingSlotRequest(pendingSlotRequest, cause);
 
 Review comment:
   Here we should call `cancelPendingSlotRequest`.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258479657
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -630,6 +674,29 @@ public void unRegisterInfoMessageListener(final String 
address) {
}
}
 
+   /**
+* Record failures in ResourceManagers. If maximum failure rate is met, 
then reject all pending reject.
+* @return whether should acquire new container/worker after the failure
+*/
+   @VisibleForTesting
+   protected boolean recordFailure() {
+   failureRater.recordFailure();
+   if (failureRater.exceedMaximumFailureRate()) {
+   rejectAllPendingSlotRequests(new 
MaximumFailedTaskManagerExceedingException(
+   new RuntimeException(String.format("Maximum 
number of failed workers %d in interval %s"
+   + "is detected in Resource 
Manager", failureRater.getMaximumFailureRate(),
+   
failureRater.getFailureInterval().toString();
+
+   return false;
+   }
+
+   return true;
+   }
+
+   protected void rejectAllPendingSlotRequests(Exception e) {
 
 Review comment:
   I would suggest replacing `e` with `cause`.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258472458
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/failurerate/TimestampBasedFailureRater.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.api.common.time.Time;
+
+import java.util.ArrayDeque;
+
+/**
+ * A timestamp queue based failure rater implementation.
+ *
+ *
 
 Review comment:
   Please remove line breaks




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258473381
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/failurerate/TimestampBasedFailureRater.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.api.common.time.Time;
+
+import java.util.ArrayDeque;
+
+/**
+ * A timestamp queue based failure rater implementation.
+ *
+ *
+ */
+public class TimestampBasedFailureRater implements FailureRater {
+   private static final int DEFAULT_TIMESTAMP_SIZE = 300;
+   private final int maximumFailureRate;
+   private final Time failureInterval;
+   private final ArrayDeque failureTimestamps;
+
+   public TimestampBasedFailureRater(int maximumFailureRate, Time 
failureInterval) {
+   this.maximumFailureRate = maximumFailureRate;
+   this.failureInterval = failureInterval;
+   this.failureTimestamps = new ArrayDeque<>(maximumFailureRate > 
0 ? maximumFailureRate : DEFAULT_TIMESTAMP_SIZE);
+   }
+
+   @Override
+   public void recordFailure() {
+   failureTimestamps.add(System.currentTimeMillis());
 
 Review comment:
   Let's use `Clock` here for the current time.
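
A minimal sketch of why an injected clock helps, assuming a hypothetical single-method `Clock` interface defined here for self-containment (Flink has its own clock abstraction, whose exact API is not shown in this thread). `ManualClock` and `ClockBasedFailureRecorder` are illustrative names only.

```java
import java.util.ArrayDeque;

/** Hypothetical minimal clock abstraction, defined here only for the sketch. */
interface Clock {
    long absoluteTimeMillis();
}

/** Test clock whose time only moves when the test advances it. */
final class ManualClock implements Clock {
    private long currentMillis;

    @Override
    public long absoluteTimeMillis() {
        return currentMillis;
    }

    void advanceMillis(long delta) {
        currentMillis += delta;
    }
}

/** Records failure timestamps using the injected clock instead of System.currentTimeMillis(). */
final class ClockBasedFailureRecorder {
    private final Clock clock;
    private final long intervalMillis;
    private final ArrayDeque<Long> failureTimestamps = new ArrayDeque<>();

    ClockBasedFailureRecorder(Clock clock, long intervalMillis) {
        this.clock = clock;
        this.intervalMillis = intervalMillis;
    }

    void recordFailure() {
        failureTimestamps.add(clock.absoluteTimeMillis());
    }

    /** Number of failures whose age does not exceed the interval. */
    int failuresInInterval() {
        long now = clock.absoluteTimeMillis();
        while (!failureTimestamps.isEmpty() && now - failureTimestamps.peek() > intervalMillis) {
            failureTimestamps.remove();
        }
        return failureTimestamps.size();
    }
}
```

A test can then advance `ManualClock` explicitly and check expiry deterministically, without sleeping.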




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258476187
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 ##
 @@ -55,6 +55,18 @@
" default, the port of the JobManager, because the same 
ActorSystem is used." +
" Its not possible to use this configuration key to 
define port ranges.");
 
+   /**
+* Defines the maximum number of workers (YARN / Mesos) failure can 
happen in a minute.
+* It is to quickly catch external dependency caused workers failure 
and terminate job
+* accordingly. Be default, -1 is set to disable the feature.
+*/
+   public static final ConfigOption MAXIMUM_WORKERS_FAILURE_RATE 
= ConfigOptions
 
 Review comment:
   A rate should actually be a `double`




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258473068
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/failurerate/TimestampBasedFailureRater.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.api.common.time.Time;
+
+import java.util.ArrayDeque;
+
+/**
+ * A timestamp queue based failure rater implementation.
+ *
+ *
+ */
+public class TimestampBasedFailureRater implements FailureRater {
+   private static final int DEFAULT_TIMESTAMP_SIZE = 300;
+   private final int maximumFailureRate;
+   private final Time failureInterval;
+   private final ArrayDeque failureTimestamps;
+
+   public TimestampBasedFailureRater(int maximumFailureRate, Time 
failureInterval) {
 
 Review comment:
   I think `maximumFailureRate` should not be part of this implementation. The 
`TimestampBasedFailureMeter` should simply return a failure rate. Whether that 
rate is too high should be the responsibility of the user.
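
A rough sketch of that separation, under the assumption that the meter reports failures per second as a `double` and that the caller owns the threshold. `FailureRateMeter` and `WorkerFailurePolicy` are hypothetical names, and passing timestamps explicitly is a simplification made for the example.

```java
import java.util.ArrayDeque;

/** Hypothetical meter: reports a rate only and knows nothing about limits. */
final class FailureRateMeter {
    private final long intervalMillis;
    private final ArrayDeque<Long> failureTimestamps = new ArrayDeque<>();

    FailureRateMeter(long intervalMillis) {
        if (intervalMillis <= 0) {
            throw new IllegalArgumentException("intervalMillis must be positive");
        }
        this.intervalMillis = intervalMillis;
    }

    void markFailure(long timestampMillis) {
        failureTimestamps.add(timestampMillis);
    }

    /** Failures per second observed in the interval ending at nowMillis. */
    double getFailuresPerSecond(long nowMillis) {
        while (!failureTimestamps.isEmpty() && nowMillis - failureTimestamps.peek() > intervalMillis) {
            failureTimestamps.remove();
        }
        return failureTimestamps.size() / (intervalMillis / 1000.0);
    }
}

/** The user of the meter owns the threshold and the decision. */
final class WorkerFailurePolicy {
    private final double maximumFailuresPerSecond;

    WorkerFailurePolicy(double maximumFailuresPerSecond) {
        this.maximumFailuresPerSecond = maximumFailuresPerSecond;
    }

    boolean isTooHigh(double observedFailuresPerSecond) {
        return observedFailuresPerSecond > maximumFailuresPerSecond;
    }
}
```

Keeping the comparison outside the meter means the same meter could feed metrics reporting and several different policies.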




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258505911
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -395,12 +402,14 @@ public void onContainersAllocated(List 
containers) {

nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
} catch (Throwable t) {
log.error("Could not start 
TaskManager in container {}.", container.getId(), t);
-
// release the failed container

workerNodeMap.remove(resourceId);

resourceManagerClient.releaseAssignedContainer(container.getId());
-   // and ask for a new one
-   
requestYarnContainerIfRequired();
+   log.error("Could not start 
TaskManager in container {}.", container.getId(), t);
+   if (recordFailure()) {
+   // and ask for a new one
+   
requestYarnContainerIfRequired();
+   }
 
 Review comment:
   I think the changes are not properly tested. Otherwise the missing 
`onContainersCompleted` case would have been caught. Please add the relevant 
test cases.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258467118
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 ##
 @@ -149,25 +150,26 @@
private MesosConfiguration initializedMesosConfig;
 
public MesosResourceManager(
-   // base class
-   RpcService rpcService,
-   String resourceManagerEndpointId,
-   ResourceID resourceId,
-   HighAvailabilityServices highAvailabilityServices,
-   HeartbeatServices heartbeatServices,
-   SlotManager slotManager,
-   MetricRegistry metricRegistry,
-   JobLeaderIdService jobLeaderIdService,
-   ClusterInformation clusterInformation,
-   FatalErrorHandler fatalErrorHandler,
-   // Mesos specifics
-   Configuration flinkConfig,
-   MesosServices mesosServices,
-   MesosConfiguration mesosConfig,
-   MesosTaskManagerParameters taskManagerParameters,
-   ContainerSpecification taskManagerContainerSpec,
-   @Nullable String webUiUrl,
-   JobManagerMetricGroup jobManagerMetricGroup) {
+   // base class
+   RpcService rpcService,
+   String resourceManagerEndpointId,
+   ResourceID resourceId,
+   HighAvailabilityServices highAvailabilityServices,
+   HeartbeatServices heartbeatServices,
+   SlotManager slotManager,
+   MetricRegistry metricRegistry,
+   JobLeaderIdService jobLeaderIdService,
+   ClusterInformation clusterInformation,
+   FatalErrorHandler fatalErrorHandler,
+   // Mesos specifics
+   Configuration flinkConfig,
+   MesosServices mesosServices,
+   MesosConfiguration mesosConfig,
+   MesosTaskManagerParameters taskManagerParameters,
+   ContainerSpecification taskManagerContainerSpec,
+   @Nullable String webUiUrl,
+   JobManagerMetricGroup jobManagerMetricGroup,
+   FailureRater failureRater) {
 
 Review comment:
   Let's move this parameter into the non-mesos specific section by adding it 
after `fatalErrorHandler`, for example.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258481348
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/entrypoint/YarnResourceManagerFactory.java
 ##
 @@ -74,6 +81,8 @@
clusterInformation,
fatalErrorHandler,
webInterfaceUrl,
-   jobManagerMetricGroup);
+   jobManagerMetricGroup,
+   new TimestampBasedFailureRater(failureRate, Time.of(1, 
TimeUnit.MINUTES))
 
 Review comment:
   Let's encapsulate the `FailureRater` instantiation based on a 
`configuration` somewhere. This logic is repeated in several places.
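
One possible shape for such a helper, sketched with a plain `Map<String, String>` in place of Flink's `Configuration`. The class name `FailureRaters`, both configuration keys, and the timestamp-passing `FailureRater` interface are assumptions made for this example; the `-1` default that disables the feature and the one-minute interval follow the quoted diff.

```java
import java.util.ArrayDeque;
import java.util.Map;

/** Simplified rater interface for this sketch; timestamps are passed in explicitly. */
interface FailureRater {
    void markFailure(long nowMillis);
    boolean exceedsFailureRate(long nowMillis);
}

/** Used when the feature is disabled: never reports an exceeded rate. */
final class DisabledFailureRater implements FailureRater {
    @Override
    public void markFailure(long nowMillis) {}

    @Override
    public boolean exceedsFailureRate(long nowMillis) {
        return false;
    }
}

/** Counts failures inside a sliding interval and compares them with a maximum. */
final class WindowedFailureRater implements FailureRater {
    private final int maximumFailures;
    private final long intervalMillis;
    private final ArrayDeque<Long> failureTimestamps = new ArrayDeque<>();

    WindowedFailureRater(int maximumFailures, long intervalMillis) {
        this.maximumFailures = maximumFailures;
        this.intervalMillis = intervalMillis;
    }

    @Override
    public void markFailure(long nowMillis) {
        failureTimestamps.add(nowMillis);
    }

    @Override
    public boolean exceedsFailureRate(long nowMillis) {
        while (!failureTimestamps.isEmpty() && nowMillis - failureTimestamps.peek() > intervalMillis) {
            failureTimestamps.remove();
        }
        return failureTimestamps.size() > maximumFailures;
    }
}

/** Single place that turns configuration values into a FailureRater. */
final class FailureRaters {
    // Both keys are hypothetical and exist only for this sketch.
    static final String MAX_FAILURES_KEY = "example.maximum-workers-failure-rate";
    static final String INTERVAL_KEY = "example.workers-failure-interval-ms";

    private FailureRaters() {}

    static FailureRater fromConfiguration(Map<String, String> config) {
        int maximumFailures = Integer.parseInt(config.getOrDefault(MAX_FAILURES_KEY, "-1"));
        long intervalMillis = Long.parseLong(config.getOrDefault(INTERVAL_KEY, "60000"));
        // A negative maximum disables the feature, matching the -1 default in the diff.
        return maximumFailures < 0
            ? new DisabledFailureRater()
            : new WindowedFailureRater(maximumFailures, intervalMillis);
    }
}
```

The YARN and Mesos factories as well as the tests could then all call `FailureRaters.fromConfiguration(...)` instead of repeating the constructor call.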




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258478341
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotPoolImpl.java
 ##
 @@ -324,6 +325,10 @@ private void requestSlotFromResourceManager(
// been completed with another 
allocated slot

resourceManagerGateway.cancelSlotRequest(allocationId);
}
+
+   if (throwable instanceof 
MaximumFailedTaskManagerExceedingException) {
+   throw 
(MaximumFailedTaskManagerExceedingException) throwable;
+   }
 
 Review comment:
   This code should not do anything. Why did you add it?
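
One possible reading of this comment, assuming the added `throw` runs inside a future completion callback whose returned stage is discarded: the rethrown exception only fails that discarded stage and never reaches the caller. The sketch below demonstrates that behaviour with a plain `CompletableFuture`; `ThrowInCallbackDemo` is a hypothetical name and `IllegalStateException` stands in for `MaximumFailedTaskManagerExceedingException`.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

final class ThrowInCallbackDemo {

    private ThrowInCallbackDemo() {}

    /**
     * Registers a callback that rethrows the failure, then fails the request.
     * Returns true if the callback ran; the rethrown exception does not
     * propagate to the thread that completes the future.
     */
    static boolean completeAndRethrowInCallback(CompletableFuture<String> request) {
        AtomicBoolean callbackRan = new AtomicBoolean(false);

        // The stage returned by whenComplete is dropped, as in a fire-and-forget callback.
        request.whenComplete((value, throwable) -> {
            callbackRan.set(true);
            if (throwable instanceof IllegalStateException) {
                // Only fails the dropped stage; nobody observes it.
                throw (IllegalStateException) throwable;
            }
        });

        request.completeExceptionally(new IllegalStateException("too many failed workers"));
        return callbackRan.get();
    }

    public static void main(String[] args) {
        CompletableFuture<String> request = new CompletableFuture<>();
        System.out.println("callback ran: " + completeAndRethrowInCallback(request));
        System.out.println("request failed: " + request.isCompletedExceptionally());
    }
}
```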




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258479080
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -192,7 +228,8 @@ public ResourceManager(
this.jobManagerRegistrations = new HashMap<>(4);
this.jmResourceIdRegistrations = new HashMap<>(4);
this.taskExecutors = new HashMap<>(8);
-   infoMessageListeners = new ConcurrentHashMap<>(8);
+   this.infoMessageListeners = new ConcurrentHashMap<>(8);
+   this.failureRater = failureRater;
 
 Review comment:
   `checkNotNull` is missing. Please put this assignment below `jobManagerMetricGroup`.
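
A small sketch of the fail-fast null check being asked for. It uses `java.util.Objects.requireNonNull` from the JDK so that it stays self-contained; the Flink code base would use its own `checkNotNull` helper instead. `ResourceManagerSketch` and the empty `FailureRater` interface are placeholders.

```java
import java.util.Objects;

/** Placeholder type for the sketch. */
interface FailureRater {}

final class ResourceManagerSketch {
    private final FailureRater failureRater;

    ResourceManagerSketch(FailureRater failureRater) {
        // Fail at construction time rather than with a later NullPointerException
        // at the first use of the field.
        this.failureRater = Objects.requireNonNull(failureRater, "failureRater must not be null");
    }

    FailureRater getFailureRater() {
        return failureRater;
    }
}
```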




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258489209
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -630,6 +674,29 @@ public void unRegisterInfoMessageListener(final String 
address) {
}
}
 
+   /**
+* Record failures in ResourceManagers. If maximum failure rate is met, 
then reject all pending reject.
+* @return whether should acquire new container/worker after the failure
+*/
+   @VisibleForTesting
+   protected boolean recordFailure() {
 
 Review comment:
   Let's call it `recordWorkerFailure` and update the JavaDoc to be a bit more 
specific about which *failures* in *ResourceManagers* it records.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258503759
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -425,6 +462,13 @@ public void disconnectJobManager(final JobID jobId, final 
Exception cause) {
slotRequest.getJobId(),
slotRequest.getAllocationId());
 
+   if (failureRater.exceedMaximumFailureRate()) {
+   return 
FutureUtils.completedExceptionally(new 
MaximumFailedTaskManagerExceedingException(
+   new 
RuntimeException(String.format("Maximum number of failed container %d in 
interval %s "
+   + " is detected in 
Resource Manager.", failureRater.getCurrentFailureRate(),
+   
failureRater.getFailureInterval().toString();
+   }
 
 Review comment:
   I think this is the wrong place to check this. We should check 
`failureRater.exceedMaximumFailureRate()` in a method from which we call 
`startNewWorker`, e.g. `tryStartNewWorker`.
   
   ```
   protected Collection<ResourceProfile> tryStartNewWorker(ResourceProfile resourceProfile) {
       if (failureRater.exceedMaximumFailureRate()) {
           return Collections.emptyList();
       }

       return startNewWorker(resourceProfile);
   }

   protected void startNewWorkerIfNeeded(ResourceProfile resourceProfile, int pendingSlots) {
       int currentPendingSlots = pendingSlots;
       while (currentPendingSlots < getNumberRequiredTaskManagerSlots()) {
           final Collection<ResourceProfile> slots = tryStartNewWorker(resourceProfile);

           if (slots.isEmpty()) {
               break;
           }

           currentPendingSlots += slots.size();
       }
   }
   ```
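
To see how the loop behaves, here is a self-contained simulation of the two methods above. It is a sketch under simplifying assumptions: slots are modelled as `Integer` values instead of `ResourceProfile`, the failure rater is reduced to a `BooleanSupplier`, and `WorkerStarterSketch` and `FixedSizeWorkerStarter` are hypothetical classes, not Flink types.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.function.BooleanSupplier;

/** Hypothetical harness mirroring the structure of the suggested methods. */
abstract class WorkerStarterSketch {
    private final BooleanSupplier failureRateExceeded;

    WorkerStarterSketch(BooleanSupplier failureRateExceeded) {
        this.failureRateExceeded = failureRateExceeded;
    }

    /** Starts one worker and returns the slots it will offer. */
    abstract Collection<Integer> startNewWorker();

    abstract int getNumberRequiredTaskManagerSlots();

    /** Returns an empty collection instead of starting a worker when the rate is exceeded. */
    Collection<Integer> tryStartNewWorker() {
        if (failureRateExceeded.getAsBoolean()) {
            return Collections.emptyList();
        }
        return startNewWorker();
    }

    /** Starts workers until enough slots are pending or starting is refused. */
    int startNewWorkerIfNeeded(int pendingSlots) {
        int currentPendingSlots = pendingSlots;
        while (currentPendingSlots < getNumberRequiredTaskManagerSlots()) {
            Collection<Integer> slots = tryStartNewWorker();
            if (slots.isEmpty()) {
                break;
            }
            currentPendingSlots += slots.size();
        }
        return currentPendingSlots;
    }
}

/** Every started worker offers a fixed number of slots. */
final class FixedSizeWorkerStarter extends WorkerStarterSketch {
    private final int slotsPerWorker;
    private final int requiredSlots;
    int startedWorkers;

    FixedSizeWorkerStarter(BooleanSupplier failureRateExceeded, int slotsPerWorker, int requiredSlots) {
        super(failureRateExceeded);
        this.slotsPerWorker = slotsPerWorker;
        this.requiredSlots = requiredSlots;
    }

    @Override
    Collection<Integer> startNewWorker() {
        startedWorkers++;
        List<Integer> slots = new ArrayList<>();
        for (int i = 0; i < slotsPerWorker; i++) {
            slots.add(i);
        }
        return slots;
    }

    @Override
    int getNumberRequiredTaskManagerSlots() {
        return requiredSlots;
    }
}
```

The `break` on an empty result matters: without it, an exceeded failure rate would leave the `while` condition permanently true and the loop would never terminate.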




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258487948
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ##
 @@ -300,6 +305,25 @@ public boolean registerSlotRequest(SlotRequest 
slotRequest) throws SlotManagerEx
}
}
 
+   /**
+* Rejects all pending slot requests.
+* @param cause the exception caused the rejection
+*/
+   public void rejectAllPendingSlotRequests(Exception cause) {
 
 Review comment:
   I think this should rather be called `cancelAllPendingSlotRequests`. 
Moreover, we could refactor the `suspend` method to call this method instead of 
manually cancelling all pending slot requests via `cancelPendingSlotRequest`.
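
A loose sketch of that refactoring, assuming pending requests can be modelled as a map from an allocation id to a `CompletableFuture`. `SlotManagerSketch` is a hypothetical class, and whether the real method should keep a `cause` parameter is left open here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class SlotManagerSketch {
    // Allocation id -> future of the pending request (simplified model).
    private final Map<String, CompletableFuture<Void>> pendingSlotRequests = new HashMap<>();

    CompletableFuture<Void> registerSlotRequest(String allocationId) {
        CompletableFuture<Void> requestFuture = new CompletableFuture<>();
        pendingSlotRequests.put(allocationId, requestFuture);
        return requestFuture;
    }

    /** Cancels every pending request and forgets it. */
    void cancelAllPendingSlotRequests() {
        for (CompletableFuture<Void> requestFuture : pendingSlotRequests.values()) {
            cancelPendingSlotRequest(requestFuture);
        }
        pendingSlotRequests.clear();
    }

    private void cancelPendingSlotRequest(CompletableFuture<Void> requestFuture) {
        requestFuture.cancel(false);
    }

    /** Reuses the shared method instead of looping over the requests itself. */
    void suspend() {
        cancelAllPendingSlotRequests();
    }

    int getNumberPendingSlotRequests() {
        return pendingSlotRequests.size();
    }
}
```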




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258505303
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -395,12 +402,14 @@ public void onContainersAllocated(List 
containers) {

nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
} catch (Throwable t) {
log.error("Could not start 
TaskManager in container {}.", container.getId(), t);
-
// release the failed container

workerNodeMap.remove(resourceId);

resourceManagerClient.releaseAssignedContainer(container.getId());
-   // and ask for a new one
-   
requestYarnContainerIfRequired();
+   log.error("Could not start 
TaskManager in container {}.", container.getId(), t);
+   if (recordFailure()) {
+   // and ask for a new one
+   
requestYarnContainerIfRequired();
+   }
 
 Review comment:
   I think you are missing the `onContainersCompleted` case. If a container 
fails after the `startContainer` request has been sent, we won't record the 
failure and will always allocate a new container. That's the reason why I would 
like to move this logic into the `ResourceManager` so that all container start 
calls go through the same code path.
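
The idea of a single code path can be sketched as follows. This is a deliberately simplified model: it counts failures without any time window, and `WorkerFailureHandlerSketch` with its method names is hypothetical rather than taken from the PR.

```java
/** Both failure callbacks funnel into one method that records and gates. */
final class WorkerFailureHandlerSketch {
    private final int maximumFailures;
    private int recordedFailures;
    private int requestedReplacements;

    WorkerFailureHandlerSketch(int maximumFailures) {
        this.maximumFailures = maximumFailures;
    }

    /** Called when starting a container throws. */
    void onContainerStartFailed() {
        recordWorkerFailureAndMaybeRequestNew();
    }

    /** Called when a container that was already started completes unexpectedly. */
    void onContainerCompleted() {
        recordWorkerFailureAndMaybeRequestNew();
    }

    // Single code path: every failure is recorded here, and a replacement
    // is only requested while the limit has not been exceeded.
    private void recordWorkerFailureAndMaybeRequestNew() {
        recordedFailures++;
        if (recordedFailures <= maximumFailures) {
            requestedReplacements++;
        }
    }

    int getRecordedFailures() {
        return recordedFailures;
    }

    int getRequestedReplacements() {
        return requestedReplacements;
    }
}
```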




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258468285
 
 

 ##
 File path: 
flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
 ##
 @@ -303,7 +313,8 @@ protected void closeTaskManagerConnection(ResourceID 
resourceID, Exception cause
rmServices.mesosConfig,
tmParams,
containerSpecification,
-   
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup());
+   
UnregisteredMetricGroups.createUnregisteredJobManagerMetricGroup(),
+   new TimestampBasedFailureRater(2, 
Time.of(1, TimeUnit.MINUTES)));
 
 Review comment:
   The `NoFailureRater` implementation would be the one to pass in here.
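
A guess at what such a `NoFailureRater` could look like, written against the two-method interface proposed elsewhere in this review; the actual class is not shown in this thread, so the body below is an assumption.

```java
/** Two-method interface as proposed in the review of FailureRater.java. */
interface FailureRater {
    boolean exceedsFailureRate();
    void markFailure();
}

/** No-op rater for tests and for a disabled feature: it never trips. */
final class NoFailureRater implements FailureRater {
    @Override
    public boolean exceedsFailureRate() {
        return false;
    }

    @Override
    public void markFailure() {
        // Intentionally empty: failures are ignored.
    }
}
```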




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258477492
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 ##
 @@ -55,6 +55,18 @@
" default, the port of the JobManager, because the same 
ActorSystem is used." +
" Its not possible to use this configuration key to 
define port ranges.");
 
+   /**
+* Defines the maximum number of workers (YARN / Mesos) failure can 
happen in a minute.
 
 Review comment:
   Maybe it would be best to define the failure rate per minute or per second 
as you've done, but also to make the interval of the 
`TimestampBasedFailureRater` configurable. By default the interval could be 1 
or 5 minutes, for example.
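
As a sketch of how a per-minute rate and a separately configured interval could fit together: the rate is scaled to the length of the interval before it is compared with the observed failure count. `FailureRateWindow` and its parameters are hypothetical and only illustrate the arithmetic.

```java
import java.time.Duration;

/** Combines a per-minute rate with a configurable measurement interval. */
final class FailureRateWindow {
    private final double maximumFailuresPerMinute;
    private final Duration interval;

    FailureRateWindow(double maximumFailuresPerMinute, Duration interval) {
        this.maximumFailuresPerMinute = maximumFailuresPerMinute;
        this.interval = interval;
    }

    /** The per-minute rate scaled to the length of the interval. */
    double maximumFailuresInInterval() {
        return maximumFailuresPerMinute * interval.toMillis() / 60_000.0;
    }

    boolean exceeds(int failuresInInterval) {
        return failuresInInterval > maximumFailuresInInterval();
    }
}
```

For example, a rate of 2 failures per minute measured over a 5-minute interval tolerates up to 10 failures in that interval, which smooths out short bursts compared with a 1-minute interval.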




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258478928
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -153,6 +160,34 @@
 */
private CompletableFuture clearStateFuture = 
CompletableFuture.completedFuture(null);
 
+   public ResourceManager(
+   RpcService rpcService,
+   String resourceManagerEndpointId,
+   ResourceID resourceId,
+   HighAvailabilityServices highAvailabilityServices,
+   HeartbeatServices heartbeatServices,
+   SlotManager slotManager,
+   MetricRegistry metricRegistry,
+   JobLeaderIdService jobLeaderIdService,
+   ClusterInformation clusterInformation,
+   FatalErrorHandler fatalErrorHandler,
+   JobManagerMetricGroup jobManagerMetricGroup) {
+   this(
+   rpcService,
+   resourceManagerEndpointId,
+   resourceId,
+   highAvailabilityServices,
+   heartbeatServices,
+   slotManager,
+   metricRegistry,
+   jobLeaderIdService,
+   clusterInformation,
+   fatalErrorHandler,
+   jobManagerMetricGroup,
+   new TimestampBasedFailureRater(-1, Time.of(1, 
TimeUnit.MINUTES))
+   );
+   }
 
 Review comment:
   Please remove this constructor. Instead please pass in the proper 
`FailureRater` when creating the `ResourceManager`.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258480551
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -94,6 +95,9 @@
 * Container ID generation may vary across Hadoop versions. */
private static final String ENV_FLINK_CONTAINER_ID = 
"_FLINK_CONTAINER_ID";
 
+   /** The default initial number of task manager. **/
+   private static final String DEFAULT_INITIAL_NUM_TASK_MANAGER = "2";
 
 Review comment:
   Can this be removed?




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258481057
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnConfigOptions.java
 ##
 @@ -83,8 +83,8 @@
 */
public static final ConfigOption MAX_FAILED_CONTAINERS =
key("yarn.maximum-failed-containers")
-   .noDefaultValue()
-   .withDescription("Maximum number of containers the system is 
going to reallocate in case of a failure.");
+   .noDefaultValue()
+   .withDescription("Maximum number of containers the 
system is going to reallocate in case of a failure.");
 
 Review comment:
   I would suggest reverting this change and instead putting `key` on the same 
line as `MAX_FAILED_CONTAINERS`.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258508761
 
 

 ##
 File path: 
flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
 ##
 @@ -708,6 +719,47 @@ public void testWorkerFailed() throws Exception {
}};
}
 
+   @Test
+   public void testWorkerFailedAtFailureRate() throws Exception {
+   new Context() {{
+   // set the initial persistent state with a launched 
worker
+   MesosWorkerStore.Worker worker1launched = 
MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
+   MesosWorkerStore.Worker worker2launched = 
MesosWorkerStore.Worker.newWorker(task2).launchWorker(slave1, slave1host);
+
+   
when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+   
when(rmServices.workerStore.recoverWorkers()).thenReturn(Arrays.asList(worker1launched,
 worker2launched));
+   
when(rmServices.workerStore.newTaskID()).thenReturn(task3);
+   startResourceManager();
+
+   // tell the RM that a tasks failed
+   
when(rmServices.workerStore.removeWorker(task1)).thenReturn(true);
+   
when(rmServices.workerStore.removeWorker(task2)).thenReturn(true);
+   resourceManager.taskTerminated(new 
TaskMonitor.TaskTerminated(task1, Protos.TaskStatus.newBuilder()
+   
.setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_FAILED).build()));
+
+   // tell the RM that a task failed
+   resourceManager.taskTerminated(new 
TaskMonitor.TaskTerminated(task2, Protos.TaskStatus.newBuilder()
+   
.setTaskId(task2).setSlaveId(slave1).setState(Protos.TaskState.TASK_FAILED).build()));
+
+   verify(rmServices.workerStore).removeWorker(task1);
+   verify(rmServices.workerStore).removeWorker(task2);
+   assertThat(resourceManager.workersInLaunch.entrySet(), 
empty());
+   
assertThat(resourceManager.workersBeingReturned.entrySet(), empty());
+   assertThat(resourceManager.workersInNew, 
hasKey(extractResourceID(task3)));
+
+   // request second slot
+   CompletableFuture registerSlotRequestFuture = 
resourceManager.runInMainThread(() -> {
+   rmServices.slotManager.registerSlotRequest(
+   new SlotRequest(new JobID(), new 
AllocationID(), resourceProfile1, slave1host));
+   return null;
+   });
+
+   // wait for the registerSlotRequest completion
+   registerSlotRequestFuture.get();
+   assertEquals(0, 
rmServices.slotManager.getNumberPendingSlotRequest());
+   }};
 
 Review comment:
   I think we can improve this test by only testing that 
`resourceManager.taskTerminated` records a failure in the `failureRater`, and 
then adding a dedicated test for the `ResourceManager` which makes sure that it 
won't start new workers if the maximum failure rate is exceeded. Then we would 
not have to mock all this behaviour, which is super brittle.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258503122
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 ##
 @@ -663,7 +666,10 @@ public void taskTerminated(TaskMonitor.TaskTerminated 
message) {
assert(launched != null);
LOG.info("Worker {} failed with status: {}, reason: {}, 
message: {}.",
id, status.getState(), status.getReason(), 
status.getMessage());
-   startNewWorker(launched.profile());
+
+   if (recordFailure()) {
+   startNewWorker(launched.profile());
+   }
 
 Review comment:
   I think we should move the logic that decides whether we can start a new 
worker into the `ResourceManager`. E.g., we could have the following methods in 
`ResourceManager`:
   ```
   protected Collection<ResourceProfile> tryStartNewWorker(ResourceProfile resourceProfile) {
       if (failureRater.exceedMaximumFailureRate()) {
           return Collections.emptyList();
       }

       return startNewWorker(resourceProfile);
   }

   protected void startNewWorkerIfNeeded(ResourceProfile resourceProfile, int pendingSlots) {
       int currentPendingSlots = pendingSlots;
       while (currentPendingSlots < getNumberRequiredTaskManagerSlots()) {
           final Collection<ResourceProfile> slots = tryStartNewWorker(resourceProfile);

           if (slots.isEmpty()) {
               break;
           }

           currentPendingSlots += slots.size();
       }
   }
   ```
   
   and then we only call `startNewWorkerIfNeeded` from here.
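
   To make the control flow of the two proposed methods easier to follow, here is a minimal, self-contained sketch. It is not Flink code: `SimpleFailureRater`, `SketchResourceManager`, the `String` resource profile, and the one-slot-per-worker behaviour are all assumptions made for illustration.

```java
import java.util.Collection;
import java.util.Collections;

/** Hypothetical stand-in for the rater discussed in this thread. */
interface SimpleFailureRater {
    boolean exceedMaximumFailureRate();
}

/** Hypothetical, heavily simplified resource manager; every worker offers one slot. */
class SketchResourceManager {
    private final SimpleFailureRater failureRater;
    private final int requiredSlots;
    int startedWorkers = 0;

    SketchResourceManager(SimpleFailureRater failureRater, int requiredSlots) {
        this.failureRater = failureRater;
        this.requiredSlots = requiredSlots;
    }

    /** Stand-in for the real worker start: returns the slots the new worker will offer. */
    Collection<String> startNewWorker(String resourceProfile) {
        startedWorkers++;
        return Collections.singletonList(resourceProfile);
    }

    /** Starts a worker only while the failure rate is within bounds. */
    Collection<String> tryStartNewWorker(String resourceProfile) {
        if (failureRater.exceedMaximumFailureRate()) {
            return Collections.emptyList();
        }
        return startNewWorker(resourceProfile);
    }

    /** Keeps starting workers until enough slots are pending or the rater says stop. */
    void startNewWorkerIfNeeded(String resourceProfile, int pendingSlots) {
        int currentPendingSlots = pendingSlots;
        while (currentPendingSlots < requiredSlots) {
            Collection<String> slots = tryStartNewWorker(resourceProfile);
            if (slots.isEmpty()) {
                break;
            }
            currentPendingSlots += slots.size();
        }
    }
}

class StartWorkerDemo {
    public static void main(String[] args) {
        // Rater within bounds: one slot is pending, three are required, so two workers start.
        SketchResourceManager healthy = new SketchResourceManager(() -> false, 3);
        healthy.startNewWorkerIfNeeded("profile", 1);
        System.out.println(healthy.startedWorkers);

        // Rater exceeded: the loop breaks immediately and no worker is started.
        SketchResourceManager failing = new SketchResourceManager(() -> true, 3);
        failing.startNewWorkerIfNeeded("profile", 1);
        System.out.println(failing.startedWorkers);
    }
}
```

   With this shape, the cluster-specific managers would only report failures and call `startNewWorkerIfNeeded`; the decision itself stays in one place.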




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258481693
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ##
 @@ -300,6 +304,18 @@ public boolean registerSlotRequest(SlotRequest 
slotRequest) throws SlotManagerEx
}
}
 
+   /**
+* Rejects all pending slot requests.
+* @param cause the exception caused the rejection
+*/
+   public void rejectAllPendingSlotRequests(Exception cause) {
+   for (PendingSlotRequest pendingSlotRequest : 
pendingSlotRequests.values()) {
+   rejectPendingSlotRequest(pendingSlotRequest, cause);
+   }
+
+   pendingSlotRequests.clear();
+   }
 
 Review comment:
   Did you add a test for this function?




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258503994
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -630,6 +674,29 @@ public void unRegisterInfoMessageListener(final String 
address) {
}
}
 
+   /**
+* Record failures in ResourceManagers. If maximum failure rate is met, 
then reject all pending reject.
+* @return whether should acquire new container/worker after the failure
+*/
+   @VisibleForTesting
+   protected boolean recordFailure() {
 
 Review comment:
   I think the boolean return type is not needed.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258480345
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/failurerate/TimestampBasedFailureRaterTest.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.api.common.time.Time;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Test time stamp based failure rater.
+ */
+public class TimestampBasedFailureRaterTest {
+
+   @Test
+   public void testMaximumFailureCheck() {
+   FailureRater rater = new TimestampBasedFailureRater(5, 
Time.of(10, TimeUnit.SECONDS));
+
+   for (int i = 0; i < 6; i++) {
+   rater.recordFailure();
+   }
+
+   Assert.assertEquals(6, rater.getCurrentFailureRate());
+   Assert.assertTrue(rater.exceedMaximumFailureRate());
+   }
+
+   @Test
+   public void testMovingRate() throws InterruptedException {
+   FailureRater rater = new TimestampBasedFailureRater(5, 
Time.of(500, TimeUnit.MILLISECONDS));
+
+   for (int i = 0; i < 6; i++) {
+   rater.recordFailure();
+   Thread.sleep(150);
 
 Review comment:
   Instead of sleeping, let's pass in a `ManualClock` with which we can 
control the time. This will give us a better test.
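
   A rough sketch of what a clock-driven version of the moving-rate test could look like. It does not use Flink's classes: `ClockedFailureRater` and `FakeClock` are hypothetical stand-ins, and the injected `LongSupplier` is just one possible way to pass the time source in.

```java
import java.util.ArrayDeque;
import java.util.function.LongSupplier;

/** Hypothetical rater that reads time from an injected source instead of the system clock. */
class ClockedFailureRater {
    private final long intervalMillis;
    private final LongSupplier clock;
    private final ArrayDeque<Long> failureTimestamps = new ArrayDeque<>();

    ClockedFailureRater(long intervalMillis, LongSupplier clock) {
        this.intervalMillis = intervalMillis;
        this.clock = clock;
    }

    void recordFailure() {
        failureTimestamps.add(clock.getAsLong());
    }

    /** Drops timestamps older than the interval, then counts what is left. */
    int getCurrentFailureRate() {
        long now = clock.getAsLong();
        while (!failureTimestamps.isEmpty() && now - failureTimestamps.peek() > intervalMillis) {
            failureTimestamps.remove();
        }
        return failureTimestamps.size();
    }
}

/** Hand-rolled manual clock; time only moves when the test advances it. */
class FakeClock implements LongSupplier {
    private long nowMillis = 0;

    void advance(long millis) {
        nowMillis += millis;
    }

    @Override
    public long getAsLong() {
        return nowMillis;
    }
}

class ManualClockDemo {
    public static void main(String[] args) {
        FakeClock clock = new FakeClock();
        ClockedFailureRater rater = new ClockedFailureRater(500, clock);

        // Failures at t = 0, 150, ..., 750 ms; the clock ends at 900 ms without any sleeping.
        for (int i = 0; i < 6; i++) {
            rater.recordFailure();
            clock.advance(150);
        }

        // Only the failures at 450, 600 and 750 ms are still inside the 500 ms window.
        System.out.println(rater.getCurrentFailureRate());
    }
}
```

   Because no wall-clock time passes, the result does not depend on scheduler timing, which is the main weakness of the `Thread.sleep` variant.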




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258507526
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -518,4 +528,61 @@ public void testOnContainerCompleted() throws Exception {
});
}};
}
+
+   /**
+*  Tests that YarnResourceManager will trigger to reject all 
pending slot request, when maximum number of failed
+*  contains is hit.
+*/
+   @Test
+   public void testOnContainersAllocatedWithFailure() throws Exception {
+   new Context() {{
+   runTest(() -> {
+   CompletableFuture registerSlotRequestFuture 
= resourceManager.runInMainThread(() -> {
+   
rmServices.slotManager.registerSlotRequest(
+   new SlotRequest(new JobID(), 
new AllocationID(), resourceProfile1, taskHost));
+   return null;
+   });
+
+   // wait for the registerSlotRequest completion
+   registerSlotRequestFuture.get();
+
+   // Callback from YARN when container is 
allocated.
+   Container disconnectedContainer1 = 
mockContainer("container1", 1234, 1, resourceManager.getContainerResource());
+
+   
doReturn(Collections.singletonList(Collections.singletonList(resourceManager.getContainerRequest(
+   
.when(mockResourceManagerClient).getMatchingRequests(any(Priority.class), 
anyString(), any(Resource.class));
+
+   
resourceManager.onContainersAllocated(ImmutableList.of(disconnectedContainer1));
+   
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
+   
verify(mockNMClient).startContainer(eq(disconnectedContainer1), 
any(ContainerLaunchContext.class));
+
+   ResourceID connectedTM = new 
ResourceID(disconnectedContainer1.getId().toString());
+
+   
resourceManager.registerTaskExecutor("container1", connectedTM, 1234,
+   hardwareDescription, Time.seconds(10L));
+
+   // force to unregister the task manager
+   
resourceManager.disconnectTaskManager(connectedTM, new TimeoutException());
+
+   // request second slot
+   registerSlotRequestFuture = 
resourceManager.runInMainThread(() -> {
+   
rmServices.slotManager.registerSlotRequest(
+   new SlotRequest(new JobID(), 
new AllocationID(), resourceProfile1, taskHost));
+   return null;
+   });
+
+   // wait for the registerSlotRequest completion
+   registerSlotRequestFuture.get();
+   Container failedContainer = 
mockContainer("container2", 2345, 2, resourceManager.getContainerResource());
+   
when(mockNMClient.startContainer(eq(failedContainer), any())).thenThrow(new 
YarnException("Failed"));
+
+   CompletableFuture 
rejectAllPendingRequestFuture = resourceManager.runInMainThread(() -> {
+   
resourceManager.onContainersAllocated(ImmutableList.of(failedContainer));
+   return  null;
+   });
+   rejectAllPendingRequestFuture.get();
+   
assertEquals(rmServices.slotManager.getNumberPendingSlotRequest(), 0);
+   });
+   }};
 
 Review comment:
   This test is super hard to maintain with all this mocking. I would suggest 
adding a test for the `ResourceManager` which makes sure that you cannot start 
a new worker if the `FailureRater` exceeds the maximum failure rate. 
Additionally, we should add test cases for the `YarnResourceManager` which 
make sure that `onContainersCompleted` and failures in the 
`onContainersAllocated` method record a failure in the `failureRater`.
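
   One possible shape for such a narrow test, sketched without any mocking framework. Everything here is hypothetical: `RecordingFailureRater` is a hand-written fake, and `SketchContainerHandler` reduces the YARN callback to a list of "did this container fail" flags, which is an assumption of the sketch and not how the real callback is typed.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical recording fake that stands in for the rater. */
class RecordingFailureRater {
    int recordedFailures = 0;

    void recordFailure() {
        recordedFailures++;
    }
}

/** Hypothetical, heavily simplified handler for completed containers. */
class SketchContainerHandler {
    private final RecordingFailureRater failureRater;

    SketchContainerHandler(RecordingFailureRater failureRater) {
        this.failureRater = failureRater;
    }

    /** Each entry says whether the completed container had failed (assumption of this sketch). */
    void onContainersCompleted(List<Boolean> containerFailed) {
        for (boolean failed : containerFailed) {
            if (failed) {
                failureRater.recordFailure();
            }
        }
    }
}

class RecordingFakeDemo {
    public static void main(String[] args) {
        RecordingFailureRater rater = new RecordingFailureRater();
        SketchContainerHandler handler = new SketchContainerHandler(rater);

        // Two of the three completed containers failed, so two failures are recorded.
        handler.onContainersCompleted(Arrays.asList(true, false, true));

        System.out.println(rater.recordedFailures);
    }
}
```

   The test then asserts only on the fake's counter, so it does not break when unrelated interactions of the resource manager change.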



[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258479558
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -630,6 +674,29 @@ public void unRegisterInfoMessageListener(final String 
address) {
}
}
 
+   /**
+* Record failures in ResourceManagers. If maximum failure rate is met, 
then reject all pending reject.
+* @return whether should acquire new container/worker after the failure
+*/
+   @VisibleForTesting
+   protected boolean recordFailure() {
+       failureRater.recordFailure();
+       if (failureRater.exceedMaximumFailureRate()) {
+           rejectAllPendingSlotRequests(new MaximumFailedTaskManagerExceedingException(
+               new RuntimeException(String.format("Maximum number of failed workers %d in interval %s"
+                   + "is detected in Resource Manager", failureRater.getMaximumFailureRate(),
+                   failureRater.getFailureInterval().toString()))));
+
+           return false;
+       }
+
+       return true;
+   }
+
+   protected void rejectAllPendingSlotRequests(Exception e) {
 
 Review comment:
   Is this method used by sub-classes? If not, then let's make it private.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258488179
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ##
 @@ -300,6 +305,25 @@ public boolean registerSlotRequest(SlotRequest 
slotRequest) throws SlotManagerEx
}
}
 
+   /**
+* Rejects all pending slot requests.
+* @param cause the exception caused the rejection
+*/
+   public void rejectAllPendingSlotRequests(Exception cause) {
+   for (PendingSlotRequest pendingSlotRequest : 
pendingSlotRequests.values()) {
+   rejectPendingSlotRequest(pendingSlotRequest, cause);
+
+   // notify each job master about this exception
+   resourceActions.notifyAllocationFailure(
 
 Review comment:
   This should go into `cancelPendingSlotRequest`




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258466821
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 ##
 @@ -55,6 +55,18 @@
" default, the port of the JobManager, because the same 
ActorSystem is used." +
" Its not possible to use this configuration key to 
define port ranges.");
 
+   /**
+* Defines the maximum number of workers (YARN / Mesos) failure can 
happen in a minute.
+* It is to quickly catch external dependency caused workers failure 
and terminate job
+* accordingly. Be default, -1 is set to disable the feature.
+*/
+   public static final ConfigOption MAXIMUM_WORKERS_FAILURE_RATE 
= ConfigOptions
+   .key("resourcemanager.maximum-workers-failure-rate")
+   .defaultValue(-1)
+   .withDescription("Defines the maximum number of workers (YARN / 
Mesos) failure can happen in a minute." +
+   "It is to quickly catch external dependency caused 
workers failure and terminate job" +
+   "accordingly. Be default, -1 is set to disable the 
feature.");
 
 Review comment:
   Missing whitespace; see the generated output for where to insert it.
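
   For illustration, this is what the concatenation in the hunk above produces, next to a variant that follows the leading-space convention already used by the neighbouring description in this file. The class and constant names are made up for the example; the string fragments are copied from the diff.

```java
class DescriptionWhitespaceDemo {
    // As in the diff: adjacent fragments are joined without a separating space.
    static final String BROKEN =
        "Defines the maximum number of workers (YARN / Mesos) failure can happen in a minute." +
        "It is to quickly catch external dependency caused workers failure and terminate job" +
        "accordingly. Be default, -1 is set to disable the feature.";

    // Same fragments, each continuation starting with a space.
    static final String FIXED =
        "Defines the maximum number of workers (YARN / Mesos) failure can happen in a minute." +
        " It is to quickly catch external dependency caused workers failure and terminate job" +
        " accordingly. Be default, -1 is set to disable the feature.";

    public static void main(String[] args) {
        System.out.println(BROKEN); // contains "minute.It" and "jobaccordingly"
        System.out.println(FIXED);
    }
}
```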




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258466657
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 ##
 @@ -55,6 +55,18 @@
" default, the port of the JobManager, because the same 
ActorSystem is used." +
" Its not possible to use this configuration key to 
define port ranges.");
 
+   /**
+* Defines the maximum number of workers (YARN / Mesos) failure can 
happen in a minute.
 
 Review comment:
   Do we want to define the rate per minute? If we define it per hour, it would 
be possible to define lower rates. The downside would be that it takes longer 
until you fall below the maximum threshold again/recover. Maybe it would be 
good to make this controllable (contradicting my initial comments because you 
had it like this). What do you think?
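
   A small worked example of the trade-off, assuming the limit is an integer number of failures per interval. The helper `failuresPerHour` is hypothetical and only normalizes a limit to a common unit so the two choices can be compared.

```java
import java.time.Duration;

class FailureWindowDemo {
    /** Converts "maxFailures per interval" into the equivalent number of failures per hour. */
    static double failuresPerHour(int maxFailures, Duration interval) {
        return maxFailures * (double) Duration.ofHours(1).toMillis() / interval.toMillis();
    }

    public static void main(String[] args) {
        // Smallest non-zero limit with a one-minute interval: 1 per minute, i.e. 60 per hour.
        System.out.println(failuresPerHour(1, Duration.ofMinutes(1)));

        // Smallest non-zero limit with a one-hour interval: 1 per hour.
        System.out.println(failuresPerHour(1, Duration.ofHours(1)));
    }
}
```

   The longer interval allows a limit that is sixty times stricter, but a recorded failure then also counts against the limit for up to an hour instead of a minute.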




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258465206
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/configuration/ResourceManagerOptions.java
 ##
 @@ -55,6 +55,18 @@
" default, the port of the JobManager, because the same 
ActorSystem is used." +
" Its not possible to use this configuration key to 
define port ranges.");
 
+   /**
+* Defines the maximum number of workers (YARN / Mesos) failure can 
happen in a minute.
 
 Review comment:
   Maybe: "Defines the maximum number of worker (YARN / Mesos) failures per 
minute before rejecting subsequent worker requests until the failure rate falls 
below the maximum." 




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258472325
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/failurerate/FailureRater.java
 ##
 @@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.api.common.time.Time;
+
+/**
+ * A rater to record the failure rate within a time interval.
+ */
+public interface FailureRater {
 
 Review comment:
   Instead of introducing a new interface, couldn't we use the `Meter` 
interface? What we are interested in is a rate, and a `Meter` gives us 
exactly this.
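
   A minimal sketch of the idea, assuming a meter that can mark events and report a rate per second. `RateMeter` is a hypothetical local stand-in modelled on the `markEvent()`/`getRate()` pair of Flink's `Meter`, and `FixedWindowMeter` is a toy implementation that exists only to make the example runnable.

```java
/** Hypothetical minimal stand-in exposing only the two operations this sketch needs. */
interface RateMeter {
    void markEvent();

    double getRate(); // events per second
}

/** Toy meter for illustration: rate = events seen / fixed observation window. */
class FixedWindowMeter implements RateMeter {
    private final double windowSeconds;
    private long count = 0;

    FixedWindowMeter(double windowSeconds) {
        this.windowSeconds = windowSeconds;
    }

    @Override
    public void markEvent() {
        count++;
    }

    @Override
    public double getRate() {
        return count / windowSeconds;
    }
}

class MeterThresholdDemo {
    /** The threshold decision lives with the caller, not inside the meter. */
    static boolean exceedsMaximum(RateMeter meter, double maxFailuresPerSecond) {
        return meter.getRate() > maxFailuresPerSecond;
    }

    public static void main(String[] args) {
        RateMeter meter = new FixedWindowMeter(60.0);
        double maxPerSecond = 5 / 60.0; // five failures per minute

        for (int i = 0; i < 3; i++) {
            meter.markEvent();
        }
        System.out.println(exceedsMaximum(meter, maxPerSecond)); // three failures: under the limit

        for (int i = 0; i < 3; i++) {
            meter.markEvent();
        }
        System.out.println(exceedsMaximum(meter, maxPerSecond)); // six failures: over the limit
    }
}
```

   Keeping the comparison outside the meter means the same rate could also be exposed as a metric without carrying the policy along.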




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258477755
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/failurerate/TimestampBasedFailureRater.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.api.common.time.Time;
+
+import java.util.ArrayDeque;
+
+/**
+ * A timestamp queue based failure rater implementation.
+ *
+ *
+ */
+public class TimestampBasedFailureRater implements FailureRater {
+   private static final int DEFAULT_TIMESTAMP_SIZE = 300;
+   private final int maximumFailureRate;
+   private final Time failureInterval;
+   private final ArrayDeque failureTimestamps;
+
+   public TimestampBasedFailureRater(int maximumFailureRate, Time 
failureInterval) {
+   this.maximumFailureRate = maximumFailureRate;
+   this.failureInterval = failureInterval;
+   this.failureTimestamps = new ArrayDeque<>(maximumFailureRate > 
0 ? maximumFailureRate : DEFAULT_TIMESTAMP_SIZE);
+   }
+
+   @Override
+   public void recordFailure() {
+   failureTimestamps.add(System.currentTimeMillis());
+   }
+
+   @Override
+   public int getMaximumFailureRate() {
+   return maximumFailureRate;
+   }
+
+   @Override
+   public Time getFailureInterval() {
+   return failureInterval;
+   }
+
+   @Override
+   public long getCurrentFailureRate() {
+   Long currentTimeStamp = System.currentTimeMillis();
+   while (!failureTimestamps.isEmpty() &&
+   currentTimeStamp - failureTimestamps.peek() > 
failureInterval.toMilliseconds()) {
+   failureTimestamps.remove();
+   }
+
+   return failureTimestamps.size();
+   }
+
+   @Override
+   public boolean exceedMaximumFailureRate() {
 
 Review comment:
   This logic should not be part of the `FailureRater`




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258480075
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/failurerate/TimestampBasedFailureRaterTest.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.api.common.time.Time;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.concurrent.TimeUnit;
+
+
+/**
+ * Test time stamp based failure rater.
+ */
+public class TimestampBasedFailureRaterTest {
 
 Review comment:
   Tests should extend `TestLogger`




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258472565
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/failurerate/TimestampBasedFailureRater.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.api.common.time.Time;
+
+import java.util.ArrayDeque;
+
+/**
+ * A timestamp queue based failure rater implementation.
+ *
+ *
+ */
+public class TimestampBasedFailureRater implements FailureRater {
 
 Review comment:
   Let's extend `Meter` here




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258473240
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/failurerate/TimestampBasedFailureRater.java
 ##
 @@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.failurerate;
+
+import org.apache.flink.api.common.time.Time;
+
+import java.util.ArrayDeque;
+
+/**
+ * A timestamp queue based failure rater implementation.
+ *
+ *
+ */
+public class TimestampBasedFailureRater implements FailureRater {
+   private static final int DEFAULT_TIMESTAMP_SIZE = 300;
+   private final int maximumFailureRate;
+   private final Time failureInterval;
+   private final ArrayDeque failureTimestamps;
+
+   public TimestampBasedFailureRater(int maximumFailureRate, Time failureInterval) {
 
 Review comment:
   Let's pass in a `Clock` instance to make this class easier to test.
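
A minimal sketch of what an injected clock could look like, under stated assumptions: `MillisClock` and `ManualMillisClock` are hypothetical stand-ins written for this example (not Flink's own clock classes), and `ClockedFailureRater` is a simplified version of the class under review. Whether "exceeds" means `>` or `>=` is also an assumption here.

```java
import java.util.ArrayDeque;

/** Hypothetical clock abstraction for this sketch; only the one method used below. */
interface MillisClock {
    long absoluteTimeMillis();
}

/** Hypothetical manually advanced clock, so tests never need to sleep. */
final class ManualMillisClock implements MillisClock {
    private long now;

    @Override
    public long absoluteTimeMillis() {
        return now;
    }

    void advance(long millis) {
        now += millis;
    }
}

/** Sliding-window failure counter that reads time only from the injected clock. */
final class ClockedFailureRater {
    private final int maximumFailures;
    private final long intervalMillis;
    private final MillisClock clock;
    private final ArrayDeque<Long> failureTimestamps = new ArrayDeque<>();

    ClockedFailureRater(int maximumFailures, long intervalMillis, MillisClock clock) {
        this.maximumFailures = maximumFailures;
        this.intervalMillis = intervalMillis;
        this.clock = clock;
    }

    void markFailure() {
        failureTimestamps.add(clock.absoluteTimeMillis());
    }

    /** Drops timestamps older than the interval and returns how many remain. */
    int currentFailures() {
        long now = clock.absoluteTimeMillis();
        while (!failureTimestamps.isEmpty()
                && now - failureTimestamps.peek() > intervalMillis) {
            failureTimestamps.remove();
        }
        return failureTimestamps.size();
    }

    /** Assumption: the rate is exceeded once the count is strictly above the maximum. */
    boolean exceedsFailureRate() {
        return currentFailures() > maximumFailures;
    }
}
```

With this shape a test can construct the rater with a `ManualMillisClock`, record failures, advance the clock past the interval, and assert that the window has emptied, all without touching wall-clock time.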




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258479831
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ##
 @@ -178,6 +178,10 @@ public int getNumberPendingTaskManagerSlots() {
return pendingSlots.size();
}
 
+   public int getNumberPendingSlotRequest() {
 
 Review comment:
   Does it need to be `public`?




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-02-20 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r258468197
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerFactory.java
 ##
 @@ -97,6 +104,8 @@ public MesosResourceManagerFactory(@Nonnull MesosServices 
mesosServices, @Nonnul
taskManagerParameters,
taskManagerContainerSpec,
webInterfaceUrl,
-   jobManagerMetricGroup);
+   jobManagerMetricGroup,
+   new TimestampBasedFailureRater(failureRate, Time.of(1, TimeUnit.MINUTES))
 
 Review comment:
   I think it would be good to have a `NoFailureRater` implementation which is 
instantiated if `failureRate == -1`. We could put the instantiation logic into 
a helper method.
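
A rough sketch of that idea, assuming a two-method `FailureRater` interface. Everything here is illustrative: `FailureRaters.create` is a hypothetical helper name, and `CountingFailureRater` is a deliberately simplified placeholder (no time window) standing in for the timestamp-based implementation.

```java
/** Assumed shape of the interface for this sketch. */
interface FailureRater {
    void markFailure();

    boolean exceedsFailureRate();
}

/** No-op implementation used when failure-rate checking is disabled. */
final class NoFailureRater implements FailureRater {
    @Override
    public void markFailure() {
        // intentionally empty: failures are not tracked
    }

    @Override
    public boolean exceedsFailureRate() {
        return false;
    }
}

/** Simplified placeholder for a real rater; it counts failures without a time window. */
final class CountingFailureRater implements FailureRater {
    private final int maximumFailures;
    private int failures;

    CountingFailureRater(int maximumFailures) {
        this.maximumFailures = maximumFailures;
    }

    @Override
    public void markFailure() {
        failures++;
    }

    @Override
    public boolean exceedsFailureRate() {
        return failures > maximumFailures;
    }
}

/** Hypothetical helper that hides the "-1 means disabled" convention from callers. */
final class FailureRaters {
    static final int DISABLED = -1;

    private FailureRaters() {}

    static FailureRater create(int failureRate) {
        return failureRate == DISABLED
                ? new NoFailureRater()
                : new CountingFailureRater(failureRate);
    }
}
```

The benefit is that the factories and resource managers never branch on `-1` themselves; they always hold a `FailureRater` and call it unconditionally.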




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-30 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252360494
 
 

 ##
 File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
 ##
 @@ -395,12 +405,20 @@ public void onContainersAllocated(List 
containers) {

nodeManagerClient.startContainer(container, taskExecutorLaunchContext);
} catch (Throwable t) {
log.error("Could not start 
TaskManager in container {}.", container.getId(), t);
-
// release the failed container

workerNodeMap.remove(resourceId);

resourceManagerClient.releaseAssignedContainer(container.getId());
-   // and ask for a new one
-   
requestYarnContainerIfRequired();
+   log.error("Could not start 
TaskManager in container {}.", container.getId(), t);
+   recordFailure();
+   if (shouldRejectRequests()) {
+   
rejectAllPendingSlotRequests(new MaximumFailedTaskManagerExceedingException(
+   
String.format("Maximum number of failed container %d in interval %s"
+   
+ "is detected in Resource Manager", maximumFailureTaskExecutorPerInternal,
+   
failureInterval.toString()), t));
 
 Review comment:
   This branch should go into the `recordFailure` method.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-30 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252354300
 
 

 ##
 File path: docs/_includes/generated/mesos_configuration.html
 ##
 @@ -27,6 +27,11 @@
 -1
 The maximum number of failed workers before the cluster fails. 
May be set to -1 to disable this feature. This option is ignored unless Flink 
is in legacy mode.
 
+
+mesos.maximum-failed-workers-per-interval
+-1
+Maximum number of workers the system is going to reallocate in 
case of a failure in an interval.
 
 Review comment:
   Is this a Mesos-specific configuration? If we want to support it for all 
`ResourceManagers`, then it might be better to use a more generic configuration 
name, e.g. `resourcemanager.maximum-workers-failure-rate`.
   
   Should we combine `maximum-failed-workers-per-interval` and 
`workers-failure-rate-interval` into a `failure-rate` config option which 
defines how many workers can fail per minute or per hour?




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-30 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252354509
 
 

 ##
 File path: docs/_includes/generated/yarn_config_configuration.html
 ##
 @@ -42,6 +47,11 @@
 (none)
 Maximum number of containers the system is going to reallocate 
in case of a failure.
 
+
+yarn.maximum-failed-containers-per-interval
+-1
+Maximum number of containers the system is going to reallocate 
in case of a failure in an interval.
+
 
 Review comment:
   Duplicate config option.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-30 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252364001
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -145,6 +148,15 @@
/** All registered listeners for status updates of the ResourceManager. 
*/
private ConcurrentMap 
infoMessageListeners;
 
+   protected final Time failureInterval;
+
+   protected final int maximumFailureTaskExecutorPerInternal;
+
+   private boolean checkFailureRate;
+
+   private final ArrayDeque taskExecutorFailureTimestamps;
 
 Review comment:
   I think we should encapsulate the metering logic behind some interface. 
Ideally I would like to use an exponentially weighted moving average to 
approximate the failure rate. This would have the benefit that we would not 
have to store every timestamp here. However, with an interface we could also 
have different implementations (e.g. the exact one you have implemented here).
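
To illustrate the moving-average idea, here is a small tick-based sketch. It is an assumption-laden example, not code from the PR: the class name `EwmaFailureRate`, the explicit `tick()` call, and the choice of smoothing factor `alpha = 1 - exp(-tick / window)` are all choices made for this illustration.

```java
/**
 * Hypothetical exponentially weighted moving average of the failure rate.
 * State is O(1): one counter and one rate, instead of one timestamp per failure.
 */
final class EwmaFailureRate {
    private final double tickSeconds;
    private final double alpha;
    private long uncounted;
    private double ratePerSecond;
    private boolean initialized;

    EwmaFailureRate(double tickSeconds, double windowSeconds) {
        this.tickSeconds = tickSeconds;
        // Smoothing factor: how strongly one tick's observation moves the average.
        this.alpha = 1.0 - Math.exp(-tickSeconds / windowSeconds);
    }

    void markFailure() {
        uncounted++;
    }

    /** Must be called once per tick interval, e.g. from a scheduled task. */
    void tick() {
        double instantRate = uncounted / tickSeconds;
        uncounted = 0;
        if (initialized) {
            ratePerSecond += alpha * (instantRate - ratePerSecond);
        } else {
            // The first observation seeds the average directly.
            ratePerSecond = instantRate;
            initialized = true;
        }
    }

    double getRatePerSecond() {
        return ratePerSecond;
    }
}
```

The trade-off is precision: the estimate decays smoothly rather than dropping failures exactly when they leave the window, so the exact timestamp-based variant remains useful where a hard limit is required.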




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-30 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252360333
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ##
 @@ -300,6 +304,18 @@ public boolean registerSlotRequest(SlotRequest 
slotRequest) throws SlotManagerEx
}
}
 
+   /**
+* Rejects all pending slot requests.
+* @param cause the exception caused the rejection
+*/
+   public void rejectAllPendingSlotRequests(Exception cause) {
+   for (PendingSlotRequest pendingSlotRequest : pendingSlotRequests.values()) {
+   rejectPendingSlotRequest(pendingSlotRequest, cause);
+   }
+
+   pendingSlotRequests.clear();
+   }
 
 Review comment:
   We should add a test for this method.
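
As a sketch of what such a test needs to check, the following uses a hypothetical, drastically reduced stand-in (`PendingRequestsSketch`) instead of the real `SlotManager`, whose setup is not shown in this thread. The two properties asserted would be that every pending request fails with the given cause and that no pending request remains.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in: pending requests modelled as futures keyed by an id. */
final class PendingRequestsSketch {
    private final Map<String, CompletableFuture<Void>> pending = new LinkedHashMap<>();

    CompletableFuture<Void> register(String allocationId) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        pending.put(allocationId, future);
        return future;
    }

    /** Fails every pending request with the cause, then forgets them. */
    void rejectAll(Exception cause) {
        for (CompletableFuture<Void> future : pending.values()) {
            future.completeExceptionally(cause);
        }
        pending.clear();
    }

    int numberOfPendingRequests() {
        return pending.size();
    }
}
```

A test along these lines would register two requests, call the reject method once, and then assert on both futures and on the pending count.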




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-30 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252360809
 
 

 ##
 File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java
 ##
 @@ -521,4 +533,56 @@ public void testOnContainerCompleted() throws Exception {
});
}};
}
+
+   /**
+*  Tests that YarnResourceManager will trigger to reject all 
pending slot request, when maximum number of failed
+*  contains is hit.
+*/
+   @Test
+   public void testOnContainersAllocatedWithFailure() throws Exception {
 
 Review comment:
   With the generalized test for the failure rate behaviour in 
`ResourceManagerTest` we would only need to check that a failing container 
would call `ResourceManager#recordFailure`.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-30 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252358049
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -145,6 +148,15 @@
/** All registered listeners for status updates of the ResourceManager. 
*/
private ConcurrentMap 
infoMessageListeners;
 
+   protected final Time failureInterval;
+
+   protected final int maximumFailureTaskExecutorPerInternal;
 
 Review comment:
   Typo: `PerInternal` should presumably be `PerInterval`.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-30 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252358410
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -626,6 +686,44 @@ public void unRegisterInfoMessageListener(final String 
address) {
}
}
 
+   protected void rejectAllPendingSlotRequests(Exception e) {
+   slotManager.rejectAllPendingSlotRequests(e);
+   }
+
+   protected synchronized void recordFailure() {
 
 Review comment:
   Does it need to be synchronized?




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-30 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252357459
 
 

 ##
 File path: 
flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
 ##
 @@ -708,6 +720,50 @@ public void testWorkerFailed() throws Exception {
}};
}
 
+   /**
+* Test worker failure hit maximum worker failure rate.
+*/
+   @Test
+   public void testWorkerFailedAtFailureRate() throws Exception {
+   new Context() {{
+   // set the initial persistent state with a launched 
worker
+   MesosWorkerStore.Worker worker1launched = 
MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
+   MesosWorkerStore.Worker worker2launched = 
MesosWorkerStore.Worker.newWorker(task2).launchWorker(slave1, slave1host);
+
+   
when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+   
when(rmServices.workerStore.recoverWorkers()).thenReturn(Arrays.asList(worker1launched,
 worker2launched));
+   
when(rmServices.workerStore.newTaskID()).thenReturn(task3);
+   startResourceManager();
+
+   // tell the RM that a tasks failed
+   
when(rmServices.workerStore.removeWorker(task1)).thenReturn(true);
+   
when(rmServices.workerStore.removeWorker(task2)).thenReturn(true);
+   resourceManager.taskTerminated(new 
TaskMonitor.TaskTerminated(task1, Protos.TaskStatus.newBuilder()
+   
.setTaskId(task1).setSlaveId(slave1).setState(Protos.TaskState.TASK_FAILED).build()));
+
+   // tell the RM that a task failed
+   resourceManager.taskTerminated(new 
TaskMonitor.TaskTerminated(task2, Protos.TaskStatus.newBuilder()
+   
.setTaskId(task2).setSlaveId(slave1).setState(Protos.TaskState.TASK_FAILED).build()));
+
+   verify(rmServices.workerStore).removeWorker(task1);
+   verify(rmServices.workerStore).removeWorker(task2);
+   assertThat(resourceManager.workersInLaunch.entrySet(), 
empty());
+   
assertThat(resourceManager.workersBeingReturned.entrySet(), empty());
+   assertThat(resourceManager.workersInNew, 
hasKey(extractResourceID(task3)));
+
+   // request second slot
+   CompletableFuture registerSlotRequestFuture = 
resourceManager.runInMainThread(() -> {
+   rmServices.slotManager.registerSlotRequest(
+   new SlotRequest(new JobID(), new 
AllocationID(), resourceProfile1, slave1host));
+   return null;
+   });
+
+   // wait for the registerSlotRequest completion
+   registerSlotRequestFuture.get();
+   assertEquals(0, 
rmServices.slotManager.getNumberPendingSlotRequest());
+   }};
 
 Review comment:
   I think it would be good to move this test into `ResourceManagerTest` 
because the functionality is not Mesos-specific.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-30 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252355825
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 ##
 @@ -167,7 +169,9 @@ public MesosResourceManager(
MesosTaskManagerParameters taskManagerParameters,
ContainerSpecification taskManagerContainerSpec,
@Nullable String webUiUrl,
-   JobManagerMetricGroup jobManagerMetricGroup) {
+   JobManagerMetricGroup jobManagerMetricGroup,
+   Time failureInterval,
+   int maxFailurePerInterval) {
 
 Review comment:
   Either let's have `int failuresPerHour`, or we should create a `FailureRate` 
class which encapsulates the `failureInterval` and `maxFailurePerInterval` 
values.
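
One possible shape for such a value class, as a sketch only: it uses `java.time.Duration` in place of Flink's `Time` so that the example is self-contained, and the accessor and factory names (`perHour`, `failuresPerHour`) are made up for illustration.

```java
import java.time.Duration;

/** Hypothetical immutable value class pairing a failure count with its interval. */
final class FailureRate {
    private final int maxFailuresPerInterval;
    private final Duration failureInterval;

    FailureRate(int maxFailuresPerInterval, Duration failureInterval) {
        if (maxFailuresPerInterval < 0) {
            throw new IllegalArgumentException("maxFailuresPerInterval must not be negative");
        }
        if (failureInterval.isZero() || failureInterval.isNegative()) {
            throw new IllegalArgumentException("failureInterval must be positive");
        }
        this.maxFailuresPerInterval = maxFailuresPerInterval;
        this.failureInterval = failureInterval;
    }

    /** Covers the simpler "int failuresPerHour" alternative with the same type. */
    static FailureRate perHour(int failuresPerHour) {
        return new FailureRate(failuresPerHour, Duration.ofHours(1));
    }

    int getMaxFailuresPerInterval() {
        return maxFailuresPerInterval;
    }

    Duration getFailureInterval() {
        return failureInterval;
    }

    /** Normalizes the rate so that differently configured rates can be compared. */
    double failuresPerHour() {
        return maxFailuresPerInterval * 3_600_000.0 / failureInterval.toMillis();
    }
}
```

Passing one `FailureRate` through the constructors keeps the two values from drifting apart and shortens the already long parameter lists.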




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-30 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252354894
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/configuration/MesosOptions.java
 ##
 @@ -99,6 +99,25 @@
.withDescription("The config parameter defining the 
Mesos artifact server port to use. Setting the port to" +
" 0 will let the OS choose an available port.");
 
+   /**
+* The maximum number of failed Mesos worker within an interval before 
entirely stopping
+* the Mesos session / job on Mesos.
+* By default, the value is -1
+*/
+   public static final ConfigOption 
MAX_FAILED_WORKERS_PER_INTERVAL =
+   key("mesos.maximum-failed-workers-per-interval")
+   .defaultValue(-1)
+   .withDescription("Maximum number of workers the system 
is going to reallocate in case of a failure in an interval.");
+
+   /**
+* The interval for measuring failure rate of containers in second unit.
+* By default, the value is 5 minutes.
+**/
+   public static final ConfigOption WORKERS_FAILURE_RATE_INTERVAL 
=
+   key("mesos.workers-failure-rate-interval")
+   .defaultValue(300)
+   .withDeprecatedKeys("The interval for measuring failure 
rate of workers");
+
 
 Review comment:
   I would suggest using a single `ConfigOption WORKERS_FAILURE_RATE` because 
it would make the configuration easier.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-30 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252354461
 
 

 ##
 File path: docs/_includes/generated/yarn_config_configuration.html
 ##
 @@ -27,6 +27,11 @@
 -1
 The port where the application master RPC system is 
listening.
 
+
+yarn.containers-failure-rate-interval
+300
+The interval for measuring failure rate of containers
+
 
 Review comment:
   Duplicate config option.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-30 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252360241
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 ##
 @@ -178,6 +178,10 @@ public int getNumberPendingTaskManagerSlots() {
return pendingSlots.size();
}
 
+   public int getNumberPendingSlotRequest() {
 
 Review comment:
   Does this need to be public? We should add `@VisibleForTesting`.




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-30 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252360006
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -421,6 +474,13 @@ public void disconnectJobManager(final JobID jobId, final 
Exception cause) {
slotRequest.getJobId(),
slotRequest.getAllocationId());
 
+   if (shouldRejectRequests()) {
 
 Review comment:
   I think this check should rather be made in the 
`ResourceActionsImpl#allocateResource` because the slot request might still be 
fulfilled with an existing slot.
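
The ordering argument can be shown with a toy model. This is a hypothetical sketch (`SlotAllocationSketch` and its fields are invented for illustration and do not mirror the real `SlotManager` or `ResourceActionsImpl`): the failure-rate check sits only on the path that asks for a new worker, so requests that fit into existing free slots still succeed.

```java
/** Hypothetical model: free slots are used first, new workers only when needed. */
final class SlotAllocationSketch {
    private int freeSlots;
    private final boolean failureRateExceeded;
    private int workersRequested;

    SlotAllocationSketch(int freeSlots, boolean failureRateExceeded) {
        this.freeSlots = freeSlots;
        this.failureRateExceeded = failureRateExceeded;
    }

    /** Returns true if the request was accepted, false if it was rejected. */
    boolean registerSlotRequest() {
        if (freeSlots > 0) {
            // An existing slot can serve the request; the failure rate is irrelevant.
            freeSlots--;
            return true;
        }
        return allocateResource();
    }

    /** Only this path starts new workers, so only this path checks the failure rate. */
    private boolean allocateResource() {
        if (failureRateExceeded) {
            return false;
        }
        workersRequested++;
        return true;
    }

    int getWorkersRequested() {
        return workersRequested;
    }
}
```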




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-30 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252355315
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 ##
 @@ -663,7 +669,17 @@ public void taskTerminated(TaskMonitor.TaskTerminated 
message) {
assert(launched != null);
LOG.info("Worker {} failed with status: {}, reason: {}, 
message: {}.",
id, status.getState(), status.getReason(), 
status.getMessage());
-   startNewWorker(launched.profile());
+
+   recordFailure();
+
+   if (shouldRejectRequests()) {
+   rejectAllPendingSlotRequests(new 
MaximumFailedTaskManagerExceedingException(
+   new 
RuntimeException(String.format("Maximum number of failed workers %d in interval 
%s"
+   + "is detected in 
Resource Manager", maximumFailureTaskExecutorPerInternal,
+   failureInterval.toString();
+   } else {
+   startNewWorker(launched.profile());
+   }
 
 Review comment:
   This block seems to be generic enough that it should live in the base class.
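
A sketch of how the shared decision could be hoisted, with loud caveats: `ResourceManagerSketch` and `MesosLikeResourceManager` are hypothetical and heavily simplified, slot requests are plain strings, and failures are counted without a time window. The point is only that subclasses report the failure and supply `startNewWorker()`, while the reject-or-restart branch exists once.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, heavily simplified base class; not the real ResourceManager. */
abstract class ResourceManagerSketch {
    private final int maxWorkerFailures;
    private int workerFailures;
    private final List<String> pendingSlotRequests = new ArrayList<>();
    private final List<String> rejectedSlotRequests = new ArrayList<>();

    ResourceManagerSketch(int maxWorkerFailures) {
        this.maxWorkerFailures = maxWorkerFailures;
    }

    void addPendingSlotRequest(String requestId) {
        pendingSlotRequests.add(requestId);
    }

    List<String> getRejectedSlotRequests() {
        return rejectedSlotRequests;
    }

    /** Subclasses call this whenever one of their workers or containers fails. */
    protected final void recordWorkerFailure() {
        workerFailures++;
        if (workerFailures > maxWorkerFailures) {
            // Too many failures: reject what is pending instead of restarting.
            rejectedSlotRequests.addAll(pendingSlotRequests);
            pendingSlotRequests.clear();
        } else {
            startNewWorker();
        }
    }

    /** The only framework-specific piece. */
    protected abstract void startNewWorker();
}

/** Hypothetical subclass showing how little remains framework-specific. */
final class MesosLikeResourceManager extends ResourceManagerSketch {
    int workersStarted;

    MesosLikeResourceManager(int maxWorkerFailures) {
        super(maxWorkerFailures);
    }

    void onTaskFailed() {
        recordWorkerFailure();
    }

    @Override
    protected void startNewWorker() {
        workersStarted++;
    }
}
```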




[GitHub] tillrohrmann commented on a change in pull request #7356: [FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers

2019-01-30 Thread GitBox
tillrohrmann commented on a change in pull request #7356: 
[FLINK-10868][flink-yarn] Enforce maximum TMs failure rate in ResourceManagers
URL: https://github.com/apache/flink/pull/7356#discussion_r252358339
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -626,6 +686,44 @@ public void unRegisterInfoMessageListener(final String 
address) {
}
}
 
+   protected void rejectAllPendingSlotRequests(Exception e) {
+   slotManager.rejectAllPendingSlotRequests(e);
+   }
+
+   protected synchronized void recordFailure() {
 
 Review comment:
   Let's call it `recordWorkerFailure()` and add some JavaDoc explaining when to call it.

