flink git commit: [FLINK-4671] [table] Table API can not be built

2016-09-27 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master ef1598498 -> e5d62da2c


[FLINK-4671] [table] Table API can not be built

This closes #2549.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e5d62da2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e5d62da2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e5d62da2

Branch: refs/heads/master
Commit: e5d62da2c98ad9c6a5ca9c0782a7fea8a01d639a
Parents: ef15984
Author: twalthr 
Authored: Mon Sep 26 15:05:40 2016 +0200
Committer: twalthr 
Committed: Tue Sep 27 10:26:40 2016 +0200

--
 flink-test-utils-parent/flink-test-utils/pom.xml | 19 +--
 pom.xml  | 10 ++
 2 files changed, 11 insertions(+), 18 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e5d62da2/flink-test-utils-parent/flink-test-utils/pom.xml
--
diff --git a/flink-test-utils-parent/flink-test-utils/pom.xml 
b/flink-test-utils-parent/flink-test-utils/pom.xml
index 5c99ef6..875a2bf 100644
--- a/flink-test-utils-parent/flink-test-utils/pom.xml
+++ b/flink-test-utils-parent/flink-test-utils/pom.xml
@@ -82,26 +82,9 @@ under the License.
org.apache.hadoop
hadoop-minikdc
${minikdc.version}
+   compile

 

 
-   
-   
-
-   
-   
-   org.apache.felix
-   maven-bundle-plugin
-   3.0.1
-   true
-   true
-   
-
-   
-   
-
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e5d62da2/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 5b3148a..7e517e9 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1081,6 +1081,16 @@ under the License.


 
+   
+   
+   org.apache.felix
+   maven-bundle-plugin
+   3.0.1
+   true
+   true
+   
+

 
 



flink git commit: [FLINK-4687] [rpc] Add getAddress to RpcService

2016-09-27 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/flip-6 2a61e74b9 -> 93775cef6


[FLINK-4687] [rpc] Add getAddress to RpcService

This closes #2551.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/93775cef
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/93775cef
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/93775cef

Branch: refs/heads/flip-6
Commit: 93775cef67b1903a8f462da924f97df5ae6819c4
Parents: 2a61e74
Author: Till Rohrmann 
Authored: Mon Sep 26 18:01:47 2016 +0200
Committer: Till Rohrmann 
Committed: Tue Sep 27 13:45:50 2016 +0200

--
 .../org/apache/flink/runtime/rpc/RpcService.java|  8 
 .../flink/runtime/rpc/akka/AkkaRpcService.java  | 16 
 .../apache/flink/runtime/rpc/TestingRpcService.java |  5 +++--
 .../flink/runtime/rpc/TestingSerialRpcService.java  |  6 ++
 .../flink/runtime/rpc/akka/AkkaRpcActorTest.java|  8 
 .../flink/runtime/rpc/akka/AkkaRpcServiceTest.java  |  5 +
 6 files changed, 42 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/93775cef/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
index 437e08b..96844ed 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/RpcService.java
@@ -33,6 +33,14 @@ import java.util.concurrent.TimeUnit;
 public interface RpcService {
 
/**
+* Return the address under which the rpc service can be reached. If 
the rpc service cannot be
+* contacted remotely, then it will return an empty string.
+*
+* @return Address of the rpc service or empty string if local rpc 
service
+*/
+   String getAddress();
+
+   /**
 * Connect to a remote rpc server under the provided address. Returns a 
rpc gateway which can
 * be used to communicate with the rpc server. If the connection 
failed, then the returned
 * future is failed with a {@link RpcConnectionException}.

http://git-wip-us.apache.org/repos/asf/flink/blob/93775cef/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
index cee19c4..6825557 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rpc/akka/AkkaRpcService.java
@@ -22,6 +22,7 @@ import akka.actor.ActorIdentity;
 import akka.actor.ActorRef;
 import akka.actor.ActorSelection;
 import akka.actor.ActorSystem;
+import akka.actor.Address;
 import akka.actor.Identify;
 import akka.actor.PoisonPill;
 import akka.actor.Props;
@@ -75,6 +76,8 @@ public class AkkaRpcService implements RpcService {
private final Set actors = new HashSet<>(4);
private final long maximumFramesize;
 
+   private final String address;
+
private volatile boolean stopped;
 
public AkkaRpcService(final ActorSystem actorSystem, final Time 
timeout) {
@@ -87,6 +90,19 @@ public class AkkaRpcService implements RpcService {
// only local communication
maximumFramesize = Long.MAX_VALUE;
}
+
+   Address actorSystemAddress = AkkaUtils.getAddress(actorSystem);
+
+   if (actorSystemAddress.host().isDefined()) {
+   address = actorSystemAddress.host().get();
+   } else {
+   address = "";
+   }
+   }
+
+   @Override
+   public String getAddress() {
+   return address;
}
 
// this method does not mutate state and is thus thread-safe

http://git-wip-us.apache.org/repos/asf/flink/blob/93775cef/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index f164056..47c9e24 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.c

flink git commit: [FLINK-4530] [rpc] Generalize TaskExecutorToResourceManagerConnection to be reusable

2016-09-27 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/flip-6 93775cef6 -> ed5c83dc2


[FLINK-4530] [rpc] Generalize TaskExecutorToResourceManagerConnection to be 
reusable

This closes #2520


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ed5c83dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ed5c83dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ed5c83dc

Branch: refs/heads/flip-6
Commit: ed5c83dc2c2a9d46f293b0de01342829e2e598a5
Parents: 93775ce
Author: zhuhaifengleon 
Authored: Mon Sep 26 17:43:44 2016 +0800
Committer: Stephan Ewen 
Committed: Tue Sep 27 16:55:22 2016 +0200

--
 .../JobMasterToResourceManagerConnection.java   | 117 +++
 .../registration/RegisteredRpcConnection.java   | 192 +++
 .../runtime/taskexecutor/TaskExecutor.java  |   4 +-
 ...TaskExecutorToResourceManagerConnection.java | 127 +++-
 .../RegisteredRpcConnectionTest.java| 183 ++
 .../registration/RetryingRegistrationTest.java  |   6 +-
 6 files changed, 519 insertions(+), 110 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/ed5c83dc/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
new file mode 100644
index 000..71fce8c
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterToResourceManagerConnection.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmaster;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.registration.RegisteredRpcConnection;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.registration.RetryingRegistration;
+import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.runtime.concurrent.Future;
+
+import org.slf4j.Logger;
+
+import java.util.UUID;
+import java.util.concurrent.Executor;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * The connection between a JobMaster and the ResourceManager.
+ */
+public class JobMasterToResourceManagerConnection 
+   extends RegisteredRpcConnection {
+
+   /** the JobMaster whose connection to the ResourceManager this 
represents */
+   private final JobMaster jobMaster;
+
+   private final JobID jobID;
+
+   private final UUID jobMasterLeaderId;
+
+   public JobMasterToResourceManagerConnection(
+   Logger log,
+   JobID jobID,
+   JobMaster jobMaster,
+   UUID jobMasterLeaderId,
+   String resourceManagerAddress,
+   UUID resourceManagerLeaderId,
+   Executor executor) {
+
+   super(log, resourceManagerAddress, resourceManagerLeaderId, 
executor);
+   this.jobMaster = checkNotNull(jobMaster);
+   this.jobID = checkNotNull(jobID);
+   this.jobMasterLeaderId = checkNotNull(jobMasterLeaderId);
+   }
+
+   @Override
+   protected RetryingRegistration generateRegistration() {
+   return new 
JobMasterToResourceManagerConnection.ResourceManagerRegistration(
+   log, jobMaster.getRpcService(),
+   getTargetAddress(), getTargetLeaderId(),
+   jobMaster.getAddress(),jobID, jobMasterLeaderId);
+   }
+
+   @Override
+   protected void onRegistrationSuccess(JobMasterRegistrationSuccess 
success) {
+   }
+
+   @Override
+

[1/3] flink git commit: [hotfix] [tests] Speed up streaming state tests by skipping default retry delay.

2016-09-27 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master e5d62da2c -> 6e123d287


[hotfix] [tests] Speed up streaming state tests by skipping default retry delay.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e123d28
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e123d28
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e123d28

Branch: refs/heads/master
Commit: 6e123d287443430bf1721952c5692069e41d95cc
Parents: b1642e3
Author: Stephan Ewen 
Authored: Tue Sep 27 14:53:57 2016 +0200
Committer: Stephan Ewen 
Committed: Tue Sep 27 14:58:41 2016 +0200

--
 .../flink/test/checkpointing/StreamFaultToleranceTestBase.java   | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6e123d28/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
index 5f6cd4a..10f78d4 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamFaultToleranceTestBase.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.test.checkpointing;
 
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-
 import org.apache.flink.test.util.TestUtils;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -98,6 +99,7 @@ public abstract class StreamFaultToleranceTestBase extends 
TestLogger {
env.setParallelism(PARALLELISM);
env.enableCheckpointing(500);
env.getConfig().disableSysoutLogging();
+   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
0L));
 
testProgram(env);
 



[3/3] flink git commit: [FLINK-4696] [core] Limit number of Akka threads in local minicluster setups

2016-09-27 Thread sewen
[FLINK-4696] [core] Limit number of Akka threads in local minicluster setups

Since Flink uses a rather small number of actors, not too many actor dispatcher 
threads are needed.
To prevent mini cluster setups on multi-core CPUs (32 or 64 cores) from spawning 
too many threads,
this limits the number of dispatcher threads for mini clusters.

For proper Flink deployments, the threads are not limited by this change.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ea9284d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ea9284d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ea9284d

Branch: refs/heads/master
Commit: 6ea9284d29ec79576f073441a5de681019720ab0
Parents: e5d62da
Author: Stephan Ewen 
Authored: Tue Sep 27 14:21:20 2016 +0200
Committer: Stephan Ewen 
Committed: Tue Sep 27 14:58:41 2016 +0200

--
 .../apache/flink/runtime/akka/AkkaUtils.scala| 19 +++
 .../runtime/minicluster/FlinkMiniCluster.scala   | 10 ++
 2 files changed, 25 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6ea9284d/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
--
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
index 7aa75c0..bd3af33 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala
@@ -189,6 +189,25 @@ object AkkaUtils {
 ConfigFactory.parseString(config)
   }
 
+  def testDispatcherConfig: Config = {
+val config =
+  s"""
+ |akka {
+ |  actor {
+ |default-dispatcher {
+ |  fork-join-executor {
+ |parallelism-factor = 1.0
+ |parallelism-min = 1
+ |parallelism-max = 4
+ |  }
+ |}
+ |  }
+ |}
+  """.stripMargin
+
+ConfigFactory.parseString(config)
+  }
+
   /**
* Creates a Akka config for a remote actor system listening on port on the 
network interface
* identified by hostname.

http://git-wip-us.apache.org/repos/asf/flink/blob/6ea9284d/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
--
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
index 0178bd3..a263f66 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala
@@ -40,7 +40,6 @@ import org.slf4j.LoggerFactory
 
 import scala.concurrent.duration.{Duration, FiniteDuration}
 import scala.concurrent._
-import scala.concurrent.forkjoin.ForkJoinPool
 
 /**
  * Abstract base class for Flink's mini cluster. The mini cluster starts a
@@ -266,17 +265,20 @@ abstract class FlinkMiniCluster(
 
   def startResourceManagerActorSystem(index: Int): ActorSystem = {
 val config = getResourceManagerAkkaConfig(index)
-AkkaUtils.createActorSystem(config)
+val testConfig = AkkaUtils.testDispatcherConfig.withFallback(config)
+AkkaUtils.createActorSystem(testConfig)
   }
 
   def startJobManagerActorSystem(index: Int): ActorSystem = {
 val config = getJobManagerAkkaConfig(index)
-AkkaUtils.createActorSystem(config)
+val testConfig = AkkaUtils.testDispatcherConfig.withFallback(config)
+AkkaUtils.createActorSystem(testConfig)
   }
 
   def startTaskManagerActorSystem(index: Int): ActorSystem = {
 val config = getTaskManagerAkkaConfig(index)
-AkkaUtils.createActorSystem(config)
+val testConfig = AkkaUtils.testDispatcherConfig.withFallback(config)
+AkkaUtils.createActorSystem(testConfig)
   }
 
   def startJobClientActorSystem(jobID: JobID): ActorSystem = {



[2/3] flink git commit: [FLINK-4685] [checkpoints] Gather sync/async duration and alignment information for task checkpoints

2016-09-27 Thread sewen
[FLINK-4685] [checkpoints] Gather sync/async duration and alignment information 
for task checkpoints

This adds to each 'acknowledge checkpoint' message
  - number of bytes buffered during alignment
  - duration of alignment phase
  - duration of synchronous part of the operator checkpoint
  - duration of asynchronous part of the operator checkpoint


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1642e32
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1642e32
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1642e32

Branch: refs/heads/master
Commit: b1642e32c2f69c60c2b212260c3479feb66a9165
Parents: 6ea9284
Author: Stephan Ewen 
Authored: Mon Sep 26 14:10:21 2016 +0200
Committer: Stephan Ewen 
Committed: Tue Sep 27 14:58:41 2016 +0200

--
 .../state/RocksDBAsyncSnapshotTest.java | 19 ++--
 .../flink/runtime/execution/Environment.java| 43 +++--
 .../runtime/jobgraph/tasks/StatefulTask.java| 27 --
 .../checkpoint/AcknowledgeCheckpoint.java   | 93 +---
 .../ActorGatewayCheckpointResponder.java| 23 ++---
 .../taskmanager/CheckpointResponder.java| 29 --
 .../runtime/taskmanager/RuntimeEnvironment.java | 27 --
 .../jobmanager/JobManagerHARecoveryTest.java| 11 ++-
 .../operators/testutils/DummyEnvironment.java   | 14 +--
 .../operators/testutils/MockEnvironment.java| 15 ++--
 .../runtime/taskmanager/TaskAsyncCallTest.java  |  6 ++
 .../streaming/runtime/io/BarrierBuffer.java | 83 +++--
 .../streaming/runtime/io/BarrierTracker.java| 44 +
 .../streaming/runtime/io/BufferSpiller.java | 36 +---
 .../runtime/io/CheckpointBarrierHandler.java| 30 +--
 .../runtime/io/StreamInputProcessor.java| 24 +++--
 .../runtime/io/StreamTwoInputProcessor.java | 16 ++--
 .../runtime/tasks/OneInputStreamTask.java   |  2 +-
 .../streaming/runtime/tasks/StreamTask.java | 87 --
 .../runtime/tasks/TwoInputStreamTask.java   |  2 +-
 .../streaming/runtime/io/BarrierBufferTest.java | 88 ++
 .../runtime/io/BarrierTrackerTest.java  | 40 +++--
 .../runtime/tasks/OneInputStreamTaskTest.java   | 21 +++--
 .../runtime/tasks/StreamMockEnvironment.java| 14 +--
 24 files changed, 576 insertions(+), 218 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b1642e32/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
--
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index d5b9b46..c0c9ca1 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -136,15 +136,16 @@ public class RocksDBAsyncSnapshotTest {
testHarness.bufferSize) {
 
@Override
-   public void acknowledgeCheckpoint(long checkpointId) {
-   super.acknowledgeCheckpoint(checkpointId);
-   }
-
-   @Override
-   public void acknowledgeCheckpoint(long checkpointId,
- 
ChainedStateHandle chainedStateHandle,
- 
List keyGroupStateHandles) {
-   super.acknowledgeCheckpoint(checkpointId, 
chainedStateHandle, keyGroupStateHandles);
+   public void acknowledgeCheckpoint(
+   long checkpointId,
+   ChainedStateHandle 
chainedStateHandle, 
+   List 
keyGroupStateHandles,
+   long synchronousDurationMillis, long 
asynchronousDurationMillis,
+   long bytesBufferedInAlignment, long 
alignmentDurationNanos) {
+
+   super.acknowledgeCheckpoint(checkpointId, 
chainedStateHandle, keyGroupStateHandles,
+   synchronousDurationMillis, 
asynchronousDurationMillis,
+   bytesBufferedInAlignment, 
alignmentDurationNanos);
 
// block on the latch, to verify that 
triggerCheckpoint ret

[2/3] flink git commit: [FLINK-4690] Replace SlotAllocationFuture with flink's own future

2016-09-27 Thread trohrmann
[FLINK-4690] Replace SlotAllocationFuture with flink's own future

This closes #2552.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b88f1a7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b88f1a7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b88f1a7

Branch: refs/heads/master
Commit: 7b88f1a75ea92f6b26624a7358e7fcafa3e9506f
Parents: f8138f4
Author: Kurt Young 
Authored: Tue Sep 27 12:10:08 2016 +0800
Committer: Till Rohrmann 
Committed: Tue Sep 27 18:39:36 2016 +0200

--
 .../runtime/concurrent/impl/FlinkFuture.java|   1 -
 .../flink/runtime/executiongraph/Execution.java |  55 ++---
 .../flink/runtime/instance/SlotProvider.java|   6 +-
 .../runtime/jobmanager/scheduler/Scheduler.java |  24 ++-
 .../scheduler/SlotAllocationFuture.java | 146 --
 .../scheduler/SlotAllocationFutureAction.java   |  34 
 .../ExecutionGraphMetricsTest.java  |   9 +-
 .../ExecutionVertexSchedulingTest.java  |  19 +-
 .../scheduler/SchedulerIsolatedTasksTest.java   |  31 ++-
 .../scheduler/SlotAllocationFutureTest.java | 200 ---
 10 files changed, 80 insertions(+), 445 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
index 361cd3d..3f2c5e4 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/impl/FlinkFuture.java
@@ -104,7 +104,6 @@ public class FlinkFuture implements Future {
@Override
public T getNow(T valueIfAbsent) throws ExecutionException {
Preconditions.checkNotNull(scalaFuture);
-   Preconditions.checkNotNull(valueIfAbsent);
 
Option> value = scalaFuture.value();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7b88f1a7/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 6826365..8c02e1b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -20,18 +20,18 @@ package org.apache.flink.runtime.executiongraph;
 
 import akka.dispatch.OnComplete;
 import akka.dispatch.OnFailure;
-
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import 
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionLocation;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SlotProvider;
@@ -41,20 +41,18 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobmanager.scheduler.CoLocationConstraint;
 import 
org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException;
 import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
-import org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFuture;
-import 
org.apache.flink.runtime.jobmanager.scheduler.SlotAllocationFutureAction;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
 import org.apache.flink.runtime.state.ChainedStateHandle;
 import org.apache.flink.runtime.state.KeyGroupsStateHandle;
 import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.taskmanager.Tas

[3/3] flink git commit: [FLINK-4361] Introduce Flink's own future abstraction

2016-09-27 Thread trohrmann
[FLINK-4361] Introduce Flink's own future abstraction

Flink's future abstraction whose API is similar to Java 8's CompletableFuture.
That's in order to ease a future transition to this class once we ditch Java 7.
The current set of operations comprises:

- isDone to check the completion of the future
- get/getNow to obtain the future's value
- cancel to cancel the future (best effort basis)
- thenApplyAsync to transform the future's value into another value
- thenAcceptAsync to register a callback for a successful completion of the 
future
- exceptionallyAsync to register a callback for an exception completion of the 
future
- thenComposeAsync to transform the future's value and flatten the returned 
future
- handleAsync to register a callback which is called either with the regular 
result
or the exceptional result

Additionally, Flink offers a CompletableFuture which can be completed with a 
regular
value or an exception:

- complete/completeExceptionally

Complete FlinkCompletableFuture exceptionally with a CancellationException upon 
cancel

Add convenience functions for FlinkCompletableFutures

This closes #2554.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f8138f4b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f8138f4b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f8138f4b

Branch: refs/heads/master
Commit: f8138f4b74332ecb4ef0d28a09e8549708118ca6
Parents: 6e123d2
Author: Till Rohrmann 
Authored: Fri Sep 2 21:13:34 2016 +0200
Committer: Till Rohrmann 
Committed: Tue Sep 27 18:39:36 2016 +0200

--
 .../runtime/concurrent/AcceptFunction.java  |  34 +++
 .../flink/runtime/concurrent/ApplyFunction.java |  36 +++
 .../flink/runtime/concurrent/BiFunction.java|  38 +++
 .../runtime/concurrent/CompletableFuture.java   |  47 
 .../apache/flink/runtime/concurrent/Future.java | 156 +++
 .../concurrent/impl/FlinkCompletableFuture.java |  91 +++
 .../runtime/concurrent/impl/FlinkFuture.java| 273 +++
 .../runtime/concurrent/FlinkFutureTest.java | 269 ++
 8 files changed, 944 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f8138f4b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java
new file mode 100644
index 000..a300647
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/AcceptFunction.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+/**
+ * Function which is called with a single argument and does not return a value.
+ *
+ * @param <T> type of the argument
+ */
+public interface AcceptFunction {
+
+   /**
+* Method which handles the function call.
+*
+* @param value is the function's argument
+*/
+   void accept(T value);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f8138f4b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java
new file mode 100644
index 000..64def98
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/ApplyFunction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this f

[1/3] flink git commit: [FLINK-4690] Use direct executor to run slot allocation future handler

2016-09-27 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 6e123d287 -> 84672c22f


[FLINK-4690] Use direct executor to run slot allocation future handler


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/84672c22
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/84672c22
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/84672c22

Branch: refs/heads/master
Commit: 84672c22f8088a70caf35b54d74eee458bf600dd
Parents: 7b88f1a
Author: Till Rohrmann 
Authored: Tue Sep 27 15:33:07 2016 +0200
Committer: Till Rohrmann 
Committed: Tue Sep 27 18:39:36 2016 +0200

--
 .../flink/runtime/concurrent/Executors.java | 52 +
 .../flink/runtime/executiongraph/Execution.java | 61 
 .../runtime/jobmanager/scheduler/Scheduler.java | 15 +++--
 3 files changed, 84 insertions(+), 44 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/84672c22/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
new file mode 100644
index 000..1832d70
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/Executors.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.concurrent;
+
+import java.util.concurrent.Executor;
+
+/**
+ * Collection of {@link Executor} implementations
+ */
+public class Executors {
+
+   /**
+* Return a direct executor. The direct executor directly executes the runnable in the calling
+* thread.
+*
+* @return Direct executor
+*/
+   public static Executor directExecutor() {
+   return DirectExecutor.INSTANCE;
+   }
+
+   /**
+* Direct executor implementation.
+*/
+   private static class DirectExecutor implements Executor {
+
+   static final DirectExecutor INSTANCE = new DirectExecutor();
+
+   private DirectExecutor() {}
+
+   @Override
+   public void execute(Runnable command) {
+   command.run();
+   }
+   }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/84672c22/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 8c02e1b..912ff10 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -26,6 +26,7 @@ import 
org.apache.flink.runtime.accumulators.AccumulatorRegistry;
 import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.concurrent.BiFunction;
+import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
 import 
org.apache.flink.runtime.deployment.PartialInputChannelDeploymentDescriptor;
@@ -52,7 +53,6 @@ import org.apache.flink.util.ExceptionUtils;
 import org.slf4j.Logger;
 
 import scala.concurrent.ExecutionContext;
-import scala.concurrent.ExecutionContext$;
 import scala.concurrent.duration.FiniteDuration;
 
 import java.util.ArrayList;
@@ -297,49 +297,38 @@ public class Execution {
 
// IMPORTANT: To prevent leaks of cluster resources, we 
need to make sure that slots are returned
// in all cases where the deployment failed. we use 
many try {} finall

flink git commit: [FLINK-4695] Introduce MetricRegistryConfiguration to encapsulate MetricRegistry parameters

2016-09-27 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 84672c22f -> f1b5b35f5


[FLINK-4695] Introduce MetricRegistryConfiguration to encapsulate 
MetricRegistry parameters

In order to decouple the MetricRegistry object instantiation from the global 
configuration
the MetricRegistryConfiguration class has been introduced. This class 
encapsulates all
required information to create a MetricRegistry object. Furthermore, it 
encapsulates the
configuration parsing logic by offering a static method fromConfiguration, 
which constructs
a MetricRegistryConfiguration object from a Configuration.

This closes #2555.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f1b5b35f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f1b5b35f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f1b5b35f

Branch: refs/heads/master
Commit: f1b5b35f595e7ae53001a4c46edbf0c9b78ee376
Parents: 84672c2
Author: Till Rohrmann 
Authored: Mon Sep 26 16:49:23 2016 +0200
Committer: Till Rohrmann 
Committed: Tue Sep 27 18:55:30 2016 +0200

--
 .../ScheduledDropwizardReporterTest.java|   5 +-
 .../DropwizardFlinkHistogramWrapperTest.java|   6 +-
 .../flink/metrics/jmx/JMXReporterTest.java  |   9 +-
 .../metrics/statsd/StatsDReporterTest.java  |   7 +-
 .../flink/runtime/metrics/MetricRegistry.java   |  67 ++--
 .../metrics/MetricRegistryConfiguration.java| 168 +++
 .../flink/runtime/jobmanager/JobManager.scala   |   6 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   5 +-
 .../ExecutionGraphMetricsTest.java  |   3 +-
 .../runtime/metrics/MetricRegistryTest.java |  14 +-
 .../metrics/dump/MetricQueryServiceTest.java|   3 +-
 .../metrics/groups/AbstractMetricGroupTest.java |   4 +-
 .../metrics/groups/JobManagerGroupTest.java |  11 +-
 .../metrics/groups/JobManagerJobGroupTest.java  |   9 +-
 .../groups/MetricGroupRegistrationTest.java |   5 +-
 .../runtime/metrics/groups/MetricGroupTest.java |  10 +-
 .../metrics/groups/OperatorGroupTest.java   |   7 +-
 .../metrics/groups/TaskManagerGroupTest.java|  11 +-
 .../metrics/groups/TaskManagerJobGroupTest.java |   9 +-
 .../metrics/groups/TaskMetricGroupTest.java |  11 +-
 .../testutils/UnregisteredTaskMetricsGroup.java |   4 +-
 ...askManagerComponentsStartupShutdownTest.java |   5 +-
 22 files changed, 265 insertions(+), 114 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
--
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
index 1440028..9385510 100644
--- 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
+++ 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/ScheduledDropwizardReporterTest.java
@@ -29,6 +29,7 @@ import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.runtime.metrics.MetricRegistry;
+import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
 import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
@@ -78,7 +79,9 @@ public class ScheduledDropwizardReporterTest {

configuration.setString(ConfigConstants.METRICS_SCOPE_NAMING_TASK, 
"..");

configuration.setString(ConfigConstants.METRICS_SCOPE_DELIMITER, "_");
 
-   MetricRegistry metricRegistry = new 
MetricRegistry(configuration);
+   MetricRegistryConfiguration metricRegistryConfiguration = 
MetricRegistryConfiguration.fromConfiguration(configuration);
+
+   MetricRegistry metricRegistry = new 
MetricRegistry(metricRegistryConfiguration);
 
char delimiter = metricRegistry.getDelimiter();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/f1b5b35f/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
--
diff --git 
a/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apache/flink/dropwizard/metrics/DropwizardFlinkHistogramWrapperTest.java
 
b/flink-metrics/flink-metrics-dropwizard/src/test/java/org/apach

[1/2] flink git commit: [FLINK-4543] [network] Fix potential deadlock in SpilledSubpartitionViewAsyncIO.

2016-09-27 Thread sewen
Repository: flink
Updated Branches:
  refs/heads/master f1b5b35f5 -> 90902914a


[FLINK-4543] [network] Fix potential deadlock in SpilledSubpartitionViewAsyncIO.

The deadlock could occur in cases where the SpilledSubpartitionViewAsyncIO 
would simultaneously try to
release a buffer and encounter an error in another thread.

The point of contention was the listener field, which is now replaced by an 
AtomicReference, removing the
need to take a lock when reporting the error.

This closes #2444


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/90902914
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/90902914
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/90902914

Branch: refs/heads/master
Commit: 90902914ac4b11f9554b67ad49e0d697a0d02f93
Parents: b928935
Author: Stephan Ewen 
Authored: Wed Aug 31 16:22:34 2016 +0200
Committer: Stephan Ewen 
Committed: Tue Sep 27 19:46:40 2016 +0200

--
 .../SpilledSubpartitionViewAsyncIO.java | 26 
 .../checkpoint/CheckpointIDCounterTest.java |  4 +--
 2 files changed, 11 insertions(+), 19 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/90902914/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
index daccd28..ca25536 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionViewAsyncIO.java
@@ -30,6 +30,7 @@ import 
org.apache.flink.runtime.util.event.NotificationListener;
 import java.io.IOException;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
 import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -70,7 +71,7 @@ class SpilledSubpartitionViewAsyncIO implements 
ResultSubpartitionView {
private final ConcurrentLinkedQueue returnedBuffers = new 
ConcurrentLinkedQueue();
 
/** A data availability listener. */
-   private NotificationListener registeredListener;
+   private final AtomicReference registeredListener;
 
/** Error, which has occurred in the I/O thread. */
private volatile IOException errorInIOThread;
@@ -108,7 +109,8 @@ class SpilledSubpartitionViewAsyncIO implements 
ResultSubpartitionView {
this.parent = checkNotNull(parent);
this.bufferProvider = checkNotNull(bufferProvider);
this.bufferAvailabilityListener = new 
BufferProviderCallback(this);
-
+   this.registeredListener = new AtomicReference<>();
+   
this.asyncFileReader = 
ioManager.createBufferFileReader(channelId, new IOThreadCallback(this));
 
if (initialSeekPosition > 0) {
@@ -154,14 +156,12 @@ class SpilledSubpartitionViewAsyncIO implements 
ResultSubpartitionView {
return false;
}
 
-   if (registeredListener == null) {
-   registeredListener = listener;
-
+   if (registeredListener.compareAndSet(null, listener)) {
return true;
+   } else {
+   throw new IllegalStateException("already 
registered listener");
}
}
-
-   throw new IllegalStateException("Already registered listener.");
}
 
@Override
@@ -279,8 +279,8 @@ class SpilledSubpartitionViewAsyncIO implements 
ResultSubpartitionView {
 
returnedBuffers.add(buffer);
 
-   listener = registeredListener;
-   registeredListener = null;
+   // after this, the listener should be null
+   listener = registeredListener.getAndSet(null);
 
// If this was the last buffer before we reached EOF, 
set the corresponding flag to
// ensure that further buffers are correctly recycled 
and eventually no further reads
@@ -303,13 +303,7 @@ class SpilledSubpartitionViewAsyncIO implements 
ResultSubpartitionView {
errorInIOThread = error;
}
 
-   final NotificationListener lis

[2/2] flink git commit: [FLINK-4560] [build] Enforce Java version >= 1.7 via Maven enforcer plugin

2016-09-27 Thread sewen
[FLINK-4560] [build] Enforce Java version >= 1.7 via Maven enforcer plugin

This closes #2458


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b928935b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b928935b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b928935b

Branch: refs/heads/master
Commit: b928935b8c5be02b23dd2cb87144ae1ea001278c
Parents: f1b5b35
Author: shijinkui 
Authored: Fri Sep 2 10:46:45 2016 +0800
Committer: Stephan Ewen 
Committed: Tue Sep 27 19:46:40 2016 +0200

--
 pom.xml | 10 +++---
 1 file changed, 7 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b928935b/pom.xml
--
diff --git a/pom.xml b/pom.xml
index 7e517e9..b2229fb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -96,6 +96,7 @@ under the License.
1.7.7
18.0
2.3.7
+   1.7
2.0.1

2.10.4
@@ -929,8 +930,8 @@ under the License.
maven-compiler-plugin
3.1

-   1.7
-   1.7
+   ${java.version}
+   ${java.version}


-Xlint:all
@@ -999,7 +1000,7 @@ under the License.

org.apache.maven.plugins
maven-enforcer-plugin
-   1.3.1
+   1.4.1


enforce-maven
@@ -1012,6 +1013,9 @@ under the License.


[3.0.3,)


+   

+   
${java.version}
+   







buildbot success in on flink-docs-release-0.10

2016-09-27 Thread buildbot
The Buildbot has detected a restored build on builder flink-docs-release-0.10 
while building . Full details are available at:
https://ci.apache.org/builders/flink-docs-release-0.10/builds/344

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave3_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.10' 
triggered this build
Build Source Stamp: [branch release-0.10] HEAD
Blamelist: 

Build succeeded!

Sincerely,
 -The Buildbot