[3/6] flink git commit: [FLINK-8065][QS] Improve error message when client already shut down.

2017-11-17 Thread kkloudas
[FLINK-8065][QS] Improve error message when client already shut down.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6314e486
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6314e486
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6314e486

Branch: refs/heads/release-1.4
Commit: 6314e4861df3100e8edd666f00e062c128f6e09f
Parents: 96b350a
Author: kkloudas 
Authored: Wed Nov 15 15:38:36 2017 +0100
Committer: kkloudas 
Committed: Fri Nov 17 11:19:08 2017 +0100

--
 .../main/java/org/apache/flink/queryablestate/network/Client.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6314e486/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
--
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index 13d34fb..e21145b 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -133,7 +133,7 @@ public class Client {
 
public CompletableFuture sendRequest(final InetSocketAddress 
serverAddress, final REQ request) {
if (shutDown.get()) {
-   return FutureUtils.getFailedFuture(new 
IllegalStateException("Shut down"));
+   return FutureUtils.getFailedFuture(new 
IllegalStateException(clientName + " is already shut down."));
}
 
EstablishedConnection connection = 
establishedConnections.get(serverAddress);



[5/6] flink git commit: [FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds.

2017-11-17 Thread kkloudas
[FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with 
unknown jobIds.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a68d752
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a68d752
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a68d752

Branch: refs/heads/release-1.4
Commit: 1a68d7527932b12bd2cb392c7c7781023756bf0c
Parents: 12b0c58
Author: kkloudas 
Authored: Thu Nov 16 17:45:49 2017 +0100
Committer: kkloudas 
Committed: Fri Nov 17 11:20:55 2017 +0100

--
 .../itcases/AbstractQueryableStateTestBase.java | 32 +++-
 .../flink/runtime/jobmanager/JobManager.scala   |  4 +--
 .../runtime/jobmanager/JobManagerTest.java  |  5 +--
 3 files changed, 29 insertions(+), 12 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/1a68d752/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
--
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index a789dbd..65e9bb5 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -276,10 +276,6 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
 
/**
 * Tests that duplicate query registrations fail the job at the 
JobManager.
-*
-* NOTE:  This test is only in the non-HA variant of the tests 
because
-* in the HA mode we use the actual JM code which does not recognize the
-* {@code NotifyWhenJobStatus} message.
 */
@Test
public void testDuplicateRegistrationFailsJob() throws Exception {
@@ -435,10 +431,10 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
 
/**
 * Tests that the correct exception is thrown if the query
-* contains a wrong queryable state name.
+* contains a wrong jobId or wrong queryable state name.
 */
@Test
-   public void testWrongQueryableStateName() throws Exception {
+   public void testWrongJobIdAndWrongQueryableStateName() throws Exception 
{
// Config
final Deadline deadline = TEST_TIMEOUT.fromNow();
 
@@ -486,7 +482,27 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {

runningFuture.get(deadline.timeLeft().toMillis(), TimeUnit.MILLISECONDS);
assertEquals(JobStatus.RUNNING, jobStatus.state());
 
-   CompletableFuture>> 
future = client.getKvState(
+   final JobID wrongJobId = new JobID();
+
+   CompletableFuture>> 
unknownJobFuture = client.getKvState(
+   wrongJobId, 
// this is the wrong job id
+   "hankuna",
+   0,
+   BasicTypeInfo.INT_TYPE_INFO,
+   valueState);
+
+   try {
+   unknownJobFuture.get();
+   fail(); // by now the job must have failed.
+   } catch (ExecutionException e) {
+   Assert.assertTrue(e.getCause() instanceof 
RuntimeException);
+   
Assert.assertTrue(e.getCause().getMessage().contains(
+   "FlinkJobNotFoundException: 
Could not find Flink job (" + wrongJobId + ")"));
+   } catch (Exception ignored) {
+   fail("Unexpected type of exception.");
+   }
+
+   CompletableFuture>> 
unknownQSName = client.getKvState(
jobId,
"wrong-hankuna", // this is the wrong 
name.
0,
@@ -494,7 +510,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
 

[1/6] flink git commit: [FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is thrown.

2017-11-17 Thread kkloudas
Repository: flink
Updated Branches:
  refs/heads/release-1.4 42e24413b -> 3753ae251


[FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is 
thrown.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0324e34
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0324e34
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0324e34

Branch: refs/heads/release-1.4
Commit: d0324e34a06e7374179d1627a4a3653d07f1c614
Parents: 42e2441
Author: kkloudas 
Authored: Tue Nov 14 15:05:45 2017 +0100
Committer: kkloudas 
Committed: Fri Nov 17 10:37:18 2017 +0100

--
 .../network/AbstractServerHandler.java  |   2 +-
 .../client/proxy/KvStateClientProxyHandler.java |  11 +-
 .../itcases/AbstractQueryableStateTestBase.java | 230 ---
 3 files changed, 150 insertions(+), 93 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d0324e34/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
--
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
index 9e02291..7e71a11 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
@@ -262,7 +262,7 @@ public abstract class AbstractServerHandlerhttp://git-wip-us.apache.org/repos/asf/flink/blob/d0324e34/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
--
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
index 73ef7f3..af33701 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage;
-import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.util.Preconditions;
 
@@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory;
 
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -133,12 +131,11 @@ public class KvStateClientProxyHandler extends 
AbstractServerHandler {
if (throwable != null) {
-   if (throwable 
instanceof CancellationException) {
-   
result.completeExceptionally(throwable);
-   } else if 
(throwable.getCause() instanceof UnknownKvStateIdException ||
+   if (
+   
throwable.getCause() instanceof UnknownKvStateIdException ||

throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException ||
-   
throwable.getCause() instanceof UnknownKvStateLocation ||
-   
throwable.getCause() instanceof ConnectException) {
+   
throwable.getCause() instanceof ConnectException
+   ) {
 

[6/6] flink git commit: [FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState().

2017-11-17 Thread kkloudas
[FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState().


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3753ae25
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3753ae25
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3753ae25

Branch: refs/heads/release-1.4
Commit: 3753ae2517fbc940c05ea54e3eb0a960fecdf879
Parents: 1a68d75
Author: kkloudas 
Authored: Fri Nov 17 09:26:10 2017 +0100
Committer: kkloudas 
Committed: Fri Nov 17 11:21:18 2017 +0100

--
 .../flink/runtime/query/KvStateRegistry.java| 23 ++--
 1 file changed, 7 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/3753ae25/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index af19d81..ed1f92e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -45,7 +45,7 @@ public class KvStateRegistry {
new ConcurrentHashMap<>();
 
/** Registry listener to be notified on registration/unregistration. */
-   private final AtomicReference listener = new 
AtomicReference<>();
+   private final AtomicReference listenerRef = 
new AtomicReference<>();
 
/**
 * Registers a listener with the registry.
@@ -54,7 +54,7 @@ public class KvStateRegistry {
 * @throws IllegalStateException If there is a registered listener
 */
public void registerListener(KvStateRegistryListener listener) {
-   if (!this.listener.compareAndSet(null, listener)) {
+   if (!listenerRef.compareAndSet(null, listener)) {
throw new IllegalStateException("Listener already 
registered.");
}
}
@@ -63,20 +63,10 @@ public class KvStateRegistry {
 * Unregisters the listener with the registry.
 */
public void unregisterListener() {
-   listener.set(null);
+   listenerRef.set(null);
}
 
/**
-* Registers the KvState instance identified by the given 4-tuple of 
JobID,
-* JobVertexID, key group index, and registration name.
-*
-* @param kvStateId KvStateID to identify the KvState instance
-* @param kvState   KvState instance to register
-* @throws IllegalStateException If there is a KvState instance 
registered
-*   with the same ID.
-*/
-
-   /**
 * Registers the KvState instance and returns the assigned ID.
 *
 * @param jobIdJobId the KvState instance belongs to
@@ -96,7 +86,7 @@ public class KvStateRegistry {
KvStateID kvStateId = new KvStateID();
 
if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) 
{
-   KvStateRegistryListener listener = this.listener.get();
+   final KvStateRegistryListener listener = 
listenerRef.get();
if (listener != null) {
listener.notifyKvStateRegistered(
jobId,
@@ -108,7 +98,8 @@ public class KvStateRegistry {
 
return kvStateId;
} else {
-   throw new IllegalStateException(kvStateId + " is 
already registered.");
+   throw new IllegalStateException(
+   "State \"" + registrationName + " 
\"(id=" + kvStateId + ") appears registered although it should not.");
}
}
 
@@ -127,7 +118,7 @@ public class KvStateRegistry {
KvStateID kvStateId) {
 
if (registeredKvStates.remove(kvStateId) != null) {
-   KvStateRegistryListener listener = this.listener.get();
+   final KvStateRegistryListener listener = 
listenerRef.get();
if (listener != null) {
listener.notifyKvStateUnregistered(
jobId,



[4/6] flink git commit: [FLINK-8055][QS] Deduplicate logging messages about QS start.

2017-11-17 Thread kkloudas
[FLINK-8055][QS] Deduplicate logging messages about QS start.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/12b0c58f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/12b0c58f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/12b0c58f

Branch: refs/heads/release-1.4
Commit: 12b0c58f6780376ac2da0f02c6e7eb8a24ab8a13
Parents: 6314e48
Author: kkloudas 
Authored: Thu Nov 16 17:02:16 2017 +0100
Committer: kkloudas 
Committed: Fri Nov 17 11:19:35 2017 +0100

--
 .../network/AbstractServerBase.java | 20 ++--
 .../flink/queryablestate/network/Client.java| 20 ++--
 .../server/KvStateServerImpl.java   |  5 -
 .../HAAbstractQueryableStateTestBase.java   |  2 +-
 .../network/AbstractServerTest.java |  2 +-
 .../network/KvStateServerHandlerTest.java   |  2 +-
 .../runtime/io/network/NetworkEnvironment.java  |  2 --
 7 files changed, 27 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/12b0c58f/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
--
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 07ca26d..82a05f2 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -60,7 +60,7 @@ import java.util.concurrent.TimeUnit;
 @Internal
 public abstract class AbstractServerBase {
 
-   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractServerBase.class);
+   protected final Logger log = LoggerFactory.getLogger(getClass());
 
/** AbstractServerBase config: low water mark. */
private static final int LOW_WATER_MARK = 8 * 1024;
@@ -180,16 +180,16 @@ public abstract class AbstractServerBase portIterator = bindPortRange.iterator();
while (portIterator.hasNext() && 
!attemptToBind(portIterator.next())) {}
 
if (serverAddress != null) {
-   LOG.info("Started the {} @ {}.", serverName, 
serverAddress);
+   log.info("Started {} @ {}.", serverName, serverAddress);
} else {
-   LOG.info("Unable to start the {}. All ports in provided 
range are occupied.", serverName);
-   throw new FlinkRuntimeException("Unable to start the " 
+ serverName + ". All ports in provided range are occupied.");
+   log.info("Unable to start {}. All ports in provided 
range are occupied.", serverName);
+   throw new FlinkRuntimeException("Unable to start " + 
serverName + ". All ports in provided range are occupied.");
}
}
 
@@ -203,7 +203,7 @@ public abstract class AbstractServerBasehttp://git-wip-us.apache.org/repos/asf/flink/blob/12b0c58f/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
--
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index e21145b..12286fa 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -19,6 +19,7 @@
 package org.apache.flink.queryablestate.network;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.queryablestate.FutureUtils;
 import org.apache.flink.queryablestate.network.messages.MessageBody;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
@@ -282,12 +283,14 @@ public class Client {
while (!queuedRequests.isEmpty()) {
final PendingRequest pending = 
queuedRequests.poll();
 
-   

[2/6] flink git commit: [FLINK-8062][QS] Make getKvState() with namespace private.

2017-11-17 Thread kkloudas
[FLINK-8062][QS] Make getKvState() with namespace private.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/96b350ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/96b350ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/96b350ad

Branch: refs/heads/release-1.4
Commit: 96b350ad91a1f248d0a3c616d1e59638013892be
Parents: d0324e3
Author: kkloudas 
Authored: Wed Nov 15 15:32:42 2017 +0100
Committer: kkloudas 
Committed: Fri Nov 17 11:18:47 2017 +0100

--
 .../flink/queryablestate/client/QueryableStateClient.java | 3 +--
 .../itcases/AbstractQueryableStateTestBase.java   | 7 +--
 2 files changed, 2 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/96b350ad/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
--
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 304505a..03e02e1 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -186,8 +186,7 @@ public class QueryableStateClient {
 * @param stateDescriptor   The {@link 
StateDescriptor} of the state we want to query.
 * @return Future holding the immutable {@link State} object containing 
the result.
 */
-   @PublicEvolving
-   public  CompletableFuture getKvState(
+   private  CompletableFuture getKvState(
final JobID jobId,
final String queryableStateName,
final K key,

http://git-wip-us.apache.org/repos/asf/flink/blob/96b350ad/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
--
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index c1cbb61..a789dbd 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -47,7 +47,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.queryablestate.client.VoidNamespace;
 import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
-import org.apache.flink.queryablestate.client.VoidNamespaceTypeInfo;
 import 
org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -491,9 +490,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
jobId,
"wrong-hankuna", // this is the wrong 
name.
0,
-   VoidNamespace.INSTANCE,
BasicTypeInfo.INT_TYPE_INFO,
-   VoidNamespaceTypeInfo.INSTANCE,
valueState);
 
try {
@@ -572,9 +569,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
jobId,
queryableState.getQueryableStateName(),
0,
-   VoidNamespace.INSTANCE,
BasicTypeInfo.INT_TYPE_INFO,
-   VoidNamespaceTypeInfo.INSTANCE,
valueState);
 
cluster.submitJobDetached(jobGraph);
@@ -1486,7 +1481,7 @@ public 

flink git commit: [FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is thrown.

2017-11-17 Thread kkloudas
Repository: flink
Updated Branches:
  refs/heads/master 81dc260dc -> a0838de79


[FLINK-8063][QS] QS client does not retry when an UnknownKvStateLocation is 
thrown.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0838de7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0838de7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0838de7

Branch: refs/heads/master
Commit: a0838de79ff73b0322f3ce255df54f5f33b2bf3b
Parents: 81dc260
Author: kkloudas 
Authored: Tue Nov 14 15:05:45 2017 +0100
Committer: kkloudas 
Committed: Fri Nov 17 10:29:30 2017 +0100

--
 .../network/AbstractServerHandler.java  |   2 +-
 .../client/proxy/KvStateClientProxyHandler.java |  11 +-
 .../itcases/AbstractQueryableStateTestBase.java | 230 ---
 3 files changed, 150 insertions(+), 93 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/a0838de7/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
--
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
index 9e02291..7e71a11 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerHandler.java
@@ -262,7 +262,7 @@ public abstract class AbstractServerHandlerhttp://git-wip-us.apache.org/repos/asf/flink/blob/a0838de7/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
--
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
index 73ef7f3..af33701 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/main/java/org/apache/flink/queryablestate/client/proxy/KvStateClientProxyHandler.java
@@ -37,7 +37,6 @@ import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.query.KvStateClientProxy;
 import org.apache.flink.runtime.query.KvStateLocation;
 import org.apache.flink.runtime.query.KvStateMessage;
-import org.apache.flink.runtime.query.UnknownKvStateLocation;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.util.Preconditions;
 
@@ -48,7 +47,6 @@ import org.slf4j.LoggerFactory;
 
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
-import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -133,12 +131,11 @@ public class KvStateClientProxyHandler extends 
AbstractServerHandler {
if (throwable != null) {
-   if (throwable 
instanceof CancellationException) {
-   
result.completeExceptionally(throwable);
-   } else if 
(throwable.getCause() instanceof UnknownKvStateIdException ||
+   if (
+   
throwable.getCause() instanceof UnknownKvStateIdException ||

throwable.getCause() instanceof UnknownKvStateKeyGroupLocationException ||
-   
throwable.getCause() instanceof UnknownKvStateLocation ||
-   
throwable.getCause() instanceof ConnectException) {
+   
throwable.getCause() instanceof ConnectException
+   ) {
 
  

[5/5] flink git commit: [FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState()

2017-11-17 Thread kkloudas
[FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState()


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a4d86975
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a4d86975
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a4d86975

Branch: refs/heads/master
Commit: a4d86975967942054d1bd466641e9c835fb014ac
Parents: 2fe078f
Author: kkloudas 
Authored: Fri Nov 17 09:26:10 2017 +0100
Committer: kkloudas 
Committed: Fri Nov 17 10:46:11 2017 +0100

--
 .../flink/runtime/query/KvStateRegistry.java| 23 ++--
 1 file changed, 7 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/a4d86975/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
index af19d81..ed1f92e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/query/KvStateRegistry.java
@@ -45,7 +45,7 @@ public class KvStateRegistry {
new ConcurrentHashMap<>();
 
/** Registry listener to be notified on registration/unregistration. */
-   private final AtomicReference listener = new 
AtomicReference<>();
+   private final AtomicReference listenerRef = 
new AtomicReference<>();
 
/**
 * Registers a listener with the registry.
@@ -54,7 +54,7 @@ public class KvStateRegistry {
 * @throws IllegalStateException If there is a registered listener
 */
public void registerListener(KvStateRegistryListener listener) {
-   if (!this.listener.compareAndSet(null, listener)) {
+   if (!listenerRef.compareAndSet(null, listener)) {
throw new IllegalStateException("Listener already 
registered.");
}
}
@@ -63,20 +63,10 @@ public class KvStateRegistry {
 * Unregisters the listener with the registry.
 */
public void unregisterListener() {
-   listener.set(null);
+   listenerRef.set(null);
}
 
/**
-* Registers the KvState instance identified by the given 4-tuple of 
JobID,
-* JobVertexID, key group index, and registration name.
-*
-* @param kvStateId KvStateID to identify the KvState instance
-* @param kvState   KvState instance to register
-* @throws IllegalStateException If there is a KvState instance 
registered
-*   with the same ID.
-*/
-
-   /**
 * Registers the KvState instance and returns the assigned ID.
 *
 * @param jobIdJobId the KvState instance belongs to
@@ -96,7 +86,7 @@ public class KvStateRegistry {
KvStateID kvStateId = new KvStateID();
 
if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) 
{
-   KvStateRegistryListener listener = this.listener.get();
+   final KvStateRegistryListener listener = 
listenerRef.get();
if (listener != null) {
listener.notifyKvStateRegistered(
jobId,
@@ -108,7 +98,8 @@ public class KvStateRegistry {
 
return kvStateId;
} else {
-   throw new IllegalStateException(kvStateId + " is 
already registered.");
+   throw new IllegalStateException(
+   "State \"" + registrationName + " 
\"(id=" + kvStateId + ") appears registered although it should not.");
}
}
 
@@ -127,7 +118,7 @@ public class KvStateRegistry {
KvStateID kvStateId) {
 
if (registeredKvStates.remove(kvStateId) != null) {
-   KvStateRegistryListener listener = this.listener.get();
+   final KvStateRegistryListener listener = 
listenerRef.get();
if (listener != null) {
listener.notifyKvStateUnregistered(
jobId,



[1/5] flink git commit: [FLINK-8062][QS] Make getKvState() with namespace private.

2017-11-17 Thread kkloudas
Repository: flink
Updated Branches:
  refs/heads/master a0838de79 -> a4d869759


[FLINK-8062][QS] Make getKvState() with namespace private.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ff7e3cf6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ff7e3cf6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ff7e3cf6

Branch: refs/heads/master
Commit: ff7e3cf6749a6b6bc898fde871c36661c8629c23
Parents: a0838de
Author: kkloudas 
Authored: Wed Nov 15 15:32:42 2017 +0100
Committer: kkloudas 
Committed: Fri Nov 17 10:46:08 2017 +0100

--
 .../flink/queryablestate/client/QueryableStateClient.java | 3 +--
 .../itcases/AbstractQueryableStateTestBase.java   | 7 +--
 2 files changed, 2 insertions(+), 8 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/ff7e3cf6/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
--
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 304505a..03e02e1 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -186,8 +186,7 @@ public class QueryableStateClient {
 * @param stateDescriptor   The {@link 
StateDescriptor} of the state we want to query.
 * @return Future holding the immutable {@link State} object containing 
the result.
 */
-   @PublicEvolving
-   public  CompletableFuture getKvState(
+   private  CompletableFuture getKvState(
final JobID jobId,
final String queryableStateName,
final K key,

http://git-wip-us.apache.org/repos/asf/flink/blob/ff7e3cf6/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
--
diff --git 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
index c1cbb61..a789dbd 100644
--- 
a/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
+++ 
b/flink-queryable-state/flink-queryable-state-runtime/src/test/java/org/apache/flink/queryablestate/itcases/AbstractQueryableStateTestBase.java
@@ -47,7 +47,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.queryablestate.client.QueryableStateClient;
 import org.apache.flink.queryablestate.client.VoidNamespace;
 import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
-import org.apache.flink.queryablestate.client.VoidNamespaceTypeInfo;
 import 
org.apache.flink.queryablestate.exceptions.UnknownKeyOrNamespaceException;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
@@ -491,9 +490,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
jobId,
"wrong-hankuna", // this is the wrong 
name.
0,
-   VoidNamespace.INSTANCE,
BasicTypeInfo.INT_TYPE_INFO,
-   VoidNamespaceTypeInfo.INSTANCE,
valueState);
 
try {
@@ -572,9 +569,7 @@ public abstract class AbstractQueryableStateTestBase 
extends TestLogger {
jobId,
queryableState.getQueryableStateName(),
0,
-   VoidNamespace.INSTANCE,
BasicTypeInfo.INT_TYPE_INFO,
-   VoidNamespaceTypeInfo.INSTANCE,
valueState);
 
 

[2/5] flink git commit: [FLINK-8055][QS] Deduplicate logging messages about QS start.

2017-11-17 Thread kkloudas
[FLINK-8055][QS] Deduplicate logging messages about QS start.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5e059e96
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5e059e96
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5e059e96

Branch: refs/heads/master
Commit: 5e059e968633c4292734ebed209fa1b3c30529a1
Parents: 75c1454
Author: kkloudas 
Authored: Thu Nov 16 17:02:16 2017 +0100
Committer: kkloudas 
Committed: Fri Nov 17 10:46:09 2017 +0100

--
 .../network/AbstractServerBase.java | 20 ++--
 .../flink/queryablestate/network/Client.java| 20 ++--
 .../server/KvStateServerImpl.java   |  5 -
 .../HAAbstractQueryableStateTestBase.java   |  2 +-
 .../network/AbstractServerTest.java |  2 +-
 .../network/KvStateServerHandlerTest.java   |  2 +-
 .../runtime/io/network/NetworkEnvironment.java  |  2 --
 7 files changed, 27 insertions(+), 26 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
--
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
index 07ca26d..82a05f2 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/AbstractServerBase.java
@@ -60,7 +60,7 @@ import java.util.concurrent.TimeUnit;
 @Internal
 public abstract class AbstractServerBase {
 
-   private static final Logger LOG = 
LoggerFactory.getLogger(AbstractServerBase.class);
+   protected final Logger log = LoggerFactory.getLogger(getClass());
 
/** AbstractServerBase config: low water mark. */
private static final int LOW_WATER_MARK = 8 * 1024;
@@ -180,16 +180,16 @@ public abstract class AbstractServerBase portIterator = bindPortRange.iterator();
while (portIterator.hasNext() && 
!attemptToBind(portIterator.next())) {}
 
if (serverAddress != null) {
-   LOG.info("Started the {} @ {}.", serverName, 
serverAddress);
+   log.info("Started {} @ {}.", serverName, serverAddress);
} else {
-   LOG.info("Unable to start the {}. All ports in provided 
range are occupied.", serverName);
-   throw new FlinkRuntimeException("Unable to start the " 
+ serverName + ". All ports in provided range are occupied.");
+   log.info("Unable to start {}. All ports in provided 
range are occupied.", serverName);
+   throw new FlinkRuntimeException("Unable to start " + 
serverName + ". All ports in provided range are occupied.");
}
}
 
@@ -203,7 +203,7 @@ public abstract class AbstractServerBasehttp://git-wip-us.apache.org/repos/asf/flink/blob/5e059e96/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
--
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
index e21145b..12286fa 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/network/Client.java
@@ -19,6 +19,7 @@
 package org.apache.flink.queryablestate.network;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.queryablestate.FutureUtils;
 import org.apache.flink.queryablestate.network.messages.MessageBody;
 import org.apache.flink.queryablestate.network.messages.MessageSerializer;
@@ -282,12 +283,14 @@ public class Client {
while (!queuedRequests.isEmpty()) {
final PendingRequest pending = 
queuedRequests.poll();
 
-   
established.sendRequest(pending.request)
-   

flink git commit: [FLINK-8061][QS] Remove trailing * in QSClient javadocs.

2017-11-17 Thread kkloudas
Repository: flink
Updated Branches:
  refs/heads/master a4d869759 -> 3edbb7bce


[FLINK-8061][QS] Remove trailing * in QSClient javadocs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3edbb7bc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3edbb7bc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3edbb7bc

Branch: refs/heads/master
Commit: 3edbb7bce5b30386a67b1b01ef1591a681601219
Parents: a4d8697
Author: Vetriselvan1187 
Authored: Mon Nov 13 22:24:43 2017 +0530
Committer: kkloudas 
Committed: Fri Nov 17 15:28:46 2017 +0100

--
 .../apache/flink/queryablestate/client/QueryableStateClient.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/3edbb7bc/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
--
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 03e02e1..7abf6bc 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -132,7 +132,7 @@ public class QueryableStateClient {
}
 
/**
-* Returns a future holding the request result.  *
+* Returns a future holding the request result.
 * @param jobId JobID of the job the queryable 
state belongs to.
 * @param queryableStateNameName under which the state is 
queryable.
 * @param key   The key we are interested 
in.
@@ -155,7 +155,7 @@ public class QueryableStateClient {
}
 
/**
-* Returns a future holding the request result.  *
+* Returns a future holding the request result.
 * @param jobId JobID of the job the queryable 
state belongs to.
 * @param queryableStateNameName under which the state is 
queryable.
 * @param key   The key we are interested 
in.



flink git commit: [FLINK-8061][QS] Remove trailing * in QSClient javadocs.

2017-11-17 Thread kkloudas
Repository: flink
Updated Branches:
  refs/heads/release-1.4 3753ae251 -> e784f3a18


[FLINK-8061][QS] Remove trailing * in QSClient javadocs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e784f3a1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e784f3a1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e784f3a1

Branch: refs/heads/release-1.4
Commit: e784f3a184ecc35ea26fdfa6bcb4fa74520fa9df
Parents: 3753ae2
Author: Vetriselvan1187 
Authored: Mon Nov 13 17:54:43 2017 +0100
Committer: kkloudas 
Committed: Fri Nov 17 15:31:15 2017 +0100

--
 .../apache/flink/queryablestate/client/QueryableStateClient.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e784f3a1/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
--
diff --git 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
index 03e02e1..7abf6bc 100644
--- 
a/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
+++ 
b/flink-queryable-state/flink-queryable-state-client-java/src/main/java/org/apache/flink/queryablestate/client/QueryableStateClient.java
@@ -132,7 +132,7 @@ public class QueryableStateClient {
}
 
/**
-* Returns a future holding the request result.  *
+* Returns a future holding the request result.
 * @param jobId JobID of the job the queryable 
state belongs to.
 * @param queryableStateNameName under which the state is 
queryable.
 * @param key   The key we are interested 
in.
@@ -155,7 +155,7 @@ public class QueryableStateClient {
}
 
/**
-* Returns a future holding the request result.  *
+* Returns a future holding the request result.
 * @param jobId JobID of the job the queryable 
state belongs to.
 * @param queryableStateNameName under which the state is 
queryable.
 * @param key   The key we are interested 
in.



[2/2] flink git commit: [FLINK-7265] [core] Introduce FileSystemKind to differentiate between FileSystem and ObjectStore

2017-11-17 Thread aljoscha
[FLINK-7265] [core] Introduce FileSystemKind to differentiate between 
FileSystem and ObjectStore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f29f8057
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f29f8057
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f29f8057

Branch: refs/heads/master
Commit: f29f80575dac1c7e59dd7074118953b8be26520f
Parents: 3edbb7b
Author: Stephan Ewen 
Authored: Tue Jul 25 17:19:25 2017 +0200
Committer: Aljoscha Krettek 
Committed: Fri Nov 17 16:48:29 2017 +0100

--
 .../org/apache/flink/core/fs/FileSystem.java|   5 +
 .../apache/flink/core/fs/FileSystemKind.java|  40 
 .../core/fs/SafetyNetWrapperFileSystem.java |   5 +
 .../flink/core/fs/local/LocalFileSystem.java|  10 +-
 .../core/fs/local/LocalFileSystemTest.java  |   7 ++
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java |  47 +
 .../flink/runtime/fs/maprfs/MapRFileSystem.java |   6 ++
 .../flink/runtime/fs/hdfs/HdfsKindTest.java | 101 +++
 8 files changed, 219 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index d66a893..982e496 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -633,6 +633,11 @@ public abstract class FileSystem {
 */
public abstract boolean isDistributedFS();
 
+   /**
+* Gets a description of the characteristics of this file system.
+*/
+   public abstract FileSystemKind getKind();
+
// 

//  output directory initialization
// 


http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
new file mode 100644
index 000..52f58ac
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enumeration defining the kind and characteristics of a {@link 
FileSystem}.
+ */
+@PublicEvolving
+public enum FileSystemKind {
+
+   /**
+* An actual file system, with files and directories.
+*/
+   FILE_SYSTEM,
+
+   /**
+* An Object store. Files correspond to objects.
+* There are not really directories, but a directory-like structure may 
be mimicked
+* by hierarchical naming of files.
+*/
+   OBJECT_STORE
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f29f8057/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
index a1167dd..e7f43a4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -141,6 +141,11 @@ public class SafetyNetWrapperFileSystem extends FileSystem 
implements WrappingPr
}
 
@Override
+   public 

[1/2] flink git commit: [FLINK-7266] [core] Prevent attempt for parent directory deletion for object stores

2017-11-17 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/master 3edbb7bce -> b00f1b326


[FLINK-7266] [core] Prevent attempt for parent directory deletion for object 
stores

This closes #4397


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b00f1b32
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b00f1b32
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b00f1b32

Branch: refs/heads/master
Commit: b00f1b326c1ab4221a555200a4d5798e1565b821
Parents: f29f805
Author: Stephan Ewen 
Authored: Tue Jul 25 17:26:38 2017 +0200
Committer: Aljoscha Krettek 
Committed: Fri Nov 17 16:48:29 2017 +0100

--
 .../flink/runtime/state/filesystem/FileStateHandle.java   | 10 ++
 1 file changed, 6 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b00f1b32/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index bdf3f42..7655f0b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.filesystem;
 
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.FileUtils;
@@ -77,14 +78,15 @@ public class FileStateHandle implements StreamStateHandle {
 */
@Override
public void discardState() throws Exception {
-
FileSystem fs = getFileSystem();
 
fs.delete(filePath, false);
 
-   try {
-   FileUtils.deletePathIfEmpty(fs, filePath.getParent());
-   } catch (Exception ignored) {}
+   if (fs.getKind() == FileSystemKind.FILE_SYSTEM) {
+   try {
+   FileUtils.deletePathIfEmpty(fs, 
filePath.getParent());
+   } catch (Exception ignored) {}
+   }
}
 
/**



[1/2] flink git commit: [FLINK-7266] [core] Prevent attempt for parent directory deletion for object stores

2017-11-17 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/release-1.4 e784f3a18 -> 666b1b2e6


[FLINK-7266] [core] Prevent attempt for parent directory deletion for object 
stores

This closes #4397


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/666b1b2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/666b1b2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/666b1b2e

Branch: refs/heads/release-1.4
Commit: 666b1b2e62463ae4985d237535d56c9e0ab9dba9
Parents: a0dbe18
Author: Stephan Ewen 
Authored: Tue Jul 25 17:26:38 2017 +0200
Committer: Aljoscha Krettek 
Committed: Fri Nov 17 17:22:24 2017 +0100

--
 .../flink/runtime/state/filesystem/FileStateHandle.java   | 10 ++
 1 file changed, 6 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/666b1b2e/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
index bdf3f42..7655f0b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FileStateHandle.java
@@ -20,6 +20,7 @@ package org.apache.flink.runtime.state.filesystem;
 
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.state.StreamStateHandle;
 import org.apache.flink.util.FileUtils;
@@ -77,14 +78,15 @@ public class FileStateHandle implements StreamStateHandle {
 */
@Override
public void discardState() throws Exception {
-
FileSystem fs = getFileSystem();
 
fs.delete(filePath, false);
 
-   try {
-   FileUtils.deletePathIfEmpty(fs, filePath.getParent());
-   } catch (Exception ignored) {}
+   if (fs.getKind() == FileSystemKind.FILE_SYSTEM) {
+   try {
+   FileUtils.deletePathIfEmpty(fs, 
filePath.getParent());
+   } catch (Exception ignored) {}
+   }
}
 
/**



[2/2] flink git commit: [FLINK-7265] [core] Introduce FileSystemKind to differentiate between FileSystem and ObjectStore

2017-11-17 Thread aljoscha
[FLINK-7265] [core] Introduce FileSystemKind to differentiate between 
FileSystem and ObjectStore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0dbe182
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0dbe182
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0dbe182

Branch: refs/heads/release-1.4
Commit: a0dbe182fa677a87f601cbedc4115e63fff9fe4f
Parents: e784f3a
Author: Stephan Ewen 
Authored: Tue Jul 25 17:19:25 2017 +0200
Committer: Aljoscha Krettek 
Committed: Fri Nov 17 17:22:24 2017 +0100

--
 .../org/apache/flink/core/fs/FileSystem.java|   5 +
 .../apache/flink/core/fs/FileSystemKind.java|  40 
 .../core/fs/SafetyNetWrapperFileSystem.java |   5 +
 .../flink/core/fs/local/LocalFileSystem.java|  10 +-
 .../core/fs/local/LocalFileSystemTest.java  |   7 ++
 .../flink/runtime/fs/hdfs/HadoopFileSystem.java |  47 +
 .../flink/runtime/fs/maprfs/MapRFileSystem.java |   6 ++
 .../flink/runtime/fs/hdfs/HdfsKindTest.java | 101 +++
 8 files changed, 219 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/a0dbe182/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
--
diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
index d66a893..982e496 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystem.java
@@ -633,6 +633,11 @@ public abstract class FileSystem {
 */
public abstract boolean isDistributedFS();
 
+   /**
+* Gets a description of the characteristics of this file system.
+*/
+   public abstract FileSystemKind getKind();
+
// 

//  output directory initialization
// 


http://git-wip-us.apache.org/repos/asf/flink/blob/a0dbe182/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
new file mode 100644
index 000..52f58ac
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemKind.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.core.fs;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/**
+ * An enumeration defining the kind and characteristics of a {@link 
FileSystem}.
+ */
+@PublicEvolving
+public enum FileSystemKind {
+
+   /**
+* An actual file system, with files and directories.
+*/
+   FILE_SYSTEM,
+
+   /**
+* An Object store. Files correspond to objects.
+* There are not really directories, but a directory-like structure may 
be mimicked
+* by hierarchical naming of files.
+*/
+   OBJECT_STORE
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a0dbe182/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
index a1167dd..e7f43a4 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/fs/SafetyNetWrapperFileSystem.java
@@ -141,6 +141,11 @@ public class SafetyNetWrapperFileSystem extends FileSystem 
implements WrappingPr
}
 
@Override
+   public