[FLINK-8055][QS] Deduplicate logging messages about QS start.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e059e96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e059e96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e059e96

Branch: refs/heads/master
Commit: 5e059e968633c4292734ebed209fa1b3c30529a1
Parents: 75c1454
Author: kkloudas <kklou...@gmail.com>
Authored: Thu Nov 16 17:02:16 2017 +0100
Committer: kkloudas <kklou...@gmail.com>
Committed: Fri Nov 17 10:46:09 2017 +0100

----------------------------------------------------------------------
 .../network/AbstractServerBase.java             | 20 ++++++++++----------
 .../flink/queryablestate/network/Client.java    | 20 ++++++++++++++------
 .../server/KvStateServerImpl.java               |  5 -----
 .../HAAbstractQueryableStateTestBase.java       |  2 +-
 .../network/AbstractServerTest.java             |  2 +-
 .../network/KvStateServerHandlerTest.java       |  2 +-
 .../runtime/io/network/NetworkEnvironment.java  |  2 --
 7 files changed, 27 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 07ca26d..82a05f2 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -60,7 +60,7 @@ import java.util.concurrent.TimeUnit;
 @Internal
 public abstract class AbstractServerBase<REQ extends MessageBody, RESP extends 
MessageBody> {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(AbstractServerBase.class);
+       protected final Logger log = LoggerFactory.getLogger(getClass());
 
        /** AbstractServerBase config: low water mark. */
        private static final int LOW_WATER_MARK = 8 * 1024;
@@ -180,16 +180,16 @@ public abstract class AbstractServerBase<REQ extends 
MessageBody, RESP extends M
         */
        public void start() throws Throwable {
                Preconditions.checkState(serverAddress == null,
-                               "The " + serverName + " already running @ " + 
serverAddress + '.');
+                               serverName + " is already running @ " + 
serverAddress + '.');
 
                Iterator<Integer> portIterator = bindPortRange.iterator();
                while (portIterator.hasNext() && 
!attemptToBind(portIterator.next())) {}
 
                if (serverAddress != null) {
-                       LOG.info("Started the {} @ {}.", serverName, 
serverAddress);
+                       log.info("Started {} @ {}.", serverName, serverAddress);
                } else {
-                       LOG.info("Unable to start the {}. All ports in provided 
range are occupied.", serverName);
-                       throw new FlinkRuntimeException("Unable to start the " 
+ serverName + ". All ports in provided range are occupied.");
+                       log.info("Unable to start {}. All ports in provided 
range are occupied.", serverName);
+                       throw new FlinkRuntimeException("Unable to start " + 
serverName + ". All ports in provided range are occupied.");
                }
        }
 
@@ -203,7 +203,7 @@ public abstract class AbstractServerBase<REQ extends 
MessageBody, RESP extends M
         * @throws Exception If something goes wrong during the bind operation.
         */
        private boolean attemptToBind(final int port) throws Throwable {
-               LOG.debug("Attempting to start server {} on port {}.", 
serverName, port);
+               log.debug("Attempting to start {} on port {}.", serverName, 
port);
 
                this.queryExecutor = createQueryExecutor();
                this.handler = initializeHandler();
@@ -250,7 +250,7 @@ public abstract class AbstractServerBase<REQ extends 
MessageBody, RESP extends M
 
                        throw future.cause();
                } catch (BindException e) {
-                       LOG.debug("Failed to start server {} on port {}: {}.", 
serverName, port, e.getMessage());
+                       log.debug("Failed to start {} on port {}: {}.", 
serverName, port, e.getMessage());
                        shutdown();
                }
                // any other type of exception we let it bubble up.
@@ -261,7 +261,7 @@ public abstract class AbstractServerBase<REQ extends 
MessageBody, RESP extends M
         * Shuts down the server and all related thread pools.
         */
        public void shutdown() {
-               LOG.info("Shutting down server {} @ {}", serverName, 
serverAddress);
+               log.info("Shutting down {} @ {}", serverName, serverAddress);
 
                if (handler != null) {
                        handler.shutdown();
@@ -311,7 +311,7 @@ public abstract class AbstractServerBase<REQ extends 
MessageBody, RESP extends M
        }
 
        @VisibleForTesting
-       public boolean isExecutorShutdown() {
-               return queryExecutor.isShutdown();
+       public boolean isEventGroupShutdown() {
+               return bootstrap == null || bootstrap.group().isTerminated();
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index e21145b..12286fa 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -19,6 +19,7 @@
 package org.apache.flink.queryablestate.network;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.queryablestate.FutureUtils;
 import org.apache.flink.queryablestate.network.messages.MessageBody;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
@@ -282,12 +283,14 @@ public class Client<REQ extends MessageBody, RESP extends 
MessageBody> {
                                        while (!queuedRequests.isEmpty()) {
                                                final PendingRequest pending = 
queuedRequests.poll();
 
-                                               
established.sendRequest(pending.request)
-                                                               
.thenAccept(resp -> pending.complete(resp))
-                                                               
.exceptionally(throwable -> {
-                                                                       
pending.completeExceptionally(throwable);
-                                                                       return 
null;
-                                               });
+                                               
established.sendRequest(pending.request).whenComplete(
+                                                               (response, 
throwable) -> {
+                                                                       if 
(throwable != null) {
+                                                                               
pending.completeExceptionally(throwable);
+                                                                       } else {
+                                                                               
pending.complete(response);
+                                                                       }
+                                                               });
                                        }
 
                                        // Publish the channel for the general 
public
@@ -533,4 +536,9 @@ public class Client<REQ extends MessageBody, RESP extends 
MessageBody> {
                        }
                }
        }
+
+       @VisibleForTesting
+       public boolean isEventGroupShutdown() {
+               return bootstrap == null || bootstrap.group().isTerminated();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
index fe07687..3a37a3a 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/server/KvStateServerImpl.java
@@ -29,9 +29,6 @@ import org.apache.flink.runtime.query.KvStateRegistry;
 import org.apache.flink.runtime.query.KvStateServer;
 import org.apache.flink.util.Preconditions;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
 import java.util.Iterator;
@@ -42,8 +39,6 @@ import java.util.Iterator;
 @Internal
 public class KvStateServerImpl extends 
AbstractServerBase<KvStateInternalRequest, KvStateResponse> implements 
KvStateServer {
 
-       private static final Logger LOG = 
LoggerFactory.getLogger(KvStateServerImpl.class);
-
        /** The {@link KvStateRegistry} to query for state instances. */
        private final KvStateRegistry kvStateRegistry;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
index fc4b2bc..79809b3 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/HAAbstractQueryableStateTestBase.java
@@ -65,7 +65,7 @@ public abstract class HAAbstractQueryableStateTestBase 
extends AbstractQueryable
                        config.setString(HighAvailabilityOptions.HA_MODE, 
"zookeeper");
 
                        cluster = new TestingCluster(config, false);
-                       cluster.start();
+                       cluster.start(true);
 
                        client = new QueryableStateClient("localhost", 
proxyPortRangeStart);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
index 2775cd4..3d2ed40 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/AbstractServerTest.java
@@ -58,7 +58,7 @@ public class AbstractServerTest {
 
                // the expected exception along with the adequate message
                expectedEx.expect(FlinkRuntimeException.class);
-               expectedEx.expectMessage("Unable to start the Test Server 2. 
All ports in provided range are occupied.");
+               expectedEx.expectMessage("Unable to start Test Server 2. All 
ports in provided range are occupied.");
 
                TestServer server1 = null;
                TestServer server2 = null;

http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
----------------------------------------------------------------------
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
index 041544d..7b301ed 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/network/KvStateServerHandlerTest.java
@@ -391,7 +391,7 @@ public class KvStateServerHandlerTest extends TestLogger {
 
                localTestServer.start();
                localTestServer.shutdown();
-               assertTrue(localTestServer.isExecutorShutdown());
+               assertTrue(localTestServer.getQueryExecutor().isTerminated());
 
                MessageSerializer<KvStateInternalRequest, KvStateResponse> 
serializer =
                                new MessageSerializer<>(new 
KvStateInternalRequest.KvStateInternalRequestDeserializer(), new 
KvStateResponse.KvStateResponseDeserializer());

http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 4fffacd..71d0386 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -310,7 +310,6 @@ public class NetworkEnvironment {
                        if (kvStateServer != null) {
                                try {
                                        kvStateServer.start();
-                                       LOG.info("Started the Queryable State 
Data Server @ {}", kvStateServer.getServerAddress());
                                } catch (Throwable ie) {
                                        kvStateServer.shutdown();
                                        kvStateServer = null;
@@ -321,7 +320,6 @@ public class NetworkEnvironment {
                        if (kvStateProxy != null) {
                                try {
                                        kvStateProxy.start();
-                                       LOG.info("Started the Queryable State 
Client Proxy @ {}", kvStateProxy.getServerAddress());
                                } catch (Throwable ie) {
                                        kvStateProxy.shutdown();
                                        kvStateProxy = null;

Reply via email to