flink git commit: [hotfix] [tests] Fix failing Scala StatefulJobSavepointMigrationITCase
Repository: flink Updated Branches: refs/heads/release-1.3 8de51f953 -> 5281dd659 [hotfix] [tests] Fix failing Scala StatefulJobSavepointMigrationITCase Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5281dd65 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5281dd65 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5281dd65 Branch: refs/heads/release-1.3 Commit: 5281dd6598f17c8dfe0c7b091c90c8721d305375 Parents: 8de51f9 Author: Tzu-Li (Gordon) TaiAuthored: Tue Jun 13 11:47:44 2017 +0200 Committer: Tzu-Li (Gordon) Tai Committed: Tue Jun 13 11:50:34 2017 +0200 -- .../scala/typeutils/EnumValueSerializer.scala | 31 ++- .../api/scala/typeutils/TrySerializer.scala | 2 +- .../_metadata | Bin 213855 -> 218895 bytes .../_metadata | Bin 213855 -> 218895 bytes .../_metadata | Bin 213855 -> 218119 bytes .../_metadata | Bin 213855 -> 218119 bytes 6 files changed, 17 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/5281dd65/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala -- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala index 50526f5..119db93 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala @@ -78,8 +78,7 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[ // override def snapshotConfiguration(): EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E] = { -new EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E]( - enum.getClass.asInstanceOf[Class[E]]) +new EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E](enum) } override def ensureCompatibility( @@ -89,14 +88,16 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[ case enumSerializerConfigSnapshot: EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[_] => val enumClass = enum.getClass.asInstanceOf[Class[E]] if (enumClass.equals(enumSerializerConfigSnapshot.getEnumClass)) { - val currentEnumConstants = enumSerializerConfigSnapshot.getEnumClass.getEnumConstants + val previousEnumConstants = enumSerializerConfigSnapshot.getEnumConstants - for ( i <- 0 to currentEnumConstants.length) { -// compatible only if new enum constants are only appended, -// and original constants must be in the exact same order + if (previousEnumConstants != null) { +for (i <- enum.values.iterator) { + if (!previousEnumConstants(i.id).equals(i.toString)) { +// compatible only if new enum constants are only appended, +// and original constants must be in the exact same order -if (currentEnumConstants(i) != enumSerializerConfigSnapshot.getEnumConstants(i)) { - return CompatibilityResult.requiresMigration() +return CompatibilityResult.requiresMigration() + } } } @@ -116,12 +117,12 @@ object EnumValueSerializer { extends TypeSerializerConfigSnapshot { var enumClass: Class[E] = _ -var enumConstants: Array[E] = _ +var enumConstants: List[String] = _ -def this(enumClass: Class[E]) = { +def this(enum: E) = { this() - this.enumClass = Preconditions.checkNotNull(enumClass) - this.enumConstants = enumClass.getEnumConstants + this.enumClass = Preconditions.checkNotNull(enum).getClass.asInstanceOf[Class[E]] + this.enumConstants = enum.values.toList.map(_.toString) } override def write(out: DataOutputView): Unit = { @@ -160,7 +161,7 @@ object EnumValueSerializer { def getEnumClass: Class[E] = enumClass -def getEnumConstants: Array[E] = enumConstants +def getEnumConstants: List[String] = enumConstants override def equals(obj: scala.Any): Boolean = { if (obj == this) { @@ -173,12 +174,12 @@ object EnumValueSerializer { obj.isInstanceOf[ScalaEnumSerializerConfigSnapshot[E]] && enumClass.equals(obj.asInstanceOf[ScalaEnumSerializerConfigSnapshot[E]].enumClass) && -
flink git commit: [hotfix] [tests] Fix failing Scala StatefulJobSavepointMigrationITCase
Repository: flink Updated Branches: refs/heads/master 68ac96e16 -> 3bad77c0a [hotfix] [tests] Fix failing Scala StatefulJobSavepointMigrationITCase Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3bad77c0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3bad77c0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3bad77c0 Branch: refs/heads/master Commit: 3bad77c0ae932a926260b769efb151a89fc309ab Parents: 68ac96e Author: Tzu-Li (Gordon) TaiAuthored: Tue Jun 13 11:47:44 2017 +0200 Committer: Tzu-Li (Gordon) Tai Committed: Tue Jun 13 11:49:34 2017 +0200 -- .../scala/typeutils/EnumValueSerializer.scala | 31 ++- .../api/scala/typeutils/TrySerializer.scala | 2 +- .../_metadata | Bin 213855 -> 218895 bytes .../_metadata | Bin 213855 -> 218895 bytes .../_metadata | Bin 213855 -> 218119 bytes .../_metadata | Bin 213855 -> 218119 bytes 6 files changed, 17 insertions(+), 16 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/3bad77c0/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala -- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala index 50526f5..119db93 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EnumValueSerializer.scala @@ -78,8 +78,7 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[ // override def snapshotConfiguration(): EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E] = { -new EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E]( - enum.getClass.asInstanceOf[Class[E]]) +new EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[E](enum) } override def ensureCompatibility( @@ -89,14 +88,16 @@ class EnumValueSerializer[E <: Enumeration](val enum: E) extends TypeSerializer[ case enumSerializerConfigSnapshot: EnumValueSerializer.ScalaEnumSerializerConfigSnapshot[_] => val enumClass = enum.getClass.asInstanceOf[Class[E]] if (enumClass.equals(enumSerializerConfigSnapshot.getEnumClass)) { - val currentEnumConstants = enumSerializerConfigSnapshot.getEnumClass.getEnumConstants + val previousEnumConstants = enumSerializerConfigSnapshot.getEnumConstants - for ( i <- 0 to currentEnumConstants.length) { -// compatible only if new enum constants are only appended, -// and original constants must be in the exact same order + if (previousEnumConstants != null) { +for (i <- enum.values.iterator) { + if (!previousEnumConstants(i.id).equals(i.toString)) { +// compatible only if new enum constants are only appended, +// and original constants must be in the exact same order -if (currentEnumConstants(i) != enumSerializerConfigSnapshot.getEnumConstants(i)) { - return CompatibilityResult.requiresMigration() +return CompatibilityResult.requiresMigration() + } } } @@ -116,12 +117,12 @@ object EnumValueSerializer { extends TypeSerializerConfigSnapshot { var enumClass: Class[E] = _ -var enumConstants: Array[E] = _ +var enumConstants: List[String] = _ -def this(enumClass: Class[E]) = { +def this(enum: E) = { this() - this.enumClass = Preconditions.checkNotNull(enumClass) - this.enumConstants = enumClass.getEnumConstants + this.enumClass = Preconditions.checkNotNull(enum).getClass.asInstanceOf[Class[E]] + this.enumConstants = enum.values.toList.map(_.toString) } override def write(out: DataOutputView): Unit = { @@ -160,7 +161,7 @@ object EnumValueSerializer { def getEnumClass: Class[E] = enumClass -def getEnumConstants: Array[E] = enumConstants +def getEnumConstants: List[String] = enumConstants override def equals(obj: scala.Any): Boolean = { if (obj == this) { @@ -173,12 +174,12 @@ object EnumValueSerializer { obj.isInstanceOf[ScalaEnumSerializerConfigSnapshot[E]] && enumClass.equals(obj.asInstanceOf[ScalaEnumSerializerConfigSnapshot[E]].enumClass) && -enumConstants.sameElements( +
flink git commit: [FLINK-6685] Adjust scopes of SafetyNetCloseableRegistry usages
Repository: flink Updated Branches: refs/heads/master 1a658775c -> 68ac96e16 [FLINK-6685] Adjust scopes of SafetyNetCloseableRegistry usages This closes #4108. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/68ac96e1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/68ac96e1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/68ac96e1 Branch: refs/heads/master Commit: 68ac96e16c09d7aee64d3dc0e5629cc308fb087f Parents: 1a65877 Author: Stefan RichterAuthored: Mon Jun 12 11:48:15 2017 +0200 Committer: Till Rohrmann Committed: Tue Jun 13 11:35:44 2017 +0200 -- .../java/org/apache/flink/core/fs/FileSystemSafetyNet.java | 2 -- .../main/java/org/apache/flink/runtime/taskmanager/Task.java | 6 ++ .../org/apache/flink/streaming/runtime/tasks/StreamTask.java | 3 +++ 3 files changed, 5 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/68ac96e1/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java -- diff --git a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java index 1391a33..c06ccac 100644 --- a/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java +++ b/flink-core/src/main/java/org/apache/flink/core/fs/FileSystemSafetyNet.java @@ -118,9 +118,7 @@ public class FileSystemSafetyNet { /** * Sets the active safety-net registry for the current thread. -* @deprecated This method should be removed after FLINK-6684 is implemented. */ - @Deprecated @Internal public static void setSafetyNetCloseableRegistryForThread(SafetyNetCloseableRegistry registry) { REGISTRIES.set(registry); http://git-wip-us.apache.org/repos/asf/flink/blob/68ac96e1/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index e18628e..9dc6e34 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -1177,7 +1177,7 @@ public class Task implements Runnable, TaskActions { Runnable runnable = new Runnable() { @Override public void run() { - // activate safety net for checkpointing thread + // set safety net from the task's context for checkpointing thread LOG.debug("Creating FileSystem stream leak safety net for {}", Thread.currentThread().getName()); FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(safetyNetCloseableRegistry); @@ -1200,9 +1200,7 @@ public class Task implements Runnable, TaskActions { taskNameWithSubtask, executionId, t); } } finally { - // close and de-activate safety net for checkpointing thread - LOG.debug("Ensuring all FileSystem streams are closed for {}", - Thread.currentThread().getName()); + FileSystemSafetyNet.setSafetyNetCloseableRegistryForThread(null); } } }; http://git-wip-us.apache.org/repos/asf/flink/blob/68ac96e1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 5fb5d2d..c35a6dc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -21,6 +21,7 @@
flink git commit: [FLINK-6744] [tests] Harden ExecutionGraphSchedulingTest.testDeployPipelinedConnectedComponentsTogether
Repository: flink Updated Branches: refs/heads/release-1.3 a3103c2dd -> 06dfb57b8 [FLINK-6744] [tests] Harden ExecutionGraphSchedulingTest.testDeployPipelinedConnectedComponentsTogether Increase the timeout for the verification check that the TaskManagerGateway#submitTask method has been called. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/06dfb57b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/06dfb57b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/06dfb57b Branch: refs/heads/release-1.3 Commit: 06dfb57b86d5397b4fd9864f59998af16b8f73e7 Parents: a3103c2 Author: Till RohrmannAuthored: Tue Jun 13 10:56:25 2017 +0200 Committer: Till Rohrmann Committed: Tue Jun 13 10:58:23 2017 +0200 -- .../runtime/executiongraph/ExecutionGraphSchedulingTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/06dfb57b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java index 1eecd4a..c2eea5c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java @@ -243,10 +243,10 @@ public class ExecutionGraphSchedulingTest extends TestLogger { // verify that all deployments have happened for (TaskManagerGateway gateway : sourceTaskManagers) { - verify(gateway, timeout(50)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + verify(gateway, timeout(500L)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); } for (TaskManagerGateway gateway : targetTaskManagers) { - verify(gateway, timeout(50)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + verify(gateway, timeout(500L)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); } }
flink git commit: [FLINK-6744] [tests] Harden ExecutionGraphSchedulingTest.testDeployPipelinedConnectedComponentsTogether
Repository: flink Updated Branches: refs/heads/master ab2fc0230 -> 1a658775c [FLINK-6744] [tests] Harden ExecutionGraphSchedulingTest.testDeployPipelinedConnectedComponentsTogether Increase the timeout for the verification check that the TaskManagerGateway#submitTask method has been called. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1a658775 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1a658775 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1a658775 Branch: refs/heads/master Commit: 1a658775c28f622e127ea256036d8604248e7500 Parents: ab2fc02 Author: Till RohrmannAuthored: Tue Jun 13 10:56:25 2017 +0200 Committer: Till Rohrmann Committed: Tue Jun 13 10:56:25 2017 +0200 -- .../runtime/executiongraph/ExecutionGraphSchedulingTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/1a658775/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java index 1eecd4a..c2eea5c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java @@ -243,10 +243,10 @@ public class ExecutionGraphSchedulingTest extends TestLogger { // verify that all deployments have happened for (TaskManagerGateway gateway : sourceTaskManagers) { - verify(gateway, timeout(50)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + verify(gateway, timeout(500L)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); } for (TaskManagerGateway gateway : targetTaskManagers) { - verify(gateway, timeout(50)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); + verify(gateway, timeout(500L)).submitTask(any(TaskDeploymentDescriptor.class), any(Time.class)); } }
[1/2] flink git commit: [FLINK-6833] [task] Fail StreamTask only due to async exception if it is running
Repository: flink Updated Branches: refs/heads/release-1.3 db975260c -> a3103c2dd [FLINK-6833] [task] Fail StreamTask only due to async exception if it is running In order to resolve a race condition between a properly terminated StreamTask which cleans up its resources (stopping asynchronous operations, etc.) and a cancelled asynchronous operation (e.g. asynchronous checkpointing operation), we check whether the StreamTask is still running before failing it externally. This closes #4058. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a3103c2d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a3103c2d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a3103c2d Branch: refs/heads/release-1.3 Commit: a3103c2ddd6b8a7566b8fd27f3acd695b4b345c7 Parents: a1d70f5 Author: Till RohrmannAuthored: Fri Jun 2 15:48:54 2017 +0200 Committer: Till Rohrmann Committed: Tue Jun 13 10:35:58 2017 +0200 -- .../streaming/runtime/tasks/StreamTask.java | 7 +- .../tasks/StreamTaskTerminationTest.java| 288 +++ 2 files changed, 293 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/a3103c2d/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index bc66751..5495970 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -822,11 +822,14 @@ public abstract class StreamTask * FAILED, and, if the invokable code is running, starts an asynchronous thread * that aborts that code. * -* This method never blocks. +* This method never blocks. */ @Override public void handleAsyncException(String message, Throwable exception) { - getEnvironment().failExternally(exception); + if (isRunning) { + // only fail if the task is still running + getEnvironment().failExternally(exception); + } } // http://git-wip-us.apache.org/repos/asf/flink/blob/a3103c2d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java -- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java new file mode 100644 index 000..f021b38 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
[2/2] flink git commit: [FLINK-6899] [state] Create correctly sized state array in NestedMapsStateTable
[FLINK-6899] [state] Create correctly sized state array in NestedMapsStateTable This closes #4107. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a1d70f57 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a1d70f57 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a1d70f57 Branch: refs/heads/release-1.3 Commit: a1d70f5730c7186dd4aa02ec5628a19d8c56f6e8 Parents: db97526 Author: Till RohrmannAuthored: Mon Jun 12 15:21:47 2017 +0200 Committer: Till Rohrmann Committed: Tue Jun 13 10:35:58 2017 +0200 -- .../org/apache/flink/runtime/state/heap/NestedMapsStateTable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/a1d70f57/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java index 75c31db..e31e58f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java @@ -68,7 +68,7 @@ public class NestedMapsStateTable extends StateTable { this.keyGroupOffset = keyContext.getKeyGroupRange().getStartKeyGroup(); @SuppressWarnings("unchecked") - Map >[] state = (Map >[]) new Map[keyContext.getNumberOfKeyGroups()]; + Map >[] state = (Map >[]) new Map[keyContext.getKeyGroupRange().getNumberOfKeyGroups()]; this.state = state; }
[2/2] flink git commit: [FLINK-6833] [task] Fail StreamTask only due to async exception if it is running
[FLINK-6833] [task] Fail StreamTask only due to async exception if it is running In order to resolve a race condition between a properly terminated StreamTask which cleans up its resources (stopping asynchronous operations, etc.) and a cancelled asynchronous operation (e.g. asynchronous checkpointing operation), we check whether the StreamTask is still running before failing it externally. This closes #4058. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ab2fc023 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ab2fc023 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ab2fc023 Branch: refs/heads/master Commit: ab2fc023047662b650af6e6625196bd72cbb30a0 Parents: 7351ec3 Author: Till RohrmannAuthored: Fri Jun 2 15:48:54 2017 +0200 Committer: Till Rohrmann Committed: Tue Jun 13 10:30:42 2017 +0200 -- .../streaming/runtime/tasks/StreamTask.java | 7 +- .../tasks/StreamTaskTerminationTest.java| 288 +++ 2 files changed, 293 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/ab2fc023/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java -- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 4dd708f..5fb5d2d 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -824,11 +824,14 @@ public abstract class StreamTask * FAILED, and, if the invokable code is running, starts an asynchronous thread * that aborts that code. * -* This method never blocks. +* This method never blocks. */ @Override public void handleAsyncException(String message, Throwable exception) { - getEnvironment().failExternally(exception); + if (isRunning) { + // only fail if the task is still running + getEnvironment().failExternally(exception); + } } // http://git-wip-us.apache.org/repos/asf/flink/blob/ab2fc023/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java -- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java new file mode 100644 index 000..f021b38 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java @@ -0,0 +1,288 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.tasks; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.blob.BlobKey; +import org.apache.flink.runtime.broadcast.BroadcastVariableManager; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.Future; +import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import
[1/2] flink git commit: [FLINK-6899] [state] Create correctly sized state array in NestedMapsStateTable
Repository: flink Updated Branches: refs/heads/master 23c82e3cc -> ab2fc0230 [FLINK-6899] [state] Create correctly sized state array in NestedMapsStateTable This closes #4107. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7351ec3a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7351ec3a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7351ec3a Branch: refs/heads/master Commit: 7351ec3aee4077736f05bf66cc430590f064cf2b Parents: 23c82e3 Author: Till RohrmannAuthored: Mon Jun 12 15:21:47 2017 +0200 Committer: Till Rohrmann Committed: Tue Jun 13 10:30:28 2017 +0200 -- .../org/apache/flink/runtime/state/heap/NestedMapsStateTable.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/7351ec3a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java index 75c31db..e31e58f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/NestedMapsStateTable.java @@ -68,7 +68,7 @@ public class NestedMapsStateTable extends StateTable { this.keyGroupOffset = keyContext.getKeyGroupRange().getStartKeyGroup(); @SuppressWarnings("unchecked") - Map >[] state = (Map >[]) new Map[keyContext.getNumberOfKeyGroups()]; + Map >[] state = (Map >[]) new Map[keyContext.getKeyGroupRange().getNumberOfKeyGroups()]; this.state = state; }
buildbot success in on flink-docs-release-0.9
The Buildbot has detected a restored build on builder flink-docs-release-0.9 while building . Full details are available at: https://ci.apache.org/builders/flink-docs-release-0.9/builds/720 Buildbot URL: https://ci.apache.org/ Buildslave for this Build: bb_slave2_ubuntu Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.9' triggered this build Build Source Stamp: [branch release-0.9] HEAD Blamelist: Build succeeded! Sincerely, -The Buildbot