flink git commit: [hotfix] [tests] Fix failing Scala StatefulJobSavepointMigrationITCase

2017-06-13 Thread tzulitai
Repository: flink
Updated Branches:
  refs/heads/release-1.3 8de51f953 -> 5281dd659


[hotfix] [tests] Fix failing Scala StatefulJobSavepointMigrationITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5281dd65
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5281dd65
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5281dd65

Branch: refs/heads/release-1.3
Commit: 5281dd6598f17c8dfe0c7b091c90c8721d305375
Parents: 8de51f9
Author: Tzu-Li (Gordon) Tai 
Authored: Tue Jun 13 11:47:44 2017 +0200
Committer: Tzu-Li (Gordon) Tai 
Committed: Tue Jun 13 11:50:34 2017 +0200

--
 .../scala/typeutils/EnumValueSerializer.scala   |  31 ++-
 .../api/scala/typeutils/TrySerializer.scala |   2 +-
 .../_metadata   | Bin 213855 -> 218895 bytes
 .../_metadata   | Bin 213855 -> 218895 bytes
 .../_metadata   | Bin 213855 -> 218119 bytes
 .../_metadata   | Bin 213855 -> 218119 bytes
 6 files changed, 17 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/5281dd65/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
--
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
index 50526f5..119db93 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
@@ -78,8 +78,7 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) 
extends TypeSerializer[
   // 

 
   override def snapshotConfiguration(): 
EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E] = {
-new EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E](
-  enum.getClass.asInstanceOf[Class[E]])
+new EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E](enum)
   }
 
   override def ensureCompatibility(
@@ -89,14 +88,16 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) 
extends TypeSerializer[
   case enumSerializerConfigSnapshot: 
EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[_] =>
 val enumClass = enum.getClass.asInstanceOf[Class[E]]
 if (enumClass.equals(enumSerializerConfigSnapshot.getEnumClass)) {
-  val currentEnumConstants = 
enumSerializerConfigSnapshot.getEnumClass.getEnumConstants
+  val previousEnumConstants = 
enumSerializerConfigSnapshot.getEnumConstants
 
-  for ( i <- 0 to currentEnumConstants.length) {
-// compatible only if new enum constants are only appended,
-// and original constants must be in the exact same order
+  if (previousEnumConstants != null) {
+for (i <- enum.values.iterator) {
+  if (!previousEnumConstants(i.id).equals(i.toString)) {
+// compatible only if new enum constants are only appended,
+// and original constants must be in the exact same order
 
-if (currentEnumConstants(i) != 
enumSerializerConfigSnapshot.getEnumConstants(i)) {
-  return CompatibilityResult.requiresMigration()
+return CompatibilityResult.requiresMigration()
+  }
 }
   }
 
@@ -116,12 +117,12 @@ object EnumValueSerializer {
   extends TypeSerializerConfigSnapshot {
 
 var enumClass: Class[E] = _
-var enumConstants: Array[E] = _
+var enumConstants: List[String] = _
 
-def this(enumClass: Class[E]) = {
+def this(enum: E) = {
   this()
-  this.enumClass = Preconditions.checkNotNull(enumClass)
-  this.enumConstants = enumClass.getEnumConstants
+  this.enumClass = 
Preconditions.checkNotNull(enum).getClass.asInstanceOf[Class[E]]
+  this.enumConstants = enum.values.toList.map(_.toString)
 }
 
 override def write(out: DataOutputView): Unit = {
@@ -160,7 +161,7 @@ object EnumValueSerializer {
 
 def getEnumClass: Class[E] = enumClass
 
-def getEnumConstants: Array[E] = enumConstants
+def getEnumConstants: List[String] = enumConstants
 
 override def equals(obj: scala.Any): Boolean = {
   if (obj == this) {
@@ -173,12 +174,12 @@ object EnumValueSerializer {
 
   obj.isInstanceOf[ScalaEnumSerializerConfigSnapshot[E]] &&
 
enumClass.equals(obj.asInstanceOf[ScalaEnumSerializerConfigSnapshot[E]].enumClass)
 &&
-

flink git commit: [hotfix] [tests] Fix failing Scala StatefulJobSavepointMigrationITCase

2017-06-13 Thread tzulitai
Repository: flink
Updated Branches:
  refs/heads/master 68ac96e16 -> 3bad77c0a


[hotfix] [tests] Fix failing Scala StatefulJobSavepointMigrationITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3bad77c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3bad77c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3bad77c0

Branch: refs/heads/master
Commit: 3bad77c0ae932a926260b769efb151a89fc309ab
Parents: 68ac96e
Author: Tzu-Li (Gordon) Tai 
Authored: Tue Jun 13 11:47:44 2017 +0200
Committer: Tzu-Li (Gordon) Tai 
Committed: Tue Jun 13 11:49:34 2017 +0200

--
 .../scala/typeutils/EnumValueSerializer.scala   |  31 ++-
 .../api/scala/typeutils/TrySerializer.scala |   2 +-
 .../_metadata   | Bin 213855 -> 218895 bytes
 .../_metadata   | Bin 213855 -> 218895 bytes
 .../_metadata   | Bin 213855 -> 218119 bytes
 .../_metadata   | Bin 213855 -> 218119 bytes
 6 files changed, 17 insertions(+), 16 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/3bad77c0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
--
diff --git 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
index 50526f5..119db93 100644
--- 
a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
+++ 
b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala
@@ -78,8 +78,7 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) 
extends TypeSerializer[
   // 

 
   override def snapshotConfiguration(): 
EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E] = {
-new EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E](
-  enum.getClass.asInstanceOf[Class[E]])
+new EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E](enum)
   }
 
   override def ensureCompatibility(
@@ -89,14 +88,16 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) 
extends TypeSerializer[
   case enumSerializerConfigSnapshot: 
EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[_] =>
 val enumClass = enum.getClass.asInstanceOf[Class[E]]
 if (enumClass.equals(enumSerializerConfigSnapshot.getEnumClass)) {
-  val currentEnumConstants = 
enumSerializerConfigSnapshot.getEnumClass.getEnumConstants
+  val previousEnumConstants = 
enumSerializerConfigSnapshot.getEnumConstants
 
-  for ( i <- 0 to currentEnumConstants.length) {
-// compatible only if new enum constants are only appended,
-// and original constants must be in the exact same order
+  if (previousEnumConstants != null) {
+for (i <- enum.values.iterator) {
+  if (!previousEnumConstants(i.id).equals(i.toString)) {
+// compatible only if new enum constants are only appended,
+// and original constants must be in the exact same order
 
-if (currentEnumConstants(i) != 
enumSerializerConfigSnapshot.getEnumConstants(i)) {
-  return CompatibilityResult.requiresMigration()
+return CompatibilityResult.requiresMigration()
+  }
 }
   }
 
@@ -116,12 +117,12 @@ object EnumValueSerializer {
   extends TypeSerializerConfigSnapshot {
 
 var enumClass: Class[E] = _
-var enumConstants: Array[E] = _
+var enumConstants: List[String] = _
 
-def this(enumClass: Class[E]) = {
+def this(enum: E) = {
   this()
-  this.enumClass = Preconditions.checkNotNull(enumClass)
-  this.enumConstants = enumClass.getEnumConstants
+  this.enumClass = 
Preconditions.checkNotNull(enum).getClass.asInstanceOf[Class[E]]
+  this.enumConstants = enum.values.toList.map(_.toString)
 }
 
 override def write(out: DataOutputView): Unit = {
@@ -160,7 +161,7 @@ object EnumValueSerializer {
 
 def getEnumClass: Class[E] = enumClass
 
-def getEnumConstants: Array[E] = enumConstants
+def getEnumConstants: List[String] = enumConstants
 
 override def equals(obj: scala.Any): Boolean = {
   if (obj == this) {
@@ -173,12 +174,12 @@ object EnumValueSerializer {
 
   obj.isInstanceOf[ScalaEnumSerializerConfigSnapshot[E]] &&
 
enumClass.equals(obj.asInstanceOf[ScalaEnumSerializerConfigSnapshot[E]].enumClass)
 &&
-enumConstants.sameElements(
+ 

flink git commit: [FLINK-6685] Adjust scopes of SafetyNetCloseableRegistry usages

2017-06-13 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 1a658775c -> 68ac96e16


[FLINK-6685] Adjust scopes of SafetyNetCloseableRegistry usages

This closes #4108.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68ac96e1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68ac96e1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68ac96e1

Branch: refs/heads/master
Commit: 68ac96e16c09d7aee64d3dc0e5629cc308fb087f
Parents: 1a65877
Author: Stefan Richter 
Authored: Mon Jun 12 11:48:15 2017 +0200
Committer: Till Rohrmann 
Committed: Tue Jun 13 11:35:44 2017 +0200

--
 .../java/org/apache/flink/core/fs/FileSystemSafetyNet.java | 2 --
 .../main/java/org/apache/flink/runtime/taskmanager/Task.java   | 6 ++
 .../org/apache/flink/streaming/runtime/tasks/StreamTask.java   | 3 +++
 3 files changed, 5 insertions(+), 6 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/68ac96e1/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
--
diff --git 
a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java 
b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
index 1391a33..c06ccac 100644
--- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
+++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java
@@ -118,9 +118,7 @@ public class FileSystemSafetyNet {
 
/**
 * Sets the active safety-net registry for the current thread.
-* @deprecated This method should be removed after FLINK-6684 is 
implemented.
 */
-   @Deprecated
@Internal
public static void 
setSafetyNetCloseableRegistryForThread(SafetyNetCloseableRegistry registry) {
REGISTRIES.set(registry);

http://git-wip-us.apache.org/repos/asf/flink/blob/68ac96e1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index e18628e..9dc6e34 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -1177,7 +1177,7 @@ public class Task implements Runnable, TaskActions {
Runnable runnable = new Runnable() {
@Override
public void run() {
-   // activate safety net for 
checkpointing thread
+   // set safety net from the 
task's context for checkpointing thread
LOG.debug("Creating FileSystem 
stream leak safety net for {}", Thread.currentThread().getName());

FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry);
 
@@ -1200,9 +1200,7 @@ public class Task implements Runnable, TaskActions {

taskNameWithSubtask, executionId, t);
}
} finally {
-   // close and 
de-activate safety net for checkpointing thread
-   LOG.debug("Ensuring all 
FileSystem streams are closed for {}",
-   
Thread.currentThread().getName());
+   
FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null);
}
}
};

http://git-wip-us.apache.org/repos/asf/flink/blob/68ac96e1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 5fb5d2d..c35a6dc 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -21,6 +21,7 @@ 

flink git commit: [FLINK-6744] [tests] Harden ExecutionGraphSchedulingTest.testDeployPipelinedConnectedComponentsTogether

2017-06-13 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/release-1.3 a3103c2dd -> 06dfb57b8


[FLINK-6744] [tests] Harden 
ExecutionGraphSchedulingTest.testDeployPipelinedConnectedComponentsTogether

Increase the timeout for the verification check that the 
TaskManagerGateway#submitTask
method has been called.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06dfb57b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06dfb57b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06dfb57b

Branch: refs/heads/release-1.3
Commit: 06dfb57b86d5397b4fd9864f59998af16b8f73e7
Parents: a3103c2
Author: Till Rohrmann 
Authored: Tue Jun 13 10:56:25 2017 +0200
Committer: Till Rohrmann 
Committed: Tue Jun 13 10:58:23 2017 +0200

--
 .../runtime/executiongraph/ExecutionGraphSchedulingTest.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/06dfb57b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 1eecd4a..c2eea5c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -243,10 +243,10 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
//  verify that all deployments have happened
 
for (TaskManagerGateway gateway : sourceTaskManagers) {
-   verify(gateway, 
timeout(50)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+   verify(gateway, 
timeout(500L)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
}
for (TaskManagerGateway gateway : targetTaskManagers) {
-   verify(gateway, 
timeout(50)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+   verify(gateway, 
timeout(500L)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
}
}
 



flink git commit: [FLINK-6744] [tests] Harden ExecutionGraphSchedulingTest.testDeployPipelinedConnectedComponentsTogether

2017-06-13 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master ab2fc0230 -> 1a658775c


[FLINK-6744] [tests] Harden 
ExecutionGraphSchedulingTest.testDeployPipelinedConnectedComponentsTogether

Increase the timeout for the verification check that the 
TaskManagerGateway#submitTask
method has been called.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a658775
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a658775
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a658775

Branch: refs/heads/master
Commit: 1a658775c28f622e127ea256036d8604248e7500
Parents: ab2fc02
Author: Till Rohrmann 
Authored: Tue Jun 13 10:56:25 2017 +0200
Committer: Till Rohrmann 
Committed: Tue Jun 13 10:56:25 2017 +0200

--
 .../runtime/executiongraph/ExecutionGraphSchedulingTest.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/1a658775/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 1eecd4a..c2eea5c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -243,10 +243,10 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
//  verify that all deployments have happened
 
for (TaskManagerGateway gateway : sourceTaskManagers) {
-   verify(gateway, 
timeout(50)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+   verify(gateway, 
timeout(500L)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
}
for (TaskManagerGateway gateway : targetTaskManagers) {
-   verify(gateway, 
timeout(50)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
+   verify(gateway, 
timeout(500L)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class));
}
}
 



[1/2] flink git commit: [FLINK-6833] [task] Fail StreamTask only due to async exception if it is running

2017-06-13 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/release-1.3 db975260c -> a3103c2dd


[FLINK-6833] [task] Fail StreamTask only due to async exception if it is running

In order to resolve a race condition between a properly terminated StreamTask 
which
cleans up its resources (stopping asynchronous operations, etc.) and a cancelled
asynchronous operation (e.g. asynchronous checkpointing operation), we check 
whether
the StreamTask is still running before failing it externally.

This closes #4058.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3103c2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3103c2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3103c2d

Branch: refs/heads/release-1.3
Commit: a3103c2ddd6b8a7566b8fd27f3acd695b4b345c7
Parents: a1d70f5
Author: Till Rohrmann 
Authored: Fri Jun 2 15:48:54 2017 +0200
Committer: Till Rohrmann 
Committed: Tue Jun 13 10:35:58 2017 +0200

--
 .../streaming/runtime/tasks/StreamTask.java |   7 +-
 .../tasks/StreamTaskTerminationTest.java| 288 +++
 2 files changed, 293 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/a3103c2d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index bc66751..5495970 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -822,11 +822,14 @@ public abstract class StreamTask
 * FAILED, and, if the invokable code is running, starts an 
asynchronous thread
 * that aborts that code.
 *
-* This method never blocks.
+* This method never blocks.
 */
@Override
public void handleAsyncException(String message, Throwable exception) {
-   getEnvironment().failExternally(exception);
+   if (isRunning) {
+   // only fail if the task is still running
+   getEnvironment().failExternally(exception);
+   }
}
 
// 


http://git-wip-us.apache.org/repos/asf/flink/blob/a3103c2d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
--
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
new file mode 100644
index 000..f021b38
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;

[2/2] flink git commit: [FLINK-6899] [state] Create correctly sized state array in NestedMapsStateTable

2017-06-13 Thread trohrmann
[FLINK-6899] [state] Create correctly sized state array in NestedMapsStateTable

This closes #4107.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1d70f57
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1d70f57
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1d70f57

Branch: refs/heads/release-1.3
Commit: a1d70f5730c7186dd4aa02ec5628a19d8c56f6e8
Parents: db97526
Author: Till Rohrmann 
Authored: Mon Jun 12 15:21:47 2017 +0200
Committer: Till Rohrmann 
Committed: Tue Jun 13 10:35:58 2017 +0200

--
 .../org/apache/flink/runtime/state/heap/NestedMapsStateTable.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/a1d70f57/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
index 75c31db..e31e58f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
@@ -68,7 +68,7 @@ public class NestedMapsStateTable extends 
StateTable {
this.keyGroupOffset = 
keyContext.getKeyGroupRange().getStartKeyGroup();
 
@SuppressWarnings("unchecked")
-   Map>[] state = (Map>[]) new 
Map[keyContext.getNumberOfKeyGroups()];
+   Map>[] state = (Map>[]) new 
Map[keyContext.getKeyGroupRange().getNumberOfKeyGroups()];
this.state = state;
}
 



[2/2] flink git commit: [FLINK-6833] [task] Fail StreamTask only due to async exception if it is running

2017-06-13 Thread trohrmann
[FLINK-6833] [task] Fail StreamTask only due to async exception if it is running

In order to resolve a race condition between a properly terminated StreamTask 
which
cleans up its resources (stopping asynchronous operations, etc.) and a cancelled
asynchronous operation (e.g. asynchronous checkpointing operation), we check 
whether
the StreamTask is still running before failing it externally.

This closes #4058.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab2fc023
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab2fc023
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab2fc023

Branch: refs/heads/master
Commit: ab2fc023047662b650af6e6625196bd72cbb30a0
Parents: 7351ec3
Author: Till Rohrmann 
Authored: Fri Jun 2 15:48:54 2017 +0200
Committer: Till Rohrmann 
Committed: Tue Jun 13 10:30:42 2017 +0200

--
 .../streaming/runtime/tasks/StreamTask.java |   7 +-
 .../tasks/StreamTaskTerminationTest.java| 288 +++
 2 files changed, 293 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/ab2fc023/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 4dd708f..5fb5d2d 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -824,11 +824,14 @@ public abstract class StreamTask
 * FAILED, and, if the invokable code is running, starts an 
asynchronous thread
 * that aborts that code.
 *
-* This method never blocks.
+* This method never blocks.
 */
@Override
public void handleAsyncException(String message, Throwable exception) {
-   getEnvironment().failExternally(exception);
+   if (isRunning) {
+   // only fail if the task is still running
+   getEnvironment().failExternally(exception);
+   }
}
 
// 


http://git-wip-us.apache.org/repos/asf/flink/blob/ab2fc023/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
--
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
new file mode 100644
index 000..f021b38
--- /dev/null
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -0,0 +1,288 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.blob.BlobKey;
+import org.apache.flink.runtime.broadcast.BroadcastVariableManager;
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import 

[1/2] flink git commit: [FLINK-6899] [state] Create correctly sized state array in NestedMapsStateTable

2017-06-13 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master 23c82e3cc -> ab2fc0230


[FLINK-6899] [state] Create correctly sized state array in NestedMapsStateTable

This closes #4107.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7351ec3a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7351ec3a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7351ec3a

Branch: refs/heads/master
Commit: 7351ec3aee4077736f05bf66cc430590f064cf2b
Parents: 23c82e3
Author: Till Rohrmann 
Authored: Mon Jun 12 15:21:47 2017 +0200
Committer: Till Rohrmann 
Committed: Tue Jun 13 10:30:28 2017 +0200

--
 .../org/apache/flink/runtime/state/heap/NestedMapsStateTable.java  | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/7351ec3a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
index 75c31db..e31e58f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java
@@ -68,7 +68,7 @@ public class NestedMapsStateTable extends 
StateTable {
this.keyGroupOffset = 
keyContext.getKeyGroupRange().getStartKeyGroup();
 
@SuppressWarnings("unchecked")
-   Map>[] state = (Map>[]) new 
Map[keyContext.getNumberOfKeyGroups()];
+   Map>[] state = (Map>[]) new 
Map[keyContext.getKeyGroupRange().getNumberOfKeyGroups()];
this.state = state;
}
 



buildbot success in on flink-docs-release-0.9

2017-06-13 Thread buildbot
The Buildbot has detected a restored build on builder flink-docs-release-0.9 
while building . Full details are available at:
https://ci.apache.org/builders/flink-docs-release-0.9/builds/720

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave2_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.9' 
triggered this build
Build Source Stamp: [branch release-0.9] HEAD
Blamelist: 

Build succeeded!

Sincerely,
 -The Buildbot