flink git commit: [FLINK-7332] [futures] Replace Flink's futures with Java 8's CompletableFuture in TaskExecutor

2017-08-01 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master a6c8953eb -> d2a8e3741


[FLINK-7332] [futures] Replace Flink's futures with Java 8's CompletableFuture 
in TaskExecutor

This closes #4448.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d2a8e374
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d2a8e374
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d2a8e374

Branch: refs/heads/master
Commit: d2a8e37415eb34ca9cb8b2d8c22a33aa99b494a6
Parents: a6c8953
Author: Till Rohrmann 
Authored: Tue Aug 1 10:46:40 2017 +0200
Committer: Till Rohrmann 
Committed: Tue Aug 1 23:13:56 2017 +0200

--
 .../runtime/taskexecutor/TaskExecutor.java  | 100 +--
 1 file changed, 46 insertions(+), 54 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d2a8e374/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 4c4b0a7..aa4d6d2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -27,9 +27,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -771,55 +768,51 @@ public class TaskExecutor extends 
RpcEndpoint {
reservedSlots.add(offer);
}
 
-   Future acceptedSlotsFuture 
= jobMasterGateway.offerSlots(
-   getResourceID(),
-   reservedSlots,
-   leaderId,
-   taskManagerConfiguration.getTimeout());
-
-   Future acceptedSlotsAcceptFuture = 
acceptedSlotsFuture.thenAcceptAsync(new AcceptFunction() {
-   @Override
-   public void accept(Iterable 
acceptedSlots) {
-   // check if the response is 
still valid
-   if 
(isJobManagerConnectionValid(jobId, leaderId)) {
-   // mark accepted slots 
active
-   for (SlotOffer 
acceptedSlot : acceptedSlots) {
-   
reservedSlots.remove(acceptedSlot);
-   }
-
-   final Exception e = new 
Exception("The slot was rejected by the JobManager.");
+   CompletableFuture 
acceptedSlotsFuture = FutureUtils.toJava(
+   jobMasterGateway.offerSlots(
+   getResourceID(),
+   reservedSlots,
+   leaderId,
+   
taskManagerConfiguration.getTimeout()));
+
+   acceptedSlotsFuture.whenCompleteAsync(
+   (Iterable acceptedSlots, 
Throwable throwable) -> {
+   if (throwable != null) {
+   if (throwable 
instanceof TimeoutException) {
+   log.info("Slot 
offering to JobManager did not finish in time. Retrying the slot offering.");
+   // We ran into 
a timeout. Try again.
+   
offerSlotsToJobManager(jobId);
+   } else {
+

flink git commit: [FLINK-7328] [futures] Replace Flink's futures with Java 8's CompletableFuture in SlotManager

2017-08-01 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 6d4981a43 -> a6c8953eb


[FLINK-7328] [futures] Replace Flink's futures with Java 8's CompletableFuture 
in SlotManager

This closes #4443.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a6c8953e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a6c8953e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a6c8953e

Branch: refs/heads/master
Commit: a6c8953eb3ee9d5e4fa760e474b040b19ed8b97f
Parents: 6d4981a
Author: Till Rohrmann 
Authored: Mon Jul 31 21:38:28 2017 +0200
Committer: Till Rohrmann 
Committed: Tue Aug 1 22:56:31 2017 +0200

--
 .../slotmanager/PendingSlotRequest.java |  3 +-
 .../slotmanager/SlotManager.java| 46 
 2 files changed, 21 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/a6c8953e/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
index ffe1bfc..17cf8c7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/PendingSlotRequest.java
@@ -21,13 +21,14 @@ package 
org.apache.flink.runtime.resourcemanager.slotmanager;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
 import org.apache.flink.util.Preconditions;
 
 import javax.annotation.Nullable;
 
+import java.util.concurrent.CompletableFuture;
+
 public class PendingSlotRequest {
 
private final SlotRequest slotRequest;

http://git-wip-us.apache.org/repos/asf/flink/blob/a6c8953e/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
index 829a06d..8354525 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java
@@ -24,11 +24,8 @@ import 
org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.TaskManagerSlot;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.SlotRequest;
@@ -51,6 +48,7 @@ import java.util.Map;
 import java.util.Objects;
 import java.util.UUID;
 import java.util.concurrent.CancellationException;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
@@ -623,7 +621,7 @@ public class SlotManager implements AutoCloseable {
TaskExecutorConnection taskExecutorConnection = 
taskManagerSlot.getTaskManagerConnection();
TaskExecutorGateway gateway = 
taskExecutorConnection.getTaskExecutorGateway();
 
-   final CompletableFuture completableFuture = new 
FlinkCompletableFuture<>();
+   final CompletableFuture completableFuture = new 
CompletableFuture<>();
final AllocationID allocationId = 
pendingSlotRequest.getAllocationId();
final SlotID slotId = taskManagerSlot.getSlotId();
 
@@ -641,30 +639,26 @@ public class SlotManager implements AutoCloseable {
}
 

flink git commit: [FLINK-7324] [futures] Replace Flink's future with Java 8's CompletableFuture in SlotPool

2017-08-01 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 3b97784ae -> 6d4981a43


[FLINK-7324] [futures] Replace Flink's future with Java 8's CompletableFuture 
in SlotPool

Address PR comments

This closes #4438.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d4981a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d4981a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d4981a4

Branch: refs/heads/master
Commit: 6d4981a431e1ad28dee3a2143477fa7d2696d5fd
Parents: 3b97784
Author: Till Rohrmann 
Authored: Mon Jul 31 19:35:14 2017 +0200
Committer: Till Rohrmann 
Committed: Tue Aug 1 22:52:11 2017 +0200

--
 .../flink/runtime/concurrent/FutureUtils.java   | 25 +++
 .../apache/flink/runtime/instance/SlotPool.java | 70 ++--
 2 files changed, 59 insertions(+), 36 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6d4981a4/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
index 9cdbe1f..8721e52 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
@@ -344,4 +344,29 @@ public class FutureUtils {
 
return result;
}
+
+   /**
+* Converts a Java 8 {@link java.util.concurrent.CompletableFuture} 
into a Flink {@link Future}.
+*
+* @param javaFuture to convert to a Flink future
+* @param <T> type of the future value
+* @return Flink future
+*
+* @deprecated Will be removed once we completely remove Flink's futures
+*/
+   @Deprecated
+   public static  Future 
toFlinkFuture(java.util.concurrent.CompletableFuture javaFuture) {
+   FlinkCompletableFuture result = new 
FlinkCompletableFuture<>();
+
+   javaFuture.whenComplete(
+   (value, throwable) -> {
+   if (throwable == null) {
+   result.complete(value);
+   } else {
+   result.completeExceptionally(throwable);
+   }
+   });
+
+   return result;
+   }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6d4981a4/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
index 8cf6a9b..c74d9a6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java
@@ -25,10 +25,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.jobmanager.scheduler.Locality;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
@@ -57,6 +55,7 @@ import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeoutException;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -246,7 +245,7 @@ public class SlotPool extends RpcEndpoint {
 
// work on all slots waiting for this connection
for (PendingRequest pending : 
waitingForResourceManager.values()) {
-   requestSlotFromResourceManager(pending.allocationID(), 
pending.future(), pending.resourceProfile());
+   requestSlotFromResourceManager(pending.allocationID(), 
pending.getFuture(), pending.resourceProfile());
}
 
// all sent off
@@ -269,7 +268,7 @@ public class SlotPool extends 

flink git commit: [FLINK-7321] [futures] Replace Flink's futures with Java 8's CompletableFuture in HeartbeatManager

2017-08-01 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master cab490f89 -> 3b97784ae


[FLINK-7321] [futures] Replace Flink's futures with Java 8's CompletableFuture 
in HeartbeatManager

Address PR comments

This closes #4434.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3b97784a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3b97784a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3b97784a

Branch: refs/heads/master
Commit: 3b97784aef34a6598d9947b259f73f935dc90c9f
Parents: cab490f
Author: Till Rohrmann 
Authored: Mon Jul 31 18:47:22 2017 +0200
Committer: Till Rohrmann 
Committed: Tue Aug 1 22:43:36 2017 +0200

--
 .../runtime/heartbeat/HeartbeatListener.java|  5 ++--
 .../runtime/heartbeat/HeartbeatManagerImpl.java | 24 +++-
 .../heartbeat/HeartbeatManagerSenderImpl.java   | 23 +++
 .../flink/runtime/jobmaster/JobMaster.java  |  8 +++
 .../resourcemanager/ResourceManager.java|  9 
 .../runtime/taskexecutor/TaskExecutor.java  | 21 -
 .../runtime/heartbeat/HeartbeatManagerTest.java | 24 +---
 7 files changed, 48 insertions(+), 66 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/3b97784a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
index c6307aa..734eb4c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatListener.java
@@ -19,7 +19,8 @@
 package org.apache.flink.runtime.heartbeat;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.Future;
+
+import java.util.concurrent.CompletableFuture;
 
 /**
  * Interface for the interaction with the {@link HeartbeatManager}. The 
heartbeat listener is used
@@ -58,5 +59,5 @@ public interface HeartbeatListener {
 *
 * @return Future containing the next payload for heartbeats
 */
-   Future retrievePayload();
+   CompletableFuture retrievePayload();
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/3b97784a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
index d97cfa0..99f44f9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/heartbeat/HeartbeatManagerImpl.java
@@ -19,9 +19,6 @@
 package org.apache.flink.runtime.heartbeat;
 
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.ScheduledExecutor;
 import org.apache.flink.util.Preconditions;
 
@@ -30,6 +27,7 @@ import org.slf4j.Logger;
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.Collection;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
@@ -193,24 +191,18 @@ public class HeartbeatManagerImpl implements 
HeartbeatManager {

heartbeatListener.reportPayload(requestOrigin, heartbeatPayload);
}
 
-   Future futurePayload = 
heartbeatListener.retrievePayload();
+   CompletableFuture futurePayload = 
heartbeatListener.retrievePayload();
 
if (futurePayload != null) {
-   Future sendHeartbeatFuture = 
futurePayload.thenAcceptAsync(new AcceptFunction() {
-   @Override
-   public void accept(O 
retrievedPayload) {
-   
heartbeatTarget.receiveHeartbeat(getOwnResourceID(), retrievedPayload);
-   }
-   }, executor);
-
-   

flink git commit: [hotfix] Fix maven-javadoc-plugin configuration

2017-08-01 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/master 123964ad0 -> 8cb726541


[hotfix] Fix maven-javadoc-plugin configuration

Because we're now using Java 8 we have to disable linting also when not
using the (now-removed) jdk8 profile or the release profile.

Because of this problem snapshot deployments were not working.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8cb72654
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8cb72654
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8cb72654

Branch: refs/heads/master
Commit: 8cb72654186baecf1e1c19522b13cc304847d1af
Parents: 123964a
Author: Aljoscha Krettek 
Authored: Tue Aug 1 17:29:08 2017 +0200
Committer: Aljoscha Krettek 
Committed: Tue Aug 1 17:29:11 2017 +0200

--
 pom.xml | 19 ---
 1 file changed, 12 insertions(+), 7 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8cb72654/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 132f73b..62f4e31 100644
--- a/pom.xml
+++ b/pom.xml
@@ -798,19 +798,12 @@ under the License.

org.apache.maven.plugins

maven-javadoc-plugin

2.9.1
-   
-   true
-   



attach-javadocs


jar

-   
-   
-Xdoclint:none
-   
false
-   



@@ -1240,6 +1233,18 @@ under the License.


 
+   
+   
org.apache.maven.plugins
+   
maven-javadoc-plugin
+   
2.9.1
+   
+   true
+   
-Xdoclint:none
+   
false
+   
+   
+
+



org.apache.maven.plugins



flink git commit: [hotfix] Fix snapshot deployment

2017-08-01 Thread aljoscha
Repository: flink
Updated Branches:
  refs/heads/master f41eb4b1e -> 123964ad0


[hotfix] Fix snapshot deployment

The snapshot deployment script still tried to use the jdk8 profile which
doesn't exist anymore.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/123964ad
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/123964ad
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/123964ad

Branch: refs/heads/master
Commit: 123964ad05e4997955ebb119690d2c9088f018dd
Parents: f41eb4b
Author: Aljoscha Krettek 
Authored: Tue Aug 1 16:29:28 2017 +0200
Committer: Aljoscha Krettek 
Committed: Tue Aug 1 16:30:20 2017 +0200

--
 tools/deploy_to_maven.sh | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/123964ad/tools/deploy_to_maven.sh
--
diff --git a/tools/deploy_to_maven.sh b/tools/deploy_to_maven.sh
index 32acc5a..2397bd3 100755
--- a/tools/deploy_to_maven.sh
+++ b/tools/deploy_to_maven.sh
@@ -84,7 +84,7 @@ echo "detected current version as: '$CURRENT_FLINK_VERSION'"
 #
 
 if [[ $CURRENT_FLINK_VERSION == *SNAPSHOT* ]] ; then
-MVN_SNAPSHOT_OPTS="-B -Pdocs-and-source,jdk8 -DskipTests -Drat.skip=true 
-Drat.ignoreErrors=true \
+MVN_SNAPSHOT_OPTS="-B -Pdocs-and-source -DskipTests -Drat.skip=true 
-Drat.ignoreErrors=true \
 -DretryFailedDeploymentCount=10 --settings deploysettings.xml clean 
deploy"
 
 # hadoop2 scala 2.10



Build failed in Jenkins: flink-snapshot-deployment #548

2017-08-01 Thread Apache Jenkins Server
See 


--
[...truncated 234.84 KB...]
[ERROR] public static void tryRethrowIOException(Throwable t) throws 
IOException {
[ERROR] ^
[ERROR] 
:262:
 warning: no @throws for java.io.IOException
[ERROR] public static void rethrowIOException(Throwable t) throws IOException {
[ERROR] ^
[ERROR] 
:342:
 warning: no description for @throws
[ERROR] * @throws IOException
[ERROR] ^
[ERROR] 
:343:
 warning: no description for @throws
[ERROR] * @throws ClassNotFoundException
[ERROR] ^
[ERROR] 
:363:
 warning: no description for @throws
[ERROR] * @throws IOException
[ERROR] ^
[ERROR] 
:364:
 warning: no description for @throws
[ERROR] * @throws ClassNotFoundException
[ERROR] ^
[ERROR] 
:248:
 warning: no @param for closeables
[ERROR] public static void closeAllQuietly(Iterable 
closeables) {
[ERROR] ^
[ERROR] 
:259:
 warning: no @param for closeable
[ERROR] public static void closeQuietly(AutoCloseable closeable) {
[ERROR] ^
[ERROR] 
:77:
 warning: no @param for hostPort
[ERROR] public static URL getCorrectHostnamePort(String hostPort) {
[ERROR] ^
[ERROR] 
:56:
 warning: no @param for <T>
[ERROR] public static <T> T checkNotNull(T reference) {
[ERROR] ^
[ERROR] 
:73:
 warning: no @param for <T>
[ERROR] public static <T> T checkNotNull(T reference, @Nullable String 
errorMessage) {
[ERROR] ^
[ERROR] 
:99:
 warning: no @param for <T>
[ERROR] public static <T> T checkNotNull(T reference,
[ERROR] ^
[ERROR] 
:37:
 warning: no description for @throws
[ERROR] * @throws IllegalArgumentException
[ERROR] ^
[ERROR] 
:61:
 warning: no description for @throws
[ERROR] * @throws IllegalArgumentException
[ERROR] ^
[ERROR] 
:85:
 warning: no description for @throws
[ERROR] * @throws IllegalArgumentException
[ERROR] ^
[ERROR] 
:87:
 warning: no @param for logger
[ERROR] public static long getLong(Properties config, String key, long 
defaultValue, Logger logger) {
[ERROR] ^
[ERROR] 
:39:
 warning: no @param for keyString
[ERROR] protected StringBasedID(String keyString) {
[ERROR] ^
[ERROR] 
:32:
 warning: no description for @param
[ERROR] * @param visitable
[ERROR] ^
[ERROR] 
:39:
 warning: no description for @param
[ERROR] * @param visitable
[ERROR] ^
[ERROR] 
:112:
 warning: no description for @throws
[ERROR] * @throws IOException
[ERROR] ^
[ERROR] 
:271:
 warning: no @throws for java.io.IOException
[ERROR] public static void 

flink git commit: [FLINK-7327] [futures] Replace Flink's future with Java 8's CompletableFuture in StreamRecordQueueEntry

2017-08-01 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master fcac882d2 -> f41eb4b1e


[FLINK-7327] [futures] Replace Flink's future with Java 8's CompletableFuture 
in StreamRecordQueueEntry

This closes #4442.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f41eb4b1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f41eb4b1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f41eb4b1

Branch: refs/heads/master
Commit: f41eb4b1ea0112cce4b4edb4a25037fafa2aac23
Parents: fcac882
Author: Till Rohrmann 
Authored: Mon Jul 31 21:31:26 2017 +0200
Committer: Till Rohrmann 
Committed: Tue Aug 1 14:04:43 2017 +0200

--
 .../api/operators/async/AsyncWaitOperator.java  | 10 
 .../async/queue/OrderedStreamElementQueue.java  | 10 
 .../async/queue/StreamElementQueueEntry.java| 24 
 .../async/queue/StreamRecordQueueEntry.java |  8 +++
 .../queue/UnorderedStreamElementQueue.java  | 10 
 .../async/queue/WatermarkQueueEntry.java| 10 
 6 files changed, 29 insertions(+), 43 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
index 56c199d..a0f626e 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/AsyncWaitOperator.java
@@ -22,7 +22,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.streaming.api.datastream.AsyncDataStream;
@@ -217,12 +216,11 @@ public class AsyncWaitOperator
 
// Cancel the timer once we've completed the stream 
record buffer entry. This will remove
// the register trigger task
-   streamRecordBufferEntry.onComplete(new 
AcceptFunction>() {
-   @Override
-   public void 
accept(StreamElementQueueEntry value) {
+   streamRecordBufferEntry.onComplete(
+   (StreamElementQueueEntry 
value) -> {
timerFuture.cancel(true);
-   }
-   }, executor);
+   },
+   executor);
}
 
addAsyncBufferEntry(streamRecordBufferEntry);

http://git-wip-us.apache.org/repos/asf/flink/blob/f41eb4b1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
index e573fc1..5133809 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/async/queue/OrderedStreamElementQueue.java
@@ -19,7 +19,6 @@
 package org.apache.flink.streaming.api.operators.async.queue;
 
 import org.apache.flink.annotation.Internal;
-import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.streaming.api.operators.async.OperatorActions;
 import org.apache.flink.util.Preconditions;
 
@@ -193,9 +192,8 @@ public class OrderedStreamElementQueue implements 
StreamElementQueue {
 
queue.addLast(streamElementQueueEntry);
 
-   streamElementQueueEntry.onComplete(new 
AcceptFunction() {
-   @Override
-   public void accept(StreamElementQueueEntry value) {
+   streamElementQueueEntry.onComplete(
+   

flink git commit: [FLINK-7326] [futures] Replace Flink's future with Java 8's CompletableFuture in RegisteredRpcConnection

2017-08-01 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 4378ac3ae -> fcac882d2


[FLINK-7326] [futures] Replace Flink's future with Java 8's CompletableFuture 
in RegisteredRpcConnection

Address PR comments

This closes #4440.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fcac882d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fcac882d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fcac882d

Branch: refs/heads/master
Commit: fcac882d243a5d2f0a5ed3ce54cba0e7263a112a
Parents: 4378ac3
Author: Till Rohrmann 
Authored: Mon Jul 31 20:11:30 2017 +0200
Committer: Till Rohrmann 
Committed: Tue Aug 1 14:00:34 2017 +0200

--
 .../flink/runtime/jobmaster/JobMaster.java  | 19 +++---
 .../registration/RegisteredRpcConnection.java   | 34 ---
 .../registration/RetryingRegistration.java  | 62 
 .../runtime/taskexecutor/JobLeaderService.java  |  9 +--
 ...TaskExecutorToResourceManagerConnection.java |  7 ++-
 .../registration/RetryingRegistrationTest.java  | 17 +++---
 6 files changed, 68 insertions(+), 80 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 947a914..9417f90 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -38,6 +38,7 @@ import 
org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.BiFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
@@ -106,6 +107,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeoutException;
@@ -1044,18 +1046,19 @@ public class JobMaster extends 
RpcEndpoint {
getTargetAddress(), getTargetLeaderId())
{
@Override
-   protected Future 
invokeRegistration(
+   protected 
CompletableFuture invokeRegistration(
ResourceManagerGateway gateway, 
UUID leaderId, long timeoutMillis) throws Exception
{
Time timeout = 
Time.milliseconds(timeoutMillis);
 
-   return gateway.registerJobManager(
-   leaderId,
-   jobManagerLeaderID,
-   jobManagerResourceID,
-   jobManagerRpcAddress,
-   jobID,
-   timeout);
+   return FutureUtils.toJava(
+   gateway.registerJobManager(
+   leaderId,
+   jobManagerLeaderID,
+   jobManagerResourceID,
+   jobManagerRpcAddress,
+   jobID,
+   timeout));
}
};
}

http://git-wip-us.apache.org/repos/asf/flink/blob/fcac882d/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
index b477546..da46e1c 100644
--- 

flink git commit: [FLINK-7322] [futures] Replace Flink's futures with Java 8's CompletableFuture in CheckpointCoordinator

2017-08-01 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 4a9f19b9f -> 4378ac3ae


[FLINK-7322] [futures] Replace Flink's futures with Java 8's CompletableFuture 
in CheckpointCoordinator

Fix failing JobManagerITCase

This closes #4436.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4378ac3a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4378ac3a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4378ac3a

Branch: refs/heads/master
Commit: 4378ac3ae36f12c8678d2090f7c344832d6d0761
Parents: 4a9f19b
Author: Till Rohrmann 
Authored: Mon Jul 31 19:05:22 2017 +0200
Committer: Till Rohrmann 
Committed: Tue Aug 1 13:54:50 2017 +0200

--
 .../checkpoint/CheckpointCoordinator.java   | 41 ++--
 .../runtime/checkpoint/PendingCheckpoint.java   |  9 ++---
 .../flink/runtime/jobmanager/JobManager.scala   | 11 +++---
 .../checkpoint/CheckpointCoordinatorTest.java   | 20 +-
 .../checkpoint/PendingCheckpointTest.java   |  4 +-
 .../runtime/jobmanager/JobManagerITCase.scala   |  7 ++--
 .../testingUtils/TestingJobManagerLike.scala|  3 +-
 7 files changed, 48 insertions(+), 47 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/4378ac3a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index 3e36158..5cab7f8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -26,9 +26,6 @@ import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.runtime.checkpoint.hooks.MasterHooks;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointLoader;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointStore;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -58,6 +55,7 @@ import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ScheduledThreadPoolExecutor;
@@ -362,7 +360,7 @@ public class CheckpointCoordinator {
 *   configured
 * @throws Exception Failures during triggering are 
forwarded
 */
-   public Future triggerSavepoint(long timestamp, 
String targetDirectory) throws Exception {
+   public CompletableFuture triggerSavepoint(long 
timestamp, String targetDirectory) throws Exception {
checkNotNull(targetDirectory, "Savepoint target directory");
 
CheckpointProperties props = 
CheckpointProperties.forStandardSavepoint();
@@ -377,29 +375,30 @@ public class CheckpointCoordinator {
savepointDirectory,
false);
 
-   Future result;
+   CompletableFuture result;
 
if (triggerResult.isSuccess()) {
result = 
triggerResult.getPendingCheckpoint().getCompletionFuture();
} else {
Throwable cause = new Exception("Failed to trigger 
savepoint: " + triggerResult.getFailureReason().message());
-   result = 
FlinkCompletableFuture.completedExceptionally(cause);
+   result = new CompletableFuture<>();
+   result.completeExceptionally(cause);
+   return result;
}
 
// Make sure to remove the created base directory on Exceptions
-   result.exceptionallyAsync(new ApplyFunction() {
-   @Override
-   public Void apply(Throwable value) {
-   try {
-   
SavepointStore.deleteSavepointDirectory(savepointDirectory);
-   } catch (Throwable t) {
-   LOG.warn("Failed to delete savepoint 
directory " + savepointDirectory
-   + " after failed 

flink git commit: [FLINK-7319] [futures] Replace Flink's Futures with Java 8 CompletableFuture in MesosResourceManager

2017-08-01 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 7c639c600 -> 4a9f19b9f


[FLINK-7319] [futures] Replace Flink's Futures with Java 8 CompletableFuture in 
MesosResourceManager

This closes #4432.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4a9f19b9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4a9f19b9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4a9f19b9

Branch: refs/heads/master
Commit: 4a9f19b9faa87de52ac1f078e53f26fd32efadad
Parents: 7c639c6
Author: Till Rohrmann 
Authored: Mon Jul 31 18:06:20 2017 +0200
Committer: Till Rohrmann 
Committed: Tue Aug 1 13:45:15 2017 +0200

--
 .../clusterframework/MesosResourceManager.java  | 44 +---
 1 file changed, 19 insertions(+), 25 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/4a9f19b9/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
--
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
index 260d5bf..736af59 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
@@ -45,10 +45,7 @@ import 
org.apache.flink.runtime.clusterframework.ContainerSpecification;
 import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
-import org.apache.flink.runtime.concurrent.ApplyFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.FutureUtils;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.instance.InstanceID;
@@ -79,11 +76,11 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
 
 import scala.Option;
@@ -324,24 +321,23 @@ public class MesosResourceManager extends 
ResourceManager
[... 3 unchanged context lines lost in extraction ...]
-   Future stopTaskMonitorFuture = stopActor(taskMonitor, 
stopTimeout);
+   CompletableFuture stopTaskMonitorFuture = 
stopActor(taskMonitor, stopTimeout);
taskMonitor = null;
 
-   Future stopConnectionMonitorFuture = 
stopActor(connectionMonitor, stopTimeout);
+   CompletableFuture stopConnectionMonitorFuture = 
stopActor(connectionMonitor, stopTimeout);
connectionMonitor = null;
 
-   Future stopLaunchCoordinatorFuture = 
stopActor(launchCoordinator, stopTimeout);
+   CompletableFuture stopLaunchCoordinatorFuture = 
stopActor(launchCoordinator, stopTimeout);
launchCoordinator = null;
 
-   Future stopReconciliationCoordinatorFuture = 
stopActor(reconciliationCoordinator, stopTimeout);
+   CompletableFuture stopReconciliationCoordinatorFuture 
= stopActor(reconciliationCoordinator, stopTimeout);
reconciliationCoordinator = null;
 
-   Future stopFuture = FutureUtils.waitForAll(
-   Arrays.asList(
-   stopTaskMonitorFuture,
-   stopConnectionMonitorFuture,
-   stopLaunchCoordinatorFuture,
-   stopReconciliationCoordinatorFuture));
+   CompletableFuture stopFuture = CompletableFuture.allOf(
+   stopTaskMonitorFuture,
+   stopConnectionMonitorFuture,
+   stopLaunchCoordinatorFuture,
+   stopReconciliationCoordinatorFuture);
 
// wait for the future to complete or to time out
try {
@@ -606,20 +602,18 @@ public class MesosResourceManager extends 
ResourceManager
[... unchanged context lines lost in extraction ...]
-   private Future stopActor(final ActorRef actorRef, 
FiniteDuration timeout) {
-   return new FlinkFuture<>(Patterns.gracefulStop(actorRef, 
timeout))
+   private CompletableFuture stopActor(final ActorRef actorRef, 
FiniteDuration timeout) {
+   return FutureUtils.toJava(Patterns.gracefulStop(actorRef, 
timeout))
 

flink git commit: [FLINK-7318] [futures] Replace Flink's futures in StackTraceSampleCoordinator with Java 8 CompletableFuture

2017-08-01 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 748448b5f -> 7c639c600


[FLINK-7318] [futures] Replace Flink's futures in StackTraceSampleCoordinator 
with Java 8 CompletableFuture

This closes #4431.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7c639c60
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7c639c60
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7c639c60

Branch: refs/heads/master
Commit: 7c639c600eb08923ec9dc0caaf337d70e2ac1719
Parents: 748448b
Author: Till Rohrmann 
Authored: Mon Jul 31 17:55:06 2017 +0200
Committer: Till Rohrmann 
Committed: Tue Aug 1 13:38:37 2017 +0200

--
 .../webmonitor/BackPressureStatsTracker.java|  6 +--
 .../webmonitor/StackTraceSampleCoordinator.java | 45 ++--
 .../BackPressureStatsTrackerTest.java   |  8 ++--
 .../StackTraceSampleCoordinatorITCase.java  |  7 +--
 .../StackTraceSampleCoordinatorTest.java| 32 +++---
 5 files changed, 50 insertions(+), 48 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/7c639c60/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
--
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
index 894309c..26be769 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/BackPressureStatsTracker.java
@@ -19,8 +19,6 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -38,8 +36,10 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeUnit;
+import java.util.function.BiFunction;
 
 import scala.Option;
 
@@ -177,7 +177,7 @@ public class BackPressureStatsTracker {
LOG.debug("Triggering stack 
trace sample for tasks: " + Arrays.toString(vertex.getTaskVertices()));
}
 
-   Future sample = 
coordinator.triggerStackTraceSample(
+   CompletableFuture 
sample = coordinator.triggerStackTraceSample(

vertex.getTaskVertices(),
numSamples,
delayBetweenSamples,

http://git-wip-us.apache.org/repos/asf/flink/blob/7c639c60/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
--
diff --git 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
index 5a85343..3521f58 100644
--- 
a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
+++ 
b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/StackTraceSampleCoordinator.java
@@ -19,10 +19,7 @@
 package org.apache.flink.runtime.webmonitor;
 
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.runtime.concurrent.BiFunction;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
@@ -42,6 +39,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -105,7 +103,7 @@ public class StackTraceSampleCoordinator {
 * @return A 

flink git commit: [FLINK-7313] [futures] Add Flink future and Scala future to Java 8 CompletableFuture conversion

2017-08-01 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 3c42557f3 -> 648c1f595


[FLINK-7313] [futures] Add Flink future and Scala future to Java 8 
CompletableFuture conversion

Add DirectExecutionContext

Add Scala Future to Java 8 CompletableFuture utility to FutureUtils

Add Flink future to Java 8's CompletableFuture conversion utility to FutureUtils

Add base class for Flink's unchecked future exceptions

This closes #4429.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/648c1f59
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/648c1f59
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/648c1f59

Branch: refs/heads/master
Commit: 648c1f595990655f9c866d0c9e8983e0b63a927a
Parents: 3c42557
Author: Till Rohrmann 
Authored: Mon Jul 31 15:07:18 2017 +0200
Committer: Till Rohrmann 
Committed: Tue Aug 1 13:30:25 2017 +0200

--
 .../flink/runtime/concurrent/Executors.java | 37 +
 .../concurrent/FlinkFutureException.java| 47 
 .../flink/runtime/concurrent/FutureUtils.java   | 58 
 3 files changed, 142 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/648c1f59/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
index e8a9be9..04cdce7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -26,6 +26,8 @@ import java.util.concurrent.Executor;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeUnit;
 
+import scala.concurrent.ExecutionContext;
+
 /**
  * Collection of {@link Executor} implementations
  */
@@ -59,6 +61,41 @@ public class Executors {
}
 
/**
+* Return a direct execution context. The direct execution context 
executes the runnable directly
+* in the calling thread.
+*
+* @return Direct execution context.
+*/
+   public static ExecutionContext directExecutionContext() {
+   return DirectExecutionContext.INSTANCE;
+   }
+
+   /**
+* Direct execution context.
+*/
+   private static class DirectExecutionContext implements ExecutionContext 
{
+
+   static final DirectExecutionContext INSTANCE = new 
DirectExecutionContext();
+
+   private DirectExecutionContext() {}
+
+   @Override
+   public void execute(Runnable runnable) {
+   runnable.run();
+   }
+
+   @Override
+   public void reportFailure(Throwable cause) {
+   throw new IllegalStateException("Error in direct 
execution context.", cause);
+   }
+
+   @Override
+   public ExecutionContext prepare() {
+   return this;
+   }
+   }
+
+   /**
 * Gracefully shutdown the given {@link ExecutorService}. The call 
waits the given timeout that
 * all ExecutorServices terminate. If the ExecutorServices do not 
terminate in this time,
 * they will be shut down hard.

http://git-wip-us.apache.org/repos/asf/flink/blob/648c1f59/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java
new file mode 100644
index 000..c728904
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FlinkFutureException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific