This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d67f7db  [FLINK-11845][runtime] Drop legacy InfoMessage
d67f7db is described below

commit d67f7dbaa096f7b2915675a1012fe31003441e7c
Author: ZiLi Chen <wander4...@gmail.com>
AuthorDate: Thu Mar 14 18:02:59 2019 +0800

    [FLINK-11845][runtime] Drop legacy InfoMessage
---
 .../clusterframework/messages/InfoMessage.java     | 58 ------------------
 .../messages/RegisterInfoMessageListener.java      | 70 ----------------------
 .../messages/UnRegisterInfoMessageListener.java    | 70 ----------------------
 .../InfoMessageListenerRpcGateway.java             | 34 -----------
 .../runtime/resourcemanager/ResourceManager.java   | 61 -------------------
 .../resourcemanager/ResourceManagerGateway.java    | 15 -----
 .../utils/TestingResourceManagerGateway.java       | 10 ----
 7 files changed, 318 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/InfoMessage.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/InfoMessage.java
deleted file mode 100644
index 81670d7..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/InfoMessage.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import java.util.Date;
-
-import static java.util.Objects.requireNonNull;
-
-/**
- * A simple informational message sent by the resource master to the client.
- */
-public class InfoMessage implements java.io.Serializable {
-
-       private static final long serialVersionUID = 5534993035539629765L;
-       
-       private final String message;
-       
-       private final Date date;
-       
-       public InfoMessage(String message) {
-               this.message = message;
-               this.date = new Date();
-       }
-       
-       public InfoMessage(String message, Date date) {
-               this.message = requireNonNull(message);
-               this.date = requireNonNull(date);
-       }
-       
-       public String message() {
-               return message;
-       }
-       
-       public Date date() {
-               return date;
-       }
-       
-       @Override
-       public String toString() {
-               return "InfoMessage { message='" + message + "', date=" + date 
+ " }";
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterInfoMessageListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterInfoMessageListener.java
deleted file mode 100644
index d39ff9b..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/RegisterInfoMessageListener.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-/**
- * This message signals to the resource master to register the sender as an
- * info message listener. 
- */
-public class RegisterInfoMessageListener implements RequiresLeaderSessionID, 
java.io.Serializable {
-
-       private static final long serialVersionUID = 7808628311617273755L;
-
-       /** The singleton instance */
-       private static final RegisterInfoMessageListener INSTANCE = new 
RegisterInfoMessageListener();
-
-       /**
-        * Gets the singleton instance.
-        * @return The singleton instance.
-        */
-       public static RegisterInfoMessageListener getInstance() {
-               return INSTANCE;
-       }
-       
-       // 
------------------------------------------------------------------------
-       
-       /** Private constructor to prevent instantiation */
-       private RegisterInfoMessageListener() {}
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public boolean equals(Object obj) {
-               return obj != null && obj.getClass() == 
RegisterInfoMessageListener.class;
-       }
-
-       @Override
-       public int hashCode() {
-               return 2018741655;
-       }
-
-       @Override
-       public String toString() {
-               return getClass().getSimpleName();
-       }
-
-       /**
-        * Read resolve to preserve the singleton object property.
-        */
-       private Object readResolve() throws java.io.ObjectStreamException {
-               return INSTANCE;
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/UnRegisterInfoMessageListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/UnRegisterInfoMessageListener.java
deleted file mode 100644
index 4432160..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/messages/UnRegisterInfoMessageListener.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.clusterframework.messages;
-
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-
-/**
- * This message signals to the resource master to register the sender as an
- * info message listener. 
- */
-public class UnRegisterInfoMessageListener implements RequiresLeaderSessionID, 
java.io.Serializable {
-
-       private static final long serialVersionUID = 7808628311617273755L;
-
-       /** The singleton instance */
-       private static final UnRegisterInfoMessageListener INSTANCE = new 
UnRegisterInfoMessageListener();
-
-       /**
-        * Gets the singleton instance.
-        * @return The singleton instance.
-        */
-       public static UnRegisterInfoMessageListener get() {
-               return INSTANCE;
-       }
-
-       // 
------------------------------------------------------------------------
-
-       /** Private constructor to prevent instantiation */
-       private UnRegisterInfoMessageListener() {}
-
-       // 
------------------------------------------------------------------------
-
-       @Override
-       public boolean equals(Object obj) {
-               return obj != null && obj.getClass() == 
UnRegisterInfoMessageListener.class;
-       }
-
-       @Override
-       public int hashCode() {
-               return 2018741654;
-       }
-
-       @Override
-       public String toString() {
-               return getClass().getSimpleName();
-       }
-
-       /**
-        * Read resolve to preserve the singleton object property.
-        */
-       private Object readResolve() throws java.io.ObjectStreamException {
-               return INSTANCE;
-       }
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
deleted file mode 100644
index d1373ec..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/InfoMessageListenerRpcGateway.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.resourcemanager;
-
-import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
-import org.apache.flink.runtime.rpc.RpcGateway;
-
-/**
- * A gateway to listen for info messages from {@link ResourceManager}
- */
-public interface InfoMessageListenerRpcGateway extends RpcGateway {
-
-       /**
-        * Notifies when resource manager need to notify listener about 
InfoMessage
-        * @param infoMessage
-        */
-       void notifyInfoMessage(InfoMessage infoMessage);
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 957c991..45269ee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.blob.TransientBlobKey;
 import org.apache.flink.runtime.clusterframework.ApplicationStatus;
-import org.apache.flink.runtime.clusterframework.messages.InfoMessage;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
@@ -79,8 +78,6 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
@@ -142,9 +139,6 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
        /** The service to elect a ResourceManager leader. */
        private LeaderElectionService leaderElectionService;
 
-       /** All registered listeners for status updates of the ResourceManager. 
*/
-       private ConcurrentMap<String, InfoMessageListenerRpcGateway> 
infoMessageListeners;
-
        /**
         * Represents asynchronous state clearing work.
         *
@@ -192,7 +186,6 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                this.jobManagerRegistrations = new HashMap<>(4);
                this.jmResourceIdRegistrations = new HashMap<>(4);
                this.taskExecutors = new HashMap<>(8);
-               infoMessageListeners = new ConcurrentHashMap<>(8);
        }
 
 
@@ -491,43 +484,6 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
        }
 
        /**
-        * Registers an info message listener.
-        *
-        * @param address address of infoMessage listener to register to this 
resource manager
-        */
-       @Override
-       public void registerInfoMessageListener(final String address) {
-               if (infoMessageListeners.containsKey(address)) {
-                       log.warn("Receive a duplicate registration from info 
message listener on ({})", address);
-               } else {
-                       CompletableFuture<InfoMessageListenerRpcGateway> 
infoMessageListenerRpcGatewayFuture = getRpcService()
-                               .connect(address, 
InfoMessageListenerRpcGateway.class);
-
-                       infoMessageListenerRpcGatewayFuture.whenCompleteAsync(
-                               (InfoMessageListenerRpcGateway gateway, 
Throwable failure) -> {
-                                       if (failure != null) {
-                                               log.warn("Receive a 
registration from unreachable info message listener on ({})", address);
-                                       } else {
-                                               log.info("Receive a 
registration from info message listener on ({})", address);
-                                               
infoMessageListeners.put(address, gateway);
-                                       }
-                               },
-                               getMainThreadExecutor());
-               }
-       }
-
-       /**
-        * Unregisters an info message listener.
-        *
-        * @param address of the  info message listener to unregister from this 
resource manager
-        *
-        */
-       @Override
-       public void unRegisterInfoMessageListener(final String address) {
-               infoMessageListeners.remove(address);
-       }
-
-       /**
         * Cleanup application and shut down cluster.
         *
         * @param finalStatus of the Flink application
@@ -905,23 +861,6 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
        }
 
        // 
------------------------------------------------------------------------
-       //  Info messaging
-       // 
------------------------------------------------------------------------
-
-       public void sendInfoMessage(final String message) {
-               getRpcService().execute(new Runnable() {
-                       @Override
-                       public void run() {
-                               InfoMessage infoMessage = new 
InfoMessage(message);
-                               for (InfoMessageListenerRpcGateway 
listenerRpcGateway : infoMessageListeners.values()) {
-                                       listenerRpcGateway
-                                               .notifyInfoMessage(infoMessage);
-                               }
-                       }
-               });
-       }
-
-       // 
------------------------------------------------------------------------
        //  Error Handling
        // 
------------------------------------------------------------------------
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
index ea79650..2933915 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java
@@ -132,21 +132,6 @@ public interface ResourceManagerGateway extends 
FencedRpcGateway<ResourceManager
                AllocationID oldAllocationId);
 
        /**
-        * Registers an infoMessage listener
-        *
-        * @param infoMessageListenerAddress address of infoMessage listener to 
register to this resource manager
-        */
-       void registerInfoMessageListener(String infoMessageListenerAddress);
-
-       /**
-        * Unregisters an infoMessage listener
-        *
-        * @param infoMessageListenerAddress address of infoMessage listener to 
unregister from this resource manager
-        *
-        */
-       void unRegisterInfoMessageListener(String infoMessageListenerAddress);
-
-       /**
         * Deregister Flink from the underlying resource management system.
         *
         * @param finalStatus final status with which to deregister the Flink 
application
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
index ed8002b..d0a4512 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/utils/TestingResourceManagerGateway.java
@@ -236,16 +236,6 @@ public class TestingResourceManagerGateway implements 
ResourceManagerGateway {
        }
 
        @Override
-       public void registerInfoMessageListener(String 
infoMessageListenerAddress) {
-
-       }
-
-       @Override
-       public void unRegisterInfoMessageListener(String 
infoMessageListenerAddress) {
-
-       }
-
-       @Override
        public CompletableFuture<Acknowledge> 
deregisterApplication(ApplicationStatus finalStatus, String diagnostics) {
                return CompletableFuture.completedFuture(Acknowledge.get());
        }

Reply via email to