This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new d67f7db [FLINK-11845][runtime] Drop legacy InfoMessage d67f7db is described below commit d67f7dbaa096f7b2915675a1012fe31003441e7c Author: ZiLi Chen <wander4...@gmail.com> AuthorDate: Thu Mar 14 18:02:59 2019 +0800 [FLINK-11845][runtime] Drop legacy InfoMessage --- .../clusterframework/messages/InfoMessage.java | 58 ------------------ .../messages/RegisterInfoMessageListener.java | 70 ---------------------- .../messages/UnRegisterInfoMessageListener.java | 70 ---------------------- .../InfoMessageListenerRpcGateway.java | 34 ----------- .../runtime/resourcemanager/ResourceManager.java | 61 ------------------- .../resourcemanager/ResourceManagerGateway.java | 15 ----- .../utils/TestingResourceManagerGateway.java | 10 ---- 7 files changed, 318 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/InfoMessage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/InfoMessage.java deleted file mode 100644 index 81670d7..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/InfoMessage.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.clusterframework.messages; - -import java.util.Date; - -import static java.util.Objects.requireNonNull; - -/** - * A simple informational message sent by the resource master to the client. - */ -public class InfoMessage implements java.io.Serializable { - - private static final long serialVersionUID = 5534993035539629765L; - - private final String message; - - private final Date date; - - public InfoMessage(String message) { - this.message = message; - this.date = new Date(); - } - - public InfoMessage(String message, Date date) { - this.message = requireNonNull(message); - this.date = requireNonNull(date); - } - - public String message() { - return message; - } - - public Date date() { - return date; - } - - @Override - public String toString() { - return "InfoMessage { message='" + message + "', date=" + date + " }"; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterInfoMessageListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterInfoMessageListener.java deleted file mode 100644 index d39ff9b..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterInfoMessageListener.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.clusterframework.messages; - -import org.apache.flink.runtime.messages.RequiresLeaderSessionID; - -/** - * This message signals to the resource master to register the sender as an - * info message listener. - */ -public class RegisterInfoMessageListener implements RequiresLeaderSessionID, java.io.Serializable { - - private static final long serialVersionUID = 7808628311617273755L; - - /** The singleton instance */ - private static final RegisterInfoMessageListener INSTANCE = new RegisterInfoMessageListener(); - - /** - * Gets the singleton instance. - * @return The singleton instance. - */ - public static RegisterInfoMessageListener getInstance() { - return INSTANCE; - } - - // ------------------------------------------------------------------------ - - /** Private constructor to prevent instantiation */ - private RegisterInfoMessageListener() {} - - // ------------------------------------------------------------------------ - - @Override - public boolean equals(Object obj) { - return obj != null && obj.getClass() == RegisterInfoMessageListener.class; - } - - @Override - public int hashCode() { - return 2018741655; - } - - @Override - public String toString() { - return getClass().getSimpleName(); - } - - /** - * Read resolve to preserve the singleton object property. - */ - private Object readResolve() throws java.io.ObjectStreamException { - return INSTANCE; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/UnRegisterInfoMessageListener.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/UnRegisterInfoMessageListener.java deleted file mode 100644 index 4432160..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/UnRegisterInfoMessageListener.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.clusterframework.messages; - -import org.apache.flink.runtime.messages.RequiresLeaderSessionID; - -/** - * This message signals to the resource master to register the sender as an - * info message listener. - */ -public class UnRegisterInfoMessageListener implements RequiresLeaderSessionID, java.io.Serializable { - - private static final long serialVersionUID = 7808628311617273755L; - - /** The singleton instance */ - private static final UnRegisterInfoMessageListener INSTANCE = new UnRegisterInfoMessageListener(); - - /** - * Gets the singleton instance. - * @return The singleton instance. - */ - public static UnRegisterInfoMessageListener get() { - return INSTANCE; - } - - // ------------------------------------------------------------------------ - - /** Private constructor to prevent instantiation */ - private UnRegisterInfoMessageListener() {} - - // ------------------------------------------------------------------------ - - @Override - public boolean equals(Object obj) { - return obj != null && obj.getClass() == UnRegisterInfoMessageListener.class; - } - - @Override - public int hashCode() { - return 2018741654; - } - - @Override - public String toString() { - return getClass().getSimpleName(); - } - - /** - * Read resolve to preserve the singleton object property. - */ - private Object readResolve() throws java.io.ObjectStreamException { - return INSTANCE; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java deleted file mode 100644 index d1373ec..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.resourcemanager; - -import org.apache.flink.runtime.clusterframework.messages.InfoMessage; -import org.apache.flink.runtime.rpc.RpcGateway; - -/** - * A gateway to listen for info messages from {@link ResourceManager} - */ -public interface InfoMessageListenerRpcGateway extends RpcGateway { - - /** - * Notifies when resource manager need to notify listener about InfoMessage - * @param infoMessage - */ - void notifyInfoMessage(InfoMessage infoMessage); -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 957c991..45269ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -24,7 +24,6 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.blob.TransientBlobKey; import org.apache.flink.runtime.clusterframework.ApplicationStatus; -import org.apache.flink.runtime.clusterframework.messages.InfoMessage; import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable; @@ -79,8 +78,6 @@ import java.util.Objects; import java.util.Optional; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; @@ -142,9 +139,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> /** The service to elect a ResourceManager leader. */ private LeaderElectionService leaderElectionService; - /** All registered listeners for status updates of the ResourceManager. */ - private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners; - /** * Represents asynchronous state clearing work. * @@ -192,7 +186,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> this.jobManagerRegistrations = new HashMap<>(4); this.jmResourceIdRegistrations = new HashMap<>(4); this.taskExecutors = new HashMap<>(8); - infoMessageListeners = new ConcurrentHashMap<>(8); } @@ -491,43 +484,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> } /** - * Registers an info message listener. - * - * @param address address of infoMessage listener to register to this resource manager - */ - @Override - public void registerInfoMessageListener(final String address) { - if (infoMessageListeners.containsKey(address)) { - log.warn("Receive a duplicate registration from info message listener on ({})", address); - } else { - CompletableFuture<InfoMessageListenerRpcGateway> infoMessageListenerRpcGatewayFuture = getRpcService() - .connect(address, InfoMessageListenerRpcGateway.class); - - infoMessageListenerRpcGatewayFuture.whenCompleteAsync( - (InfoMessageListenerRpcGateway gateway, Throwable failure) -> { - if (failure != null) { - log.warn("Receive a registration from unreachable info message listener on ({})", address); - } else { - log.info("Receive a registration from info message listener on ({})", address); - infoMessageListeners.put(address, gateway); - } - }, - getMainThreadExecutor()); - } - } - - /** - * Unregisters an info message listener. - * - * @param address of the info message listener to unregister from this resource manager - * - */ - @Override - public void unRegisterInfoMessageListener(final String address) { - infoMessageListeners.remove(address); - } - - /** * Cleanup application and shut down cluster. * * @param finalStatus of the Flink application @@ -905,23 +861,6 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> } // ------------------------------------------------------------------------ - // Info messaging - // ------------------------------------------------------------------------ - - public void sendInfoMessage(final String message) { - getRpcService().execute(new Runnable() { - @Override - public void run() { - InfoMessage infoMessage = new InfoMessage(message); - for (InfoMessageListenerRpcGateway listenerRpcGateway : infoMessageListeners.values()) { - listenerRpcGateway - .notifyInfoMessage(infoMessage); - } - } - }); - } - - // ------------------------------------------------------------------------ // Error Handling // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index ea79650..2933915 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -132,21 +132,6 @@ public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManager AllocationID oldAllocationId); /** - * Registers an infoMessage listener - * - * @param infoMessageListenerAddress address of infoMessage listener to register to this resource manager - */ - void registerInfoMessageListener(String infoMessageListenerAddress); - - /** - * Unregisters an infoMessage listener - * - * @param infoMessageListenerAddress address of infoMessage listener to unregister from this resource manager - * - */ - void unRegisterInfoMessageListener(String infoMessageListenerAddress); - - /** * Deregister Flink from the underlying resource management system. * * @param finalStatus final status with which to deregister the Flink application diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java index ed8002b..d0a4512 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java @@ -236,16 +236,6 @@ public class TestingResourceManagerGateway implements ResourceManagerGateway { } @Override - public void registerInfoMessageListener(String infoMessageListenerAddress) { - - } - - @Override - public void unRegisterInfoMessageListener(String infoMessageListenerAddress) { - - } - - @Override public CompletableFuture<Acknowledge> deregisterApplication(ApplicationStatus finalStatus, String diagnostics) { return CompletableFuture.completedFuture(Acknowledge.get()); }