[ https://issues.apache.org/jira/browse/FLINK-4851?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15588696#comment-15588696 ]
ASF GitHub Bot commented on FLINK-4851:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/2655#discussion_r84063789
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---
@@ -408,106 +528,98 @@ public void run() {
*/
@Override
public void handleError(final Exception exception) {
- log.error("ResourceManager received an error from the LeaderElectionService.", exception);
- // terminate ResourceManager in case of an error
- shutDown();
+ onFatalErrorAsync(new ResourceManagerException("Received an error from the LeaderElectionService.", exception));
}
/**
- * Registers an infoMessage listener
+ * This method should be called by the framework once it detects that a currently registered
+ * task executor has failed.
*
- * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager
+ * @param resourceID Id of the worker that has failed.
+ * @param message An informational message that explains why the worker failed.
*/
- @RpcMethod
- public void registerInfoMessageListener(final String infoMessageListenerAddress) {
- if(infoMessageListeners.containsKey(infoMessageListenerAddress)) {
- log.warn("Receive a duplicate registration from info message listener on ({})", infoMessageListenerAddress);
- } else {
- Future<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService().connect(infoMessageListenerAddress, InfoMessageListenerRpcGateway.class);
-
- infoMessageListenerRpcGatewayFuture.thenAcceptAsync(new AcceptFunction<InfoMessageListenerRpcGateway>() {
- @Override
- public void accept(InfoMessageListenerRpcGateway gateway) {
- log.info("Receive a registration from info message listener on ({})", infoMessageListenerAddress);
- infoMessageListeners.put(infoMessageListenerAddress, gateway);
- }
- }, getMainThreadExecutor());
+ public void notifyWorkerFailed(final ResourceID resourceID, final String message) {
+ runAsync(new Runnable() {
--- End diff --
I actually just added logging here. I guess one has to add the registration ID of the TM to filter out outdated notifyWorkerFailed calls.
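A minimal sketch of the filtering suggested above. It assumes that each TaskExecutor registration is assigned a fresh ID and that this ID travels with the failure notification. The class StaleFailureFilterSketch, the UUID registration IDs, and the extra registrationId parameter on notifyWorkerFailed are all hypothetical and are not the API from the pull request. A single-threaded executor stands in for the endpoint's runAsync main thread, and the calls block on their results only so the example is observable.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, not Flink's ResourceManager: a failure notification is only
// honored if it carries the registration ID of the worker's *current* registration.
class StaleFailureFilterSketch {

    // Single-threaded executor standing in for the RPC endpoint's main thread (runAsync).
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

    // resourceID -> ID of the worker's current registration.
    // Only touched from mainThread, so no further synchronization is needed.
    private final Map<String, UUID> registrations = new HashMap<>();

    // Registers (or re-registers) a worker and returns its fresh registration ID.
    UUID registerWorker(String resourceID) throws Exception {
        return mainThread.submit(() -> {
            UUID registrationId = UUID.randomUUID();
            registrations.put(resourceID, registrationId); // replaces any older registration
            return registrationId;
        }).get();
    }

    // Hypothetical variant of notifyWorkerFailed that also carries the registration ID.
    // Returns true if the worker was removed, false if the notification was outdated.
    boolean notifyWorkerFailed(String resourceID, UUID registrationId, String message) throws Exception {
        return mainThread.submit(() -> {
            UUID current = registrations.get(resourceID);
            if (current == null || !current.equals(registrationId)) {
                System.out.println("Ignoring outdated failure notification for " + resourceID + ": " + message);
                return false;
            }
            registrations.remove(resourceID);
            System.out.println("Worker " + resourceID + " failed: " + message);
            return true;
        }).get();
    }

    void shutDown() throws InterruptedException {
        mainThread.shutdown();
        mainThread.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        StaleFailureFilterSketch rm = new StaleFailureFilterSketch();
        UUID first = rm.registerWorker("tm-1");
        UUID second = rm.registerWorker("tm-1"); // the TM reconnects and registers again
        System.out.println(rm.notifyWorkerFailed("tm-1", first, "heartbeat timeout"));  // stale: ignored
        System.out.println(rm.notifyWorkerFailed("tm-1", second, "heartbeat timeout")); // current: removed
        rm.shutDown();
    }
}
```

Because the ID changes on every re-registration, a late failure notification about an earlier connection of the same ResourceID can no longer remove the worker's current registration.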
> Add FatalErrorHandler and MetricRegistry to ResourceManager
> -----------------------------------------------------------
>
> Key: FLINK-4851
> URL: https://issues.apache.org/jira/browse/FLINK-4851
> Project: Flink
> Issue Type: Sub-task
> Components: ResourceManager
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
>
> The {{ResourceManager}} currently does not contain a {{FatalErrorHandler}}.
> In order to harmonize the fatal error handling across all components, we
> should introduce a {{FatalErrorHandler}}, which handles fatal errors.
> Additionally, we should also give a {{MetricRegistry}} to the
> {{ResourceManager}} so that it can report metrics.
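A minimal sketch of the delegation pattern the issue describes, mirroring the new handleError in the diff. Instead of logging and calling shutDown() itself, the component wraps the cause and passes it to an injected handler. FatalErrorHandlerSketch, ResourceManagerExceptionSketch and ResourceManagerSketch are hypothetical stand-ins, not Flink's types. Running the handler on a single-threaded executor is an assumption about what onFatalErrorAsync does, inferred only from its name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a fatal error handler interface (not Flink's API).
interface FatalErrorHandlerSketch {
    void onFatalError(Throwable exception);
}

// Hypothetical stand-in for ResourceManagerException.
class ResourceManagerExceptionSketch extends Exception {
    ResourceManagerExceptionSketch(String message, Throwable cause) {
        super(message, cause);
    }
}

class ResourceManagerSketch {
    // Single-threaded executor standing in for the component's main thread.
    private final ExecutorService mainThread = Executors.newSingleThreadExecutor();
    private final FatalErrorHandlerSketch fatalErrorHandler;

    ResourceManagerSketch(FatalErrorHandlerSketch fatalErrorHandler) {
        this.fatalErrorHandler = fatalErrorHandler;
    }

    // Called by the leader election service, possibly from one of its own threads.
    // The component no longer decides to shut down; it wraps the cause and delegates.
    void handleError(Exception exception) {
        onFatalErrorAsync(new ResourceManagerExceptionSketch(
                "Received an error from the LeaderElectionService.", exception));
    }

    // Assumed behavior: hand the error to the injected handler on the main thread.
    private void onFatalErrorAsync(Throwable t) {
        mainThread.execute(() -> fatalErrorHandler.onFatalError(t));
    }

    void shutDown() throws InterruptedException {
        mainThread.shutdown();
        mainThread.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        CountDownLatch handled = new CountDownLatch(1);
        ResourceManagerSketch rm = new ResourceManagerSketch(error -> {
            System.out.println("Fatal error: " + error.getMessage()
                    + " (cause: " + error.getCause().getMessage() + ")");
            handled.countDown();
        });
        rm.handleError(new Exception("simulated leader election failure"));
        handled.await(5, TimeUnit.SECONDS); // wait until the handler has run
        rm.shutDown();
    }
}
```

Keeping the shutdown decision in the handler lets every component report fatal errors the same way, which is the harmonization the issue asks for.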
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)