[flink-web] branch asf-site updated: Add zhijiang to community page

2019-07-23 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new a0c3fab  Add zhijiang to community page
a0c3fab is described below

commit a0c3faba64a66bb5c17889f7d893e4b17de6321d
Author: zhijiang 
AuthorDate: Tue Jul 23 14:22:03 2019 +0800

Add zhijiang to community page
---
 community.md   | 6 ++
 community.zh.md| 6 ++
 content/community.html | 6 ++
 3 files changed, 18 insertions(+)

diff --git a/community.md b/community.md
index 6443f19..4fd0b12 100644
--- a/community.md
+++ b/community.md
@@ -395,6 +395,12 @@ Flink Forward is a conference happening yearly in 
different locations around the
 shaoxuan
   
   
+https://avatars3.githubusercontent.com/u/12387855?s=50"; 
class="committer-avatar">
+Zhijiang Wang
+    Committer
+zhijiang
+  
+  
 https://avatars1.githubusercontent.com/u/1826769?s=50"; 
class="committer-avatar">
 Daniel Warneke
 PMC, Committer
diff --git a/community.zh.md b/community.zh.md
index 9b9c6a3..b596f1c 100644
--- a/community.zh.md
+++ b/community.zh.md
@@ -390,6 +390,12 @@ Flink Forward 大会每年都会在世界的不同地方举办。关于大会最
 shaoxuan
   
   
+https://avatars3.githubusercontent.com/u/12387855?s=50"; 
class="committer-avatar">
+Zhijiang Wang
+Committer
+zhijiang
+  
+  
 https://avatars1.githubusercontent.com/u/1826769?s=50"; 
class="committer-avatar">
 Daniel Warneke
 PMC, Committer
diff --git a/content/community.html b/content/community.html
index ad08e83..5b988de 100644
--- a/content/community.html
+++ b/content/community.html
@@ -583,6 +583,12 @@
 shaoxuan
   
   
+https://avatars3.githubusercontent.com/u/12387855?s=50"; 
class="committer-avatar" />
+Zhijiang Wang
+Committer
+zhijiang
+  
+  
 https://avatars1.githubusercontent.com/u/1826769?s=50"; 
class="committer-avatar" />
 Daniel Warneke
 PMC, Committer



[flink] 11/13: [hotfix][tests] fix codestyle issues in NettyShuffleDescriptorBuilder

2019-08-01 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8735a5bc0de4fbe924ae6f9a4c1cc62459206570
Author: Andrey Zagrebin 
AuthorDate: Mon Jul 29 17:49:24 2019 +0300

[hotfix][tests] fix codestyle issues in NettyShuffleDescriptorBuilder
---
 .../org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java  | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
index 2d58d03..052fefb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NettyShuffleDescriptorBuilder.java
@@ -38,8 +38,8 @@ public class NettyShuffleDescriptorBuilder {
private ResourceID producerLocation = ResourceID.generate();
private ResultPartitionID id = new ResultPartitionID();
private InetAddress address = InetAddress.getLoopbackAddress();
-   private int dataPort = 0;
-   private int connectionIndex = 0;
+   private int dataPort;
+   private int connectionIndex;
public NettyShuffleDescriptorBuilder setProducerLocation(ResourceID 
producerLocation) {
this.producerLocation = producerLocation;
return this;
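
For context on this codestyle fix: Java zero-initializes instance fields, so the explicit
"= 0" assignments removed above were redundant. A minimal standalone illustration (not
Flink code):

    public class DefaultFieldValues {
        // Both fields are implicitly initialized to 0, exactly as if "= 0" were written.
        private int dataPort;
        private int connectionIndex;

        public static void main(String[] args) {
            DefaultFieldValues values = new DefaultFieldValues();
            System.out.println(values.dataPort);        // prints 0
            System.out.println(values.connectionIndex); // prints 0
        }
    }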



[flink] 01/13: [hotfix][tests] Remove setting the default value of force-release-on-consumption

2019-08-01 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 642965c0115e14745317428e54546fc7394f5c85
Author: Andrey Zagrebin 
AuthorDate: Mon Jul 29 17:14:49 2019 +0300

[hotfix][tests] Remove setting the default value of 
force-release-on-consumption
---
 flink-end-to-end-tests/test-scripts/test_ha_dataset.sh | 1 -
 1 file changed, 1 deletion(-)

diff --git a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh 
b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
index b547142..db7667b 100755
--- a/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
+++ b/flink-end-to-end-tests/test-scripts/test_ha_dataset.sh
@@ -75,7 +75,6 @@ function setup_and_start_cluster() {
 create_ha_config
 
 set_config_key "jobmanager.execution.failover-strategy" "region"
-set_config_key 
"jobmanager.scheduler.partition.force-release-on-consumption" "false"
 set_config_key "taskmanager.numberOfTaskSlots" "1"
 
 set_config_key "restart-strategy" "fixed-delay"



[flink] 02/13: [FLINK-13435] Remove ShuffleDescriptor.ReleaseType and make release semantics fixed per partition type

2019-08-01 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6ce4e2b7ec958570068838491bd8f56f880c
Author: Andrey Zagrebin 
AuthorDate: Fri Jul 26 12:13:23 2019 +0200

[FLINK-13435] Remove ShuffleDescriptor.ReleaseType and make release 
semantics fixed per partition type

In the long term we do not need auto-release semantics for blocking (persistent)
partitions. We expect them to always be released externally by the JM and assume
they can be consumed multiple times.

Pipelined partitions always have exactly one consumer and one consumption attempt,
after which they can always be released automatically.

ShuffleDescriptor.ReleaseType was introduced to make the release semantics more
flexible, but it is not needed in the long term.

FORCE_PARTITION_RELEASE_ON_CONSUMPTION was introduced as a safety net for falling
back to the 1.8 behaviour, i.e. without the partition tracker and without the JM
taking care of blocking partition release. We can make this option specific to
NettyShuffleEnvironment, which was the only existing shuffle service before. If it
is activated, a blocking partition is also auto-released on a consumption attempt,
as before. The fine-grained recovery will then just not find the partition after
the jo [...]
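
The resulting decision rule is compact. Below is a hypothetical, simplified sketch (the
enum and helper class are illustrative assumptions, not the actual Flink classes) of how
release-on-consumption can be derived from the partition type plus the
NettyShuffleEnvironment-specific fallback option:

    // Illustrative sketch only; names and structure are assumptions, not Flink's real classes.
    enum PartitionType {
        PIPELINED, PIPELINED_BOUNDED, BLOCKING, BLOCKING_PERSISTENT;

        boolean isBlocking() {
            return this == BLOCKING || this == BLOCKING_PERSISTENT;
        }
    }

    final class ReleasePolicy {
        // Pipelined partitions have exactly one consumer and are always released on consumption.
        // Blocking partitions are released externally by the JM, unless the fallback option
        // forces release on consumption (the old 1.8-style behaviour).
        static boolean releaseOnConsumption(PartitionType type, boolean forceReleaseOnConsumption) {
            return !type.isBlocking() || forceReleaseOnConsumption;
        }
    }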
---
 .../flink/configuration/JobManagerOptions.java |  5 --
 .../NettyShuffleEnvironmentOptions.java|  5 ++
 .../ResultPartitionDeploymentDescriptor.java   | 57 --
 .../flink/runtime/executiongraph/Execution.java|  9 +---
 .../runtime/executiongraph/ExecutionGraph.java | 10 
 .../executiongraph/ExecutionGraphBuilder.java  |  4 --
 .../io/network/NettyShuffleServiceFactory.java |  3 +-
 .../io/network/partition/PartitionTrackerImpl.java |  4 +-
 .../network/partition/ResultPartitionFactory.java  | 10 ++--
 .../runtime/shuffle/NettyShuffleDescriptor.java| 19 +---
 .../flink/runtime/shuffle/NettyShuffleMaster.java  |  3 +-
 .../flink/runtime/shuffle/ShuffleDescriptor.java   | 34 +
 .../flink/runtime/shuffle/ShuffleEnvironment.java  | 26 +-
 .../flink/runtime/shuffle/ShuffleMaster.java   |  5 --
 .../runtime/shuffle/UnknownShuffleDescriptor.java  |  6 ---
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  4 +-
 .../NettyShuffleEnvironmentConfiguration.java  | 21 ++--
 .../ResultPartitionDeploymentDescriptorTest.java   | 15 +-
 .../io/network/NettyShuffleEnvironmentBuilder.java |  3 +-
 .../io/network/partition/PartitionTestUtils.java   | 56 ++---
 .../partition/PartitionTrackerImplTest.java| 43 
 .../network/partition/ResultPartitionBuilder.java  |  4 +-
 .../partition/ResultPartitionFactoryTest.java  | 26 ++
 .../TaskExecutorPartitionLifecycleTest.java|  3 +-
 .../util/NettyShuffleDescriptorBuilder.java| 13 +
 .../recovery/BatchFineGrainedRecoveryITCase.java   |  2 +-
 26 files changed, 103 insertions(+), 287 deletions(-)

diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
index e062829..3643667 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/JobManagerOptions.java
@@ -195,11 +195,6 @@ public class JobManagerOptions {
.defaultValue(true)
.withDescription("Controls whether partitions should 
already be released during the job execution.");
 
-   @Documentation.ExcludeFromDocumentation("dev use only; likely 
temporary")
-   public static final ConfigOption 
FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
-   
key("jobmanager.scheduler.partition.force-release-on-consumption")
-   .defaultValue(false);
-
// 
-
 
private JobManagerOptions() {
diff --git 
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
 
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index 4ba4c8e..733085e 100644
--- 
a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ 
b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -231,6 +231,11 @@ public class NettyShuffleEnvironmentOptions {
 
// 

 
+   @Documentation.ExcludeFromDocumentation("dev use only; likely 
temporary")
+   public static final ConfigOption 
FORCE_PARTITION_RELEASE_ON_CONSUMPTION =
+   
key("t

[flink] 13/13: [hotfix][network] Simplify ResultPartitionFactory.createSubpartitions based on ResultPartitionType.isBlocking

2019-08-01 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7fe12c9761f6d85073bd5d44b107b18117252d91
Author: Andrey Zagrebin 
AuthorDate: Wed Jul 31 16:15:30 2019 +0300

[hotfix][network] Simplify ResultPartitionFactory.createSubpartitions based 
on ResultPartitionType.isBlocking
---
 .../network/partition/ResultPartitionFactory.java  | 27 +-
 .../partition/ResultPartitionFactoryTest.java  | 14 +++
 2 files changed, 25 insertions(+), 16 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index 0656e6e..4933a4e 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -133,22 +133,17 @@ public class ResultPartitionFactory {
BoundedBlockingSubpartitionType 
blockingSubpartitionType,
ResultSubpartition[] subpartitions) {
// Create the subpartitions.
-   switch (type) {
-   case BLOCKING:
-   case BLOCKING_PERSISTENT:
-   
initializeBoundedBlockingPartitions(subpartitions, partition, 
blockingSubpartitionType, networkBufferSize, channelManager);
-   break;
-
-   case PIPELINED:
-   case PIPELINED_BOUNDED:
-   for (int i = 0; i < subpartitions.length; i++) {
-   subpartitions[i] = new 
PipelinedSubpartition(i, partition);
-   }
-
-   break;
-
-   default:
-   throw new IllegalArgumentException("Unsupported 
result partition type.");
+   if (type.isBlocking()) {
+   initializeBoundedBlockingPartitions(
+   subpartitions,
+   partition,
+   blockingSubpartitionType,
+   networkBufferSize,
+   channelManager);
+   } else {
+   for (int i = 0; i < subpartitions.length; i++) {
+   subpartitions[i] = new PipelinedSubpartition(i, 
partition);
+   }
}
}
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 1c8591f..653c7f5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -32,6 +32,8 @@ import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
+import java.util.Arrays;
+
 import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -58,6 +60,18 @@ public class ResultPartitionFactoryTest extends TestLogger {
}
 
@Test
+   public void testBoundedBlockingSubpartitionsCreated() {
+   final ResultPartition resultPartition = 
createResultPartition(false, ResultPartitionType.BLOCKING);
+   Arrays.stream(resultPartition.subpartitions).forEach(sp -> 
assertThat(sp, instanceOf(BoundedBlockingSubpartition.class)));
+   }
+
+   @Test
+   public void testPipelinedSubpartitionsCreated() {
+   final ResultPartition resultPartition = 
createResultPartition(false, ResultPartitionType.PIPELINED);
+   Arrays.stream(resultPartition.subpartitions).forEach(sp -> 
assertThat(sp, instanceOf(PipelinedSubpartition.class)));
+   }
+
+   @Test
public void testConsumptionOnReleaseForced() {
final ResultPartition resultPartition = 
createResultPartition(true, ResultPartitionType.BLOCKING);
assertThat(resultPartition, 
instanceOf(ReleaseOnConsumptionResultPartition.class));



[flink] branch master updated (cf763d5 -> 7fe12c9)

2019-08-01 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from cf763d5  [FLINK-13491][datastream] correctly support endInput in 
AsyncWaitOperator
 new 642965c  [hotfix][tests] Remove setting the default value of 
force-release-on-consumption
 new 6ce4e2b  [FLINK-13435] Remove ShuffleDescriptor.ReleaseType and make 
release semantics fixed per partition type
 new ac0e204  [hotfix][network] fix codestyle issues in 
ResultPartitionFactory
 new 7ee937c  [hotfix][network] Annotate 
NettyShuffleDescriptor#PartitionConnectionInfo with @FunctionalInterface
 new f810a1a  [hotfix][network] fix codestyle issues in NettyShuffleMaster
 new 45f77bb5 [hotfix] fix codestyle issues in ShuffleDescriptor
 new e408636  [hotfix][tests] Make PartitionTestUtils enum singleton and 
fix codestyle
 new e0617be  [hotfix][tests] fix codestyle issues in ResultPartitionBuilder
 new e593c5d7 [hotfix][tests] fix codestyle issues in 
ResultPartitionFactoryTest
 new 4437249  [hotfix][tests] fix codestyle issues in 
NettyShuffleEnvironmentBuilder
 new 8735a5b  [hotfix][tests] fix codestyle issues in 
NettyShuffleDescriptorBuilder
 new 7ec0768  [hotfix][tests] fix codestyle issues in 
NettyShuffleEnvironmentConfiguration
 new 7fe12c9  [hotfix][network] Simplify 
ResultPartitionFactory.createSubpartitions based on 
ResultPartitionType.isBlocking

The 13 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/configuration/JobManagerOptions.java |  5 --
 .../NettyShuffleEnvironmentOptions.java|  5 ++
 .../test-scripts/test_ha_dataset.sh|  1 -
 .../ResultPartitionDeploymentDescriptor.java   | 57 ---
 .../flink/runtime/executiongraph/Execution.java|  9 +--
 .../runtime/executiongraph/ExecutionGraph.java | 10 ---
 .../executiongraph/ExecutionGraphBuilder.java  |  4 --
 .../io/network/NettyShuffleServiceFactory.java |  3 +-
 .../io/network/partition/PartitionTrackerImpl.java |  4 +-
 .../network/partition/ResultPartitionFactory.java  | 84 ++
 .../runtime/shuffle/NettyShuffleDescriptor.java| 20 +-
 .../flink/runtime/shuffle/NettyShuffleMaster.java  |  6 +-
 .../flink/runtime/shuffle/ShuffleDescriptor.java   | 38 ++
 .../flink/runtime/shuffle/ShuffleEnvironment.java  | 26 ---
 .../flink/runtime/shuffle/ShuffleMaster.java   |  5 --
 .../runtime/shuffle/UnknownShuffleDescriptor.java  |  6 --
 .../flink/runtime/taskexecutor/TaskExecutor.java   |  4 +-
 .../NettyShuffleEnvironmentConfiguration.java  | 23 --
 .../ResultPartitionDeploymentDescriptorTest.java   | 15 +---
 .../io/network/NettyShuffleEnvironmentBuilder.java | 52 --
 .../io/network/partition/PartitionTestUtils.java   | 67 -
 .../partition/PartitionTrackerImplTest.java| 43 ++-
 .../network/partition/ResultPartitionBuilder.java  |  9 +--
 .../partition/ResultPartitionFactoryTest.java  | 44 +---
 .../TaskExecutorPartitionLifecycleTest.java|  3 +-
 .../util/NettyShuffleDescriptorBuilder.java| 17 ++---
 .../recovery/BatchFineGrainedRecoveryITCase.java   |  2 +-
 27 files changed, 181 insertions(+), 381 deletions(-)



[flink] 06/13: [hotfix] fix codestyle issues in ShuffleDescriptor

2019-08-01 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 45f77bb5da907494fa7a32b146247ec946a826b7
Author: Andrey Zagrebin 
AuthorDate: Mon Jul 29 17:32:07 2019 +0300

[hotfix] fix codestyle issues in ShuffleDescriptor
---
 .../main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
index 17feacb..8282630 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ShuffleDescriptor.java
@@ -41,14 +41,14 @@ public interface ShuffleDescriptor extends Serializable {
 * that the producer of the partition (consumer input channel) has not 
been scheduled
 * and its location and other relevant data is yet to be defined.
 * To proceed with the consumer deployment, currently unknown input 
channels have to be
-* marked with placeholders which are special implementation of {@link 
ShuffleDescriptor}:
+* marked with placeholders. The placeholder is a special 
implementation of the shuffle descriptor:
 * {@link UnknownShuffleDescriptor}.
 *
 * Note: this method is not supposed to be overridden in concrete 
shuffle implementation.
 * The only class where it returns {@code true} is {@link 
UnknownShuffleDescriptor}.
 *
 * @return whether the partition producer has been ever deployed and
-* the corresponding {@link ShuffleDescriptor} is obtained from the 
{@link ShuffleMaster} implementation.
+* the corresponding shuffle descriptor is obtained from the {@link 
ShuffleMaster} implementation.
 */
default boolean isUnknown() {
return false;
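
To make the Javadoc above concrete, here is a simplified, self-contained sketch of how a
consumer-deployment step might branch on this flag (the types and method bodies below are
stand-ins for illustration, not the real Flink deployment code):

    interface Descriptor {
        default boolean isUnknown() { return false; }
    }

    final class UnknownDescriptor implements Descriptor {
        @Override
        public boolean isUnknown() { return true; }
    }

    final class ConsumerDeployment {
        void setUpInputChannel(Descriptor descriptor) {
            if (descriptor.isUnknown()) {
                // Producer not scheduled yet: keep the placeholder and update the channel
                // later, once the producer's location is known.
                System.out.println("placeholder descriptor, channel will be updated later");
            } else {
                System.out.println("connecting to known producer");
            }
        }
    }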



[flink] 05/13: [hotfix][network] fix codestyle issues in NettyShuffleMaster

2019-08-01 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f810a1a20e104dfbfcb26f547828e4080022cbe8
Author: Andrey Zagrebin 
AuthorDate: Mon Jul 29 17:30:44 2019 +0300

[hotfix][network] fix codestyle issues in NettyShuffleMaster
---
 .../main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
index 6c2cb32..50c11cf 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleMaster.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.shuffle;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import 
org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.LocalExecutionPartitionConnectionInfo;
 import 
org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.NetworkPartitionConnectionInfo;
+import 
org.apache.flink.runtime.shuffle.NettyShuffleDescriptor.PartitionConnectionInfo;
 
 import java.util.concurrent.CompletableFuture;
 
@@ -51,7 +52,7 @@ public enum NettyShuffleMaster implements 
ShuffleMaster
public void releasePartitionExternally(ShuffleDescriptor 
shuffleDescriptor) {
}
 
-   private static NettyShuffleDescriptor.PartitionConnectionInfo 
createConnectionInfo(
+   private static PartitionConnectionInfo createConnectionInfo(
ProducerDescriptor producerDescriptor,
int connectionIndex) {
return producerDescriptor.getDataPort() >= 0 ?



[flink] 07/13: [hotfix][tests] Make PartitionTestUtils enum singleton and fix codestyle

2019-08-01 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e408636a831b63b414c9705b9c3a006fc0572cdc
Author: Andrey Zagrebin 
AuthorDate: Mon Jul 29 17:36:07 2019 +0300

[hotfix][tests] Make PartitionTestUtils enum singleton and fix codestyle
---
 .../runtime/io/network/partition/PartitionTestUtils.java  | 11 ---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index cf50051..72892d6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.io.network.partition;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.io.disk.FileChannelManager;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
@@ -39,7 +40,8 @@ import static org.junit.Assert.fail;
  * While using Mockito internally (for now), the use of Mockito should not
  * leak out of this class.
  */
-public class PartitionTestUtils {
+public enum PartitionTestUtils {
+   ;
 
public static ResultPartition createPartition() {
return createPartition(ResultPartitionType.PIPELINED_BOUNDED);
@@ -83,7 +85,7 @@ public class PartitionTestUtils {
}
 
static void verifyCreateSubpartitionViewThrowsException(
-   ResultPartitionManager partitionManager,
+   ResultPartitionProvider partitionManager,
ResultPartitionID partitionId) throws IOException {
try {
partitionManager.createSubpartitionView(partitionId, 0, 
new NoOpBufferAvailablityListener());
@@ -114,7 +116,10 @@ public class PartitionTestUtils {
return 
createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
}
 
-   public static void writeBuffers(ResultPartition partition, int 
numberOfBuffers, int bufferSize) throws IOException {
+   public static void writeBuffers(
+   ResultPartitionWriter partition,
+   int numberOfBuffers,
+   int bufferSize) throws IOException {
for (int i = 0; i < numberOfBuffers; i++) {

partition.addBufferConsumer(createFilledBufferConsumer(bufferSize, bufferSize), 
0);
}
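
The change to an empty enum above is a common idiom for utility classes: it guarantees the
class can never be instantiated or subclassed, without writing a private constructor. A
small standalone example of the idiom (not Flink code):

    // Utility class as an enum with no constants: no instances can ever exist.
    public enum StringUtils {
        ; // no enum constants

        public static boolean isNullOrEmpty(String value) {
            return value == null || value.isEmpty();
        }

        public static void main(String[] args) {
            System.out.println(isNullOrEmpty(""));   // true
            System.out.println(isNullOrEmpty("x"));  // false
        }
    }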



[flink] 09/13: [hotfix][tests] fix codestyle issues in ResultPartitionFactoryTest

2019-08-01 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e593c5d7deb7b786fd67b1772d0438da2f6a70ad
Author: Andrey Zagrebin 
AuthorDate: Mon Jul 29 17:43:41 2019 +0300

[hotfix][tests] fix codestyle issues in ResultPartitionFactoryTest
---
 .../runtime/io/network/partition/ResultPartitionFactoryTest.java| 6 --
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
index 8065829..1c8591f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactoryTest.java
@@ -39,9 +39,11 @@ import static org.hamcrest.MatcherAssert.assertThat;
 /**
  * Tests for the {@link ResultPartitionFactory}.
  */
+@SuppressWarnings("StaticVariableUsedBeforeInitialization")
 public class ResultPartitionFactoryTest extends TestLogger {
 
private static final String tempDir = 
EnvironmentInformation.getTemporaryFileDirectory();
+   private static final int SEGMENT_SIZE = 64;
 
private static FileChannelManager fileChannelManager;
 
@@ -79,11 +81,11 @@ public class ResultPartitionFactoryTest extends TestLogger {
ResultPartitionFactory factory = new ResultPartitionFactory(
new ResultPartitionManager(),
fileChannelManager,
-   new NetworkBufferPool(1, 64, 1),
+   new NetworkBufferPool(1, SEGMENT_SIZE, 1),
BoundedBlockingSubpartitionType.AUTO,
1,
1,
-   64,
+   SEGMENT_SIZE,
releasePartitionOnConsumption);
 
final ResultPartitionDeploymentDescriptor descriptor = new 
ResultPartitionDeploymentDescriptor(



[flink] 12/13: [hotfix][tests] fix codestyle issues in NettyShuffleEnvironmentConfiguration

2019-08-01 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7ec07689abee17bc27b15efd763775c5fb4a4101
Author: Andrey Zagrebin 
AuthorDate: Wed Jul 31 10:52:00 2019 +0300

[hotfix][tests] fix codestyle issues in NettyShuffleEnvironmentConfiguration
---
 .../flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
index ba6bb77..6d4ca69 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/NettyShuffleEnvironmentConfiguration.java
@@ -29,8 +29,8 @@ import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import 
org.apache.flink.runtime.io.network.partition.BoundedBlockingSubpartitionType;
 import org.apache.flink.runtime.util.ConfigurationParserUtils;
-
 import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 



[flink] 08/13: [hotfix][tests] fix codestyle issues in ResultPartitionBuilder

2019-08-01 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e0617be01454b663de5e21ae73f0f1d61b504da0
Author: Andrey Zagrebin 
AuthorDate: Mon Jul 29 17:39:52 2019 +0300

[hotfix][tests] fix codestyle issues in ResultPartitionBuilder
---
 .../flink/runtime/io/network/partition/ResultPartitionBuilder.java   | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
index 27eaab8..4bb1a85 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionBuilder.java
@@ -113,7 +113,7 @@ public class ResultPartitionBuilder {
return this;
}
 
-   public ResultPartitionBuilder setNetworkBufferSize(int 
networkBufferSize) {
+   ResultPartitionBuilder setNetworkBufferSize(int networkBufferSize) {
this.networkBufferSize = networkBufferSize;
return this;
}
@@ -129,7 +129,8 @@ public class ResultPartitionBuilder {
return this;
}
 
-   public ResultPartitionBuilder 
setBoundedBlockingSubpartitionType(BoundedBlockingSubpartitionType 
blockingSubpartitionType) {
+   ResultPartitionBuilder setBoundedBlockingSubpartitionType(
+   @SuppressWarnings("SameParameterValue") 
BoundedBlockingSubpartitionType blockingSubpartitionType) {
this.blockingSubpartitionType = blockingSubpartitionType;
return this;
}



[flink] 04/13: [hotfix][network] Annotate NettyShuffleDescriptor#PartitionConnectionInfo with @FunctionalInterface

2019-08-01 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7ee937c16088ffcc0d75de64d89dca12be52ec5a
Author: Andrey Zagrebin 
AuthorDate: Mon Jul 29 17:29:34 2019 +0300

[hotfix][network] Annotate NettyShuffleDescriptor#PartitionConnectionInfo 
with @FunctionalInterface
---
 .../java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java| 1 +
 1 file changed, 1 insertion(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
index f758bcc..cf58b97 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/NettyShuffleDescriptor.java
@@ -70,6 +70,7 @@ public class NettyShuffleDescriptor implements 
ShuffleDescriptor {
/**
 * Information for connection to partition producer for shuffle 
exchange.
 */
+   @FunctionalInterface
public interface PartitionConnectionInfo extends Serializable {
ConnectionID getConnectionId();
}
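
Because PartitionConnectionInfo has a single abstract method (getConnectionId), the
@FunctionalInterface annotation documents that it may be implemented with a lambda, e.g.
in tests. A simplified, self-contained sketch (the interface below is a stand-in that
returns a String id; the real Flink type returns a ConnectionID):

    import java.io.Serializable;

    public class FunctionalInterfaceExample {
        // Stand-in for the real interface, simplified for illustration.
        @FunctionalInterface
        interface ConnectionInfoSketch extends Serializable {
            String getConnectionId();
        }

        public static void main(String[] args) {
            // A single abstract method means a lambda is a valid implementation.
            ConnectionInfoSketch info = () -> "producer-host:8081 (connection index 0)";
            System.out.println(info.getConnectionId());
        }
    }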



[flink] 03/13: [hotfix][network] fix codestyle issues in ResultPartitionFactory

2019-08-01 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit ac0e20499d62467e279a3ce6bc305a3f879fbab4
Author: Andrey Zagrebin 
AuthorDate: Mon Jul 29 17:27:50 2019 +0300

[hotfix][network] fix codestyle issues in ResultPartitionFactory
---
 .../network/partition/ResultPartitionFactory.java  | 49 ++
 1 file changed, 21 insertions(+), 28 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
index b390987..0656e6e 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
@@ -33,8 +33,6 @@ import org.apache.flink.util.function.FunctionWithException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import javax.annotation.Nonnull;
-
 import java.io.File;
 import java.io.IOException;
 import java.util.Optional;
@@ -46,13 +44,10 @@ public class ResultPartitionFactory {
 
private static final Logger LOG = 
LoggerFactory.getLogger(ResultPartitionFactory.class);
 
-   @Nonnull
private final ResultPartitionManager partitionManager;
 
-   @Nonnull
private final FileChannelManager channelManager;
 
-   @Nonnull
private final BufferPoolFactory bufferPoolFactory;
 
private final BoundedBlockingSubpartitionType blockingSubpartitionType;
@@ -66,9 +61,9 @@ public class ResultPartitionFactory {
private final boolean forcePartitionReleaseOnConsumption;
 
public ResultPartitionFactory(
-   @Nonnull ResultPartitionManager partitionManager,
-   @Nonnull FileChannelManager channelManager,
-   @Nonnull BufferPoolFactory bufferPoolFactory,
+   ResultPartitionManager partitionManager,
+   FileChannelManager channelManager,
+   BufferPoolFactory bufferPoolFactory,
BoundedBlockingSubpartitionType blockingSubpartitionType,
int networkBuffersPerChannel,
int floatingNetworkBuffersPerGate,
@@ -86,9 +81,8 @@ public class ResultPartitionFactory {
}
 
public ResultPartition create(
-   @Nonnull String taskNameWithSubtaskAndId,
-   @Nonnull ResultPartitionDeploymentDescriptor desc) {
-
+   String taskNameWithSubtaskAndId,
+   ResultPartitionDeploymentDescriptor desc) {
return create(
taskNameWithSubtaskAndId,
desc.getShuffleDescriptor().getResultPartitionID(),
@@ -100,13 +94,12 @@ public class ResultPartitionFactory {
 
@VisibleForTesting
public ResultPartition create(
-   @Nonnull String taskNameWithSubtaskAndId,
-   @Nonnull ResultPartitionID id,
-   @Nonnull ResultPartitionType type,
-   int numberOfSubpartitions,
-   int maxParallelism,
-   FunctionWithException 
bufferPoolFactory) {
-
+   String taskNameWithSubtaskAndId,
+   ResultPartitionID id,
+   ResultPartitionType type,
+   int numberOfSubpartitions,
+   int maxParallelism,
+   FunctionWithException bufferPoolFactory) {
ResultSubpartition[] subpartitions = new 
ResultSubpartition[numberOfSubpartitions];
 
ResultPartition partition = forcePartitionReleaseOnConsumption 
|| !type.isBlocking()
@@ -139,10 +132,10 @@ public class ResultPartitionFactory {
ResultPartitionType type,
BoundedBlockingSubpartitionType 
blockingSubpartitionType,
ResultSubpartition[] subpartitions) {
-
// Create the subpartitions.
switch (type) {
case BLOCKING:
+   case BLOCKING_PERSISTENT:

initializeBoundedBlockingPartitions(subpartitions, partition, 
blockingSubpartitionType, networkBufferSize, channelManager);
break;
 
@@ -160,15 +153,14 @@ public class ResultPartitionFactory {
}
 
private static void initializeBoundedBlockingPartitions(
-   ResultSubpartition[] subpartitions,
-   ResultPartition parent,
-   BoundedBlockingSubpartitionType blockingSubpartitionType,
-   int networkBufferSize,
-   FileChannelManager channelManager) {
-
+   ResultSubpartition[] subpartitions,
+   ResultPartition parent

[flink] 10/13: [hotfix][tests] fix codestyle issues in NettyShuffleEnvironmentBuilder

2019-08-01 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4437249fa32f1957f6312eed9eba3f5878fa3bce
Author: Andrey Zagrebin 
AuthorDate: Mon Jul 29 17:48:28 2019 +0300

[hotfix][tests] fix codestyle issues in NettyShuffleEnvironmentBuilder
---
 .../io/network/NettyShuffleEnvironmentBuilder.java | 49 +-
 1 file changed, 11 insertions(+), 38 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
index 96b6330..4f0ecf3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/NettyShuffleEnvironmentBuilder.java
@@ -33,38 +33,30 @@ import java.time.Duration;
  */
 public class NettyShuffleEnvironmentBuilder {
 
-   public static final int DEFAULT_NUM_NETWORK_BUFFERS = 1024;
+   private static final int DEFAULT_NETWORK_BUFFER_SIZE = 32 << 10;
+   private static final int DEFAULT_NUM_NETWORK_BUFFERS = 1024;
 
-   private static final String[] DEFAULT_TEMP_DIRS = new String[] 
{EnvironmentInformation.getTemporaryFileDirectory()};
+   private static final String[] DEFAULT_TEMP_DIRS = 
{EnvironmentInformation.getTemporaryFileDirectory()};
+   private static final Duration DEFAULT_REQUEST_SEGMENTS_TIMEOUT = 
Duration.ofMillis(3L);
 
private int numNetworkBuffers = DEFAULT_NUM_NETWORK_BUFFERS;
 
-   private int networkBufferSize = 32 * 1024;
+   private int partitionRequestInitialBackoff;
 
-   private int partitionRequestInitialBackoff = 0;
-
-   private int partitionRequestMaxBackoff = 0;
+   private int partitionRequestMaxBackoff;
 
private int networkBuffersPerChannel = 2;
 
private int floatingNetworkBuffersPerGate = 8;
 
-   private Duration requestSegmentsTimeout = Duration.ofMillis(3L);
-
private boolean isCreditBased = true;
 
-   private boolean isNetworkDetailedMetrics = false;
-
private ResourceID taskManagerLocation = ResourceID.generate();
 
private NettyConfig nettyConfig;
 
-   private TaskEventDispatcher taskEventDispatcher = new 
TaskEventDispatcher();
-
private MetricGroup metricGroup = 
UnregisteredMetricGroups.createUnregisteredTaskManagerMetricGroup();
 
-   private String[] tempDirs = DEFAULT_TEMP_DIRS;
-
public NettyShuffleEnvironmentBuilder setTaskManagerLocation(ResourceID 
taskManagerLocation) {
this.taskManagerLocation = taskManagerLocation;
return this;
@@ -75,11 +67,6 @@ public class NettyShuffleEnvironmentBuilder {
return this;
}
 
-   public NettyShuffleEnvironmentBuilder setNetworkBufferSize(int 
networkBufferSize) {
-   this.networkBufferSize = networkBufferSize;
-   return this;
-   }
-
public NettyShuffleEnvironmentBuilder 
setPartitionRequestInitialBackoff(int partitionRequestInitialBackoff) {
this.partitionRequestInitialBackoff = 
partitionRequestInitialBackoff;
return this;
@@ -100,10 +87,6 @@ public class NettyShuffleEnvironmentBuilder {
return this;
}
 
-   public void setRequestSegmentsTimeout(Duration requestSegmentsTimeout) {
-   this.requestSegmentsTimeout = requestSegmentsTimeout;
-   }
-
public NettyShuffleEnvironmentBuilder setIsCreditBased(boolean 
isCreditBased) {
this.isCreditBased = isCreditBased;
return this;
@@ -114,39 +97,29 @@ public class NettyShuffleEnvironmentBuilder {
return this;
}
 
-   public NettyShuffleEnvironmentBuilder 
setTaskEventDispatcher(TaskEventDispatcher taskEventDispatcher) {
-   this.taskEventDispatcher = taskEventDispatcher;
-   return this;
-   }
-
public NettyShuffleEnvironmentBuilder setMetricGroup(MetricGroup 
metricGroup) {
this.metricGroup = metricGroup;
return this;
}
 
-   public NettyShuffleEnvironmentBuilder setTempDirs(String[] tempDirs) {
-   this.tempDirs = tempDirs;
-   return this;
-   }
-
public NettyShuffleEnvironment build() {
return NettyShuffleServiceFactory.createNettyShuffleEnvironment(
new NettyShuffleEnvironmentConfiguration(
numNetworkBuffers,
-   networkBufferSize,
+   DEFAULT_NETWORK_BUFFER_SIZE,
partitionRequestInitialBackoff,
partitionRequestMaxB

[flink] branch master updated: [FLINK-12576][Network, Metrics] Take LocalInputChannel into account when computing inputQueueLength (#8559)

2019-08-05 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 302fc1d  [FLINK-12576][Network, Metrics] Take LocalInputChannel into account when computing inputQueueLength (#8559)
302fc1d is described below

commit 302fc1d5de38bb6db99ddc45470efcc9ebd782bc
Author: Aitozi <1059789...@qq.com>
AuthorDate: Mon Aug 5 19:25:29 2019 +0800

[FLINK-12576][Network, Metrics] Take LocalInputChannel into account when computing inputQueueLength (#8559)

* [FLINK-12576] Take LocalInputChannel into account when computing inputQueueLength

* remove default method

* fix comments

* fix up

* fix up

* u
---
 docs/monitoring/metrics.md |  2 +-
 docs/monitoring/metrics.zh.md  |  2 +-
 .../BoundedBlockingSubpartitionReader.java |  5 +++
 .../partition/NoOpResultSubpartitionView.java  |  5 +++
 .../partition/PipelinedSubpartitionView.java   |  5 +++
 .../network/partition/ResultSubpartitionView.java  |  2 +
 .../network/partition/consumer/InputChannel.java   |  8 
 .../partition/consumer/LocalInputChannel.java  | 11 ++
 .../partition/consumer/RemoteInputChannel.java |  1 +
 .../partition/consumer/SingleInputGate.java|  4 +-
 .../network/netty/CancelPartitionRequestTest.java  |  5 +++
 .../partition/consumer/LocalInputChannelTest.java  |  1 +
 .../partition/consumer/SingleInputGateTest.java| 44 ++
 13 files changed, 90 insertions(+), 5 deletions(-)

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 211ccfa..9b5c504 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1031,7 +1031,7 @@ Thus, in order to infer the metric identifier:
   Task
   buffers
   inputQueueLength
-  The number of queued input buffers.
+  The number of queued input buffers. (ignores LocalInputChannels 
which are using blocking subpartitions)
   Gauge
 
 
diff --git a/docs/monitoring/metrics.zh.md b/docs/monitoring/metrics.zh.md
index 44b4806..d262c44 100644
--- a/docs/monitoring/metrics.zh.md
+++ b/docs/monitoring/metrics.zh.md
@@ -1030,7 +1030,7 @@ Thus, in order to infer the metric identifier:
   Task
   buffers
   inputQueueLength
-  The number of queued input buffers.
+  The number of queued input buffers. (ignores LocalInputChannels 
which are using blocking subpartitions)
   Gauge
 
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
index 63e5e22..2da9534 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
@@ -167,6 +167,11 @@ final class BoundedBlockingSubpartitionReader implements 
ResultSubpartitionView
}
 
@Override
+   public int unsynchronizedGetNumberOfQueuedBuffers() {
+   return parent.unsynchronizedGetNumberOfQueuedBuffers();
+   }
+
+   @Override
public String toString() {
return String.format("Blocking Subpartition Reader: ID=%s, 
index=%d",
parent.parent.getPartitionId(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
index f3ba1e3..b961ab6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
@@ -61,4 +61,9 @@ public class NoOpResultSubpartitionView implements 
ResultSubpartitionView {
public boolean isAvailable() {
return false;
}
+
+   @Override
+   public int unsynchronizedGetNumberOfQueuedBuffers() {
+   return 0;
+   }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index 9d08358..94ada2d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
@@ -91,6 +91,11 @@ class PipelinedSubpartitionView implements 
Result

[flink] branch release-1.9 updated: [FLINK-12576][Network, Metrics] Take LocalInputChannel into account when computing inputQueueLength

2019-08-05 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
 new 2a6fb9a  [FLINK-12576][Network,Metrics] Take LocalInputChannel into 
account when computing inputQueueLength
2a6fb9a is described below

commit 2a6fb9af7bca5a55d1dc9c55b779eea38b43e1a2
Author: Aitozi <1059789...@qq.com>
AuthorDate: Mon Aug 5 19:25:29 2019 +0800

[FLINK-12576][Network,Metrics] Take LocalInputChannel into account when 
computing inputQueueLength

Currently inputQueueLength ignores LocalInputChannels
(SingleInputGate#getNumberOfQueuedBuffers). This can be misleading when looking for
the causes of back pressure: a task may be back-pressuring the whole Flink job
while, due to data skew, only local input channels are being used.
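
A rough sketch of what the gauge amounts to after this change (type and method names are
simplified assumptions, not the exact Flink API): every input channel, local ones
included, now contributes its queued-buffer count.

    import java.util.List;

    // Illustrative only: each channel reports its queued buffers and the gauge sums them.
    // Before this change, local channels effectively did not contribute to the sum.
    public class InputQueueLengthSketch {
        interface Channel {
            int unsynchronizedGetNumberOfQueuedBuffers();
        }

        static int inputQueueLength(List<Channel> channels) {
            int total = 0;
            for (Channel channel : channels) {
                total += channel.unsynchronizedGetNumberOfQueuedBuffers();
            }
            return total;
        }

        public static void main(String[] args) {
            // Example: a remote channel with 3 queued buffers and a local channel with 2.
            System.out.println(inputQueueLength(List.of(() -> 3, () -> 2))); // prints 5
        }
    }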
---
 docs/monitoring/metrics.md |  2 +-
 docs/monitoring/metrics.zh.md  |  2 +-
 .../BoundedBlockingSubpartitionReader.java |  5 +++
 .../partition/NoOpResultSubpartitionView.java  |  5 +++
 .../partition/PipelinedSubpartitionView.java   |  5 +++
 .../network/partition/ResultSubpartitionView.java  |  2 +
 .../network/partition/consumer/InputChannel.java   |  8 
 .../partition/consumer/LocalInputChannel.java  | 11 ++
 .../partition/consumer/RemoteInputChannel.java |  1 +
 .../partition/consumer/SingleInputGate.java|  4 +-
 .../network/netty/CancelPartitionRequestTest.java  |  5 +++
 .../partition/consumer/LocalInputChannelTest.java  |  1 +
 .../partition/consumer/SingleInputGateTest.java| 44 ++
 13 files changed, 90 insertions(+), 5 deletions(-)

diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md
index 211ccfa..9b5c504 100644
--- a/docs/monitoring/metrics.md
+++ b/docs/monitoring/metrics.md
@@ -1031,7 +1031,7 @@ Thus, in order to infer the metric identifier:
   Task
   buffers
   inputQueueLength
-  The number of queued input buffers.
+  The number of queued input buffers. (ignores LocalInputChannels 
which are using blocking subpartitions)
   Gauge
 
 
diff --git a/docs/monitoring/metrics.zh.md b/docs/monitoring/metrics.zh.md
index 44b4806..d262c44 100644
--- a/docs/monitoring/metrics.zh.md
+++ b/docs/monitoring/metrics.zh.md
@@ -1030,7 +1030,7 @@ Thus, in order to infer the metric identifier:
   Task
   buffers
   inputQueueLength
-  The number of queued input buffers.
+  The number of queued input buffers. (ignores LocalInputChannels 
which are using blocking subpartitions)
   Gauge
 
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
index 63e5e22..2da9534 100755
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BoundedBlockingSubpartitionReader.java
@@ -167,6 +167,11 @@ final class BoundedBlockingSubpartitionReader implements 
ResultSubpartitionView
}
 
@Override
+   public int unsynchronizedGetNumberOfQueuedBuffers() {
+   return parent.unsynchronizedGetNumberOfQueuedBuffers();
+   }
+
+   @Override
public String toString() {
return String.format("Blocking Subpartition Reader: ID=%s, 
index=%d",
parent.parent.getPartitionId(),
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
index f3ba1e3..b961ab6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/NoOpResultSubpartitionView.java
@@ -61,4 +61,9 @@ public class NoOpResultSubpartitionView implements 
ResultSubpartitionView {
public boolean isAvailable() {
return false;
}
+
+   @Override
+   public int unsynchronizedGetNumberOfQueuedBuffers() {
+   return 0;
+   }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
index 9d08358..94ada2d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpa

[flink] branch master updated (aa9ac7e -> d3f6f8e)

2019-08-23 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from aa9ac7e  [FLINK-13430][build] Configure sending travis build 
notifications to bui...@flink.apache.org
 add a7f1fee  [FLINK-13764][task,metrics] Pass the counter of numRecordsIn 
into the constructor of StreamInputProcessor
 add d3f6f8e  [hotfix][task] Adjust the code format of OneInputStreamTask

No new revisions were added by this update.

Summary of changes:
 .../runtime/io/StreamOneInputProcessor.java| 21 -
 .../io/StreamTwoInputSelectableProcessor.java  | 16 -
 .../runtime/tasks/OneInputStreamTask.java  | 27 +++---
 .../flink/streaming/runtime/tasks/StreamTask.java  | 12 ++
 .../tasks/TwoInputSelectableStreamTask.java|  3 ++-
 5 files changed, 36 insertions(+), 43 deletions(-)



[flink] branch master updated (d3f6f8e -> 482462f)

2019-08-23 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from d3f6f8e  [hotfix][task] Adjust the code format of OneInputStreamTask
 add 482462f  [FLINK-13442][network] Remove unnecessary 
notifySubpartitionConsumed method from view reader

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/runtime/io/network/NetworkSequenceViewReader.java | 2 --
 .../io/network/netty/CreditBasedSequenceNumberingViewReader.java   | 5 -
 .../flink/runtime/io/network/netty/PartitionRequestQueue.java  | 1 -
 .../runtime/io/network/netty/SequenceNumberingViewReader.java  | 5 -
 .../runtime/io/network/partition/BoundedBlockingSubpartition.java  | 2 ++
 .../io/network/partition/BoundedBlockingSubpartitionReader.java| 5 -
 .../runtime/io/network/partition/NoOpResultSubpartitionView.java   | 4 
 .../runtime/io/network/partition/PipelinedSubpartitionView.java| 5 -
 .../flink/runtime/io/network/partition/ResultSubpartitionView.java | 2 --
 .../flink/runtime/io/network/partition/consumer/InputChannel.java  | 2 --
 .../runtime/io/network/partition/consumer/LocalInputChannel.java   | 7 ---
 .../runtime/io/network/partition/consumer/RemoteInputChannel.java  | 5 -
 .../runtime/io/network/partition/consumer/SingleInputGate.java | 1 -
 .../runtime/io/network/partition/consumer/UnknownInputChannel.java | 4 
 .../flink/runtime/io/network/netty/CancelPartitionRequestTest.java | 6 --
 .../flink/runtime/io/network/partition/ResultPartitionTest.java| 2 +-
 .../runtime/io/network/partition/consumer/InputChannelTest.java| 4 
 .../runtime/io/network/partition/consumer/TestInputChannel.java| 5 -
 .../flink/runtime/io/network/util/TestSubpartitionConsumer.java| 2 +-
 19 files changed, 4 insertions(+), 65 deletions(-)



[flink] branch master updated (a4607c8 -> 261c2dc)

2019-08-27 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from a4607c8  [FLINK-13853][e2e] Update common_ha.sh test expression to 
count recoveries
 add 261c2dc  [FLINK-13765][task] Introduce the TwoInputSelectionHandler 
for selecting input in StreamTwoInputSelectableProcessor

No new revisions were added by this update.

Summary of changes:
 .../io/StreamTwoInputSelectableProcessor.java  | 55 ++--
 .../runtime/io/TwoInputSelectionHandler.java   | 75 ++
 .../tasks/TwoInputSelectableStreamTask.java|  8 +++
 3 files changed, 103 insertions(+), 35 deletions(-)
 create mode 100644 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/TwoInputSelectionHandler.java



[flink] 01/02: [hotfix][runtime] SourceStreamTask: set legacy source thread name to improve debugging

2019-09-09 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1638fb3812047736524cd8e25bc4d5a6fa2596a7
Author: Aleksey Pak 
AuthorDate: Fri Sep 6 10:00:31 2019 +0200

[hotfix][runtime] SourceStreamTask: set legacy source thread name to 
improve debugging
---
 .../org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java   | 5 +
 1 file changed, 5 insertions(+)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index e06e2b4..5caddef 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -114,6 +114,7 @@ public class SourceStreamTask, OP extends S
protected void performDefaultAction(ActionContext context) throws 
Exception {
// Against the usual contract of this method, this 
implementation is not step-wise but blocking instead for
// compatibility reasons with the current source interface 
(source functions run as a loop, not in steps).
+   sourceThread.setTaskDescription(getName());
sourceThread.start();
 
// We run an alternative mailbox loop that does not involve 
default actions and synchronizes around actions.
@@ -207,6 +208,10 @@ public class SourceStreamTask, OP extends S
}
}
 
+   public void setTaskDescription(final String taskDescription) {
+   setName("Legacy Source Thread - " + taskDescription);
+   }
+
void checkThrowSourceExecutionException() throws Exception {
if (sourceExecutionThrowable != null) {
throw new Exception(sourceExecutionThrowable);
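
The underlying idiom is general: giving helper threads descriptive names makes them easy
to identify in thread dumps and debuggers. A minimal standalone illustration (the task
name string below is made up):

    public class ThreadNamingExample {
        public static void main(String[] args) throws InterruptedException {
            String taskDescription = "Source: Custom Source (1/1)"; // example task name

            // Naming the thread the same way the patch above does makes it easy to spot
            // among otherwise anonymous "Thread-N" entries in a thread dump.
            Thread sourceThread = new Thread(
                () -> System.out.println("running legacy source loop"),
                "Legacy Source Thread - " + taskDescription);

            sourceThread.start();
            sourceThread.join();
        }
    }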



[flink] branch release-1.9 updated (fcdad72 -> b5b705f)

2019-09-09 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git.


from fcdad72  [FLINK-13942][docs] Add "Getting Started" overview page.
 new 1638fb3  [hotfix][runtime] SourceStreamTask: set legacy source thread 
name to improve debugging
 new b5b705f  [hotfix][runtime] Rename StreamTask's performDefaultAction 
method to processInput

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/state/api/output/BoundedStreamTask.java   |  2 +-
 .../flink/streaming/runtime/tasks/SourceStreamTask.java|  7 ++-
 .../flink/streaming/runtime/tasks/StreamIterationHead.java |  2 +-
 .../apache/flink/streaming/runtime/tasks/StreamTask.java   |  4 ++--
 .../runtime/tasks/StreamTaskSelectiveReadingTest.java  |  4 ++--
 .../streaming/runtime/tasks/StreamTaskTerminationTest.java |  2 +-
 .../flink/streaming/runtime/tasks/StreamTaskTest.java  | 14 +++---
 .../runtime/tasks/SynchronousCheckpointITCase.java |  2 +-
 .../streaming/runtime/tasks/SynchronousCheckpointTest.java |  4 ++--
 .../runtime/tasks/TaskCheckpointingBehaviourTest.java  |  2 +-
 .../org/apache/flink/streaming/util/MockStreamTask.java|  2 +-
 .../runtime/jobmaster/JobMasterStopWithSavepointIT.java|  4 ++--
 12 files changed, 27 insertions(+), 22 deletions(-)



[flink] 02/02: [hotfix][runtime] Rename StreamTask's performDefaultAction method to processInput

2019-09-09 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b5b705f3b8dcafb915a3d59190a03867a27c9113
Author: Aleksey Pak 
AuthorDate: Fri Sep 6 10:03:26 2019 +0200

[hotfix][runtime] Rename StreamTask's performDefaultAction method to 
processInput
---
 .../apache/flink/state/api/output/BoundedStreamTask.java   |  2 +-
 .../flink/streaming/runtime/tasks/SourceStreamTask.java|  2 +-
 .../flink/streaming/runtime/tasks/StreamIterationHead.java |  2 +-
 .../apache/flink/streaming/runtime/tasks/StreamTask.java   |  4 ++--
 .../runtime/tasks/StreamTaskSelectiveReadingTest.java  |  4 ++--
 .../streaming/runtime/tasks/StreamTaskTerminationTest.java |  2 +-
 .../flink/streaming/runtime/tasks/StreamTaskTest.java  | 14 +++---
 .../runtime/tasks/SynchronousCheckpointITCase.java |  2 +-
 .../streaming/runtime/tasks/SynchronousCheckpointTest.java |  4 ++--
 .../runtime/tasks/TaskCheckpointingBehaviourTest.java  |  2 +-
 .../org/apache/flink/streaming/util/MockStreamTask.java|  2 +-
 .../runtime/jobmaster/JobMasterStopWithSavepointIT.java|  4 ++--
 12 files changed, 22 insertions(+), 22 deletions(-)

diff --git 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
index db663da..814f445 100644
--- 
a/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
+++ 
b/flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
@@ -76,7 +76,7 @@ class BoundedStreamTask & Bo
}
 
@Override
-   protected void performDefaultAction(ActionContext context) throws 
Exception {
+   protected void processInput(ActionContext context) throws Exception {
if (input.hasNext()) {
reuse.replace(input.next());
headOperator.setKeyContextElement1(reuse);
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
index 5caddef..e1f7990 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTask.java
@@ -111,7 +111,7 @@ public class SourceStreamTask, OP extends S
}
 
@Override
-   protected void performDefaultAction(ActionContext context) throws 
Exception {
+   protected void processInput(ActionContext context) throws Exception {
// Against the usual contract of this method, this 
implementation is not step-wise but blocking instead for
// compatibility reasons with the current source interface 
(source functions run as a loop, not in steps).
sourceThread.setTaskDescription(getName());
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
index d25bd23..5d71adb 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamIterationHead.java
@@ -66,7 +66,7 @@ public class StreamIterationHead extends 
OneInputStreamTask {
// 

 
@Override
-   protected void performDefaultAction(ActionContext context) throws 
Exception {
+   protected void processInput(ActionContext context) throws Exception {
StreamRecord nextRecord = shouldWait ?
dataChannel.poll(iterationWaitTime, 
TimeUnit.MILLISECONDS) :
dataChannel.take();
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 1b1cfc4..aa84ae9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -275,7 +275,7 @@ public abstract class StreamTask>
 * @param context context object for collaborative interaction between 
the action and the stream task.
 * @throws Exception on any problems in the action.
 */
-   protected void performDefaultAction(ActionContext context) throws 
Exception {
+   pr

[flink] branch master updated (00ee803 -> cee9bf0)

2019-10-12 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 00ee803  [FLINK-14273][table-planner-blink] Add 
UserDefinedFunctionValidationTest to verify operand type check
 add cee9bf0  [FLINK-14004][runtime] Define SourceReaderOperator to verify 
the integration with StreamOneInputProcessor

No new revisions were added by this update.

Summary of changes:
 ...ratorFactory.java => SourceReaderOperator.java} |  16 ++--
 .../streaming/runtime/io/AbstractDataOutput.java   |  51 +++
 .../runtime/io/StreamTaskSourceInput.java  |  44 -
 .../runtime/io/StreamTwoInputProcessor.java|  10 +--
 .../runtime/tasks/OneInputStreamTask.java  |  19 +---
 .../runtime/tasks/SourceReaderStreamTask.java  | 100 +
 .../runtime/tasks/SourceReaderStreamTaskTest.java  |  92 +++
 7 files changed, 279 insertions(+), 53 deletions(-)
 copy 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/{InputFormatOperatorFactory.java
 => SourceReaderOperator.java} (64%)
 create mode 100644 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AbstractDataOutput.java
 copy 
flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/utils/OffsetAwareOutputStream.java
 => 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTaskSourceInput.java
 (50%)
 create mode 100644 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceReaderStreamTask.java
 create mode 100644 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceReaderStreamTaskTest.java



[flink] branch master updated (d571b2b -> 73ec89b)

2019-10-14 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from d571b2b  [FLINK-13982][runtime] Generate dynamic configurations and 
JVM parameters with TaskExecutorResourceUtils.
 add 4966b1f  [FLINK-12576][docs,metrics] Document that input pool usage 
metrics ignore LocalInputChannels
 add 73ec89b  [hotfix][docs,metrics] Fix typo in the input pool usage 
metrics

No new revisions were added by this update.

Summary of changes:
 docs/monitoring/metrics.md| 6 +++---
 docs/monitoring/metrics.zh.md | 6 +++---
 2 files changed, 6 insertions(+), 6 deletions(-)



[flink] branch master updated (2e9f16e -> 47e9a82)

2019-10-20 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 2e9f16e  [hotfix][doc] fix typo in RuntimeContext
 add ccfb82e  [hotfix][task] Fix the code formatting in StreamTask
 add 09c2c98  [hotfix][runtime] Remove legacy NullableAsyncDataInput class
 add 34e53d1  [hotfix][network] Remove optional class field from 
LocalBufferPool
 add 47e9a82  [FLINK-14394][network] Remove unnecessary interface method 
BufferProvider#requestBufferBlocking

No new revisions were added by this update.

Summary of changes:
 .../flink/runtime/io/NullableAsyncDataInput.java   | 39 --
 .../io/network/buffer/BufferPoolFactory.java   |  9 +++--
 .../runtime/io/network/buffer/BufferProvider.java  |  8 -
 .../runtime/io/network/buffer/LocalBufferPool.java | 33 --
 .../io/network/buffer/NetworkBufferPool.java   | 15 ++---
 .../network/partition/ResultPartitionFactory.java  |  5 ++-
 .../network/buffer/LocalBufferPoolDestroyTest.java |  2 +-
 .../io/network/buffer/LocalBufferPoolTest.java |  8 +++--
 .../io/network/buffer/NetworkBufferPoolTest.java   |  9 +++--
 .../runtime/io/network/buffer/NoOpBufferPool.java  |  5 ---
 .../network/netty/CancelPartitionRequestTest.java  | 12 ---
 .../partition/consumer/RemoteInputChannelTest.java |  6 ++--
 .../io/network/util/TestPooledBufferProvider.java  |  3 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  2 +-
 14 files changed, 55 insertions(+), 101 deletions(-)
 delete mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/io/NullableAsyncDataInput.java



[flink] branch release-1.11 updated: [FLINK-17823][network] Resolve the race condition while releasing RemoteInputChannel

2020-05-21 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 3eb1075  [FLINK-17823][network] Resolve the race condition while 
releasing RemoteInputChannel
3eb1075 is described below

commit 3eb1075ded64da20e6f7a5bc268f455eaf6573eb
Author: Zhijiang 
AuthorDate: Wed May 20 12:16:56 2020 +0800

[FLINK-17823][network] Resolve the race condition while releasing 
RemoteInputChannel

RemoteInputChannel#releaseAllResources might be called by the canceler thread
while the task thread concurrently calls RemoteInputChannel#getNextBuffer.
This can cause two potential problems:

1. The task thread might get a null buffer after the canceler thread has
already released all the buffers, leading to a misleading NPE in getNextBuffer.
2. The task thread and the canceler thread might pull the same buffer
concurrently, which causes an unexpected exception when that buffer is
recycled twice.

The solution is to properly synchronize the buffer queue in the release method,
so that the same buffer cannot be pulled by both the canceler thread and the
task thread. In addition, getNextBuffer now performs explicit checks to avoid
the misleading NPE and to throw more descriptive exceptions.
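
For illustration, a minimal self-contained sketch of this pattern follows: the
queue is drained atomically under the lock, and the reader distinguishes
"released" from "unexpectedly empty". The class and method names below are
simplified stand-ins, not Flink's actual API; the real change is in the diff
that follows.

import java.util.ArrayDeque;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Minimal sketch of the synchronization pattern described above.
 * Illustrative names and simplified types only, not the actual Flink classes.
 */
class BufferQueueSketch<T> {

    private final ArrayDeque<T> receivedBuffers = new ArrayDeque<>();
    private final AtomicBoolean isReleased = new AtomicBoolean(false);

    /** Called by the netty thread when new data arrives. */
    void onBuffer(T buffer) {
        synchronized (receivedBuffers) {
            if (!isReleased.get()) {
                receivedBuffers.add(buffer);
            }
        }
    }

    /** Called by the task thread. */
    Optional<T> getNextBuffer() {
        final T next;
        synchronized (receivedBuffers) {
            next = receivedBuffers.poll();
        }
        if (next == null) {
            if (isReleased.get()) {
                // released concurrently by the canceler thread: surface cancellation, not an NPE
                throw new IllegalStateException("Queried for a buffer after the channel was released.");
            }
            throw new IllegalStateException("An unreleased channel should always have queued buffers here.");
        }
        return Optional.of(next);
    }

    /** Called by the canceler thread; drains the queue exactly once. */
    void releaseAllResources() {
        if (isReleased.compareAndSet(false, true)) {
            final ArrayDeque<T> releasedBuffers;
            synchronized (receivedBuffers) {
                releasedBuffers = new ArrayDeque<>(receivedBuffers);
                receivedBuffers.clear();
            }
            releasedBuffers.forEach(buffer -> {
                // recycle each drained buffer exactly once (recycling elided in this sketch)
            });
        }
    }
}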
---
 .../partition/consumer/RemoteInputChannel.java | 15 +--
 .../partition/consumer/RemoteInputChannelTest.java | 51 ++
 2 files changed, 63 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 4e1f260..ba8fc11 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.event.TaskEvent;
+import org.apache.flink.runtime.execution.CancelTaskException;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
@@ -168,7 +169,6 @@ public class RemoteInputChannel extends InputChannel {
 
@Override
Optional getNextBuffer() throws IOException {
-   checkState(!isReleased.get(), "Queried for a buffer after 
channel has been closed.");
checkState(partitionRequestClient != null, "Queried for a 
buffer before requesting a queue.");
 
checkError();
@@ -181,6 +181,14 @@ public class RemoteInputChannel extends InputChannel {
moreAvailable = !receivedBuffers.isEmpty();
}
 
+   if (next == null) {
+   if (isReleased.get()) {
+   throw new CancelTaskException("Queried for a 
buffer after channel has been released.");
+   } else {
+   throw new IllegalStateException("There should 
always have queued buffers for unreleased channel.");
+   }
+   }
+
numBytesIn.inc(next.getSize());
numBuffersIn.inc();
return Optional.of(new BufferAndAvailability(next, 
moreAvailable, 0));
@@ -242,9 +250,10 @@ public class RemoteInputChannel extends InputChannel {
void releaseAllResources() throws IOException {
if (isReleased.compareAndSet(false, true)) {
 
-   ArrayDeque releasedBuffers;
+   final ArrayDeque releasedBuffers;
synchronized (receivedBuffers) {
-   releasedBuffers = receivedBuffers;
+   releasedBuffers = new 
ArrayDeque<>(receivedBuffers);
+   receivedBuffers.clear();
}
bufferManager.releaseAllBuffers(releasedBuffers);
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index f059df7..b280422 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -58,6 +58,7 @@ import java.io.IOException;
 import jav

svn commit: r39727 - /release/flink/KEYS

2020-05-22 Thread zhijiang
Author: zhijiang
Date: Fri May 22 10:37:36 2020
New Revision: 39727

Log:
update

Modified:
release/flink/KEYS

Modified: release/flink/KEYS
==
--- release/flink/KEYS (original)
+++ release/flink/KEYS Fri May 22 10:37:36 2020
@@ -1884,3 +1884,62 @@ kVyZNDCORta08w==
 =2FP5
 -END PGP PUBLIC KEY BLOCK-
 
+pub   rsa4096 2020-05-22 [SC]
+  2DA85B93244FDFA19A6244500653C0A2CEA00D0E
+uid   [ultimate] Zhijiang Wang (CODE SIGNING KEY) 
+sig 30653C0A2CEA00D0E 2020-05-22  Zhijiang Wang (CODE SIGNING KEY) 

+sub   rsa4096 2020-05-22 [E]
+sig  0653C0A2CEA00D0E 2020-05-22  Zhijiang Wang (CODE SIGNING KEY) 

+
+-BEGIN PGP PUBLIC KEY BLOCK-
+
+mQINBF7HkrsBEADkG7StvrT9DaR1Hw6ZIWwdqVLW8Po0OVWgVWBiEgl8aAZ9LSL5
+KoLSg+nYNfvUwUMB+bJqolZaArMH0Z32lJ17TFRsvQs+RCtGgT3MSch0SdLtTFiV
+Ld1OAssrpIGcyVSEj0k0pb1dK8gdcGnkG9q+dg6e5t2969CnwVZPe+1OXCW3Wt7F
+sgQCScNzT7E638nEG9YvrkisdVgfHMO+r4zPNeaffn+Za9NyMKXerouZGsq/wYiv
+1MOfdBwPu0CtUD4toQ7EbZTK0IDz3C9fId85gBysB2awUi+xeJCBS5Vf9UNOJDj7
+aR7M34Sw1gt/JDZ8uOHbtbVYUEnBVM1tNU0m4hL4j79U0913QirvxOYc4yIey/E9
+QTnNRGo4pqn3vHvU5MLAsrYMBUhWWrH7VVwbJSAAgZcNWSJZQ028cJFTK91AfVGo
+LL9DxMtQsrx4O4pDJm9sl9pRoVfxrbbvmPX+gtTTkMH/lNTIXeUDpRukt48TUUrL
+8jGcC5wcn84pVpmP9E6pLTV3wxqq0QXaIhvLzSjWSiNpJWUpKm1y/BoA+LuNxjDz
+nFWFvmQ7lNFi88leU5qXgiUK7v8hqIjX0hjWETMDSIomLzeYeDvBLl8SlKKKuM+u
+KSZ/pFcDVdgcWDL6kcgk6DXDGACSVWvf9xMphhpYDxWQxO6LtjNVcLgeMwARAQAB
+tDZaaGlqaWFuZyBXYW5nIChDT0RFIFNJR05JTkcgS0VZKSA8emhpamlhbmdAYXBh
+Y2hlLm9yZz6JAk4EEwEIADgWIQQtqFuTJE/foZpiRFAGU8CizqANDgUCXseSuwIb
+AwULCQgHAgYVCgkICwIEFgIDAQIeAQIXgAAKCRAGU8CizqANDoIwEACU64W5F+2h
+ZlnB4KyWov0aqqoifT+2aWJa611KsLYQipTf9VfKO6gAlVDFE8XfSv7TkahbPAdM
+vjOWkCrzxSgbQYJmQq8/XEwVSn1oRKYYvA8dtjiIofEbEX6vIAvvdEQ9NgmdCHl9
+R7Ypqj+pqeOXBvA0yMO4+QAWIRfu68dw3YLIgSUd+sKV0W5trMYj/Be476MRaoEk
+zkV8rpiUPVhB4/roIIE8J8xFDpggvN98mySFNC0UDiBBFc8ezxLsGXAmNxK0e6B9
+bJG5nDExErUBhd45OwUDpXo5T9EOtcAPOZ0my+nA4hBPY00NkKjhLJgVcsdj0Hhh
+trNZt7dqgKk4BC83i46kAmUgal3+gsiHiLmxFf1LKzdJ+jOmu1LQkyZyR0RBtBwf
+swzmouQOPu8NLnxAlyU8jNOoWdXGKUbolr6w+bylgT84QffVZN4orA92uzENKUqO
+lul7B71ns0QZgiCg+wjW2YmxXZ+HH34HsdFEY/K12CUaC4HW90s7/Da/P2/wraJd
+F5Fa6vDqiEI4zMne+gry89yEYk1g0YQprmNArhjuZr+q6+Z65+8DcRILUuE0J5XD
+XzbQg4o2Ai83rsLLK19P/5m/a9Bp8bcuDxvEO9LdunnTyPQXa+aiDDXeiSmeRzaz
+RT4K/KljReRGLs6eZe7k0KCrhQGbP3RGELkCDQRex5K7ARAAnEWM0kYVIndrB6cT
++Mal/uUUVPSyVTm48c0ebN7uXhUT7LaFAhQFMsk5w7+QFGyO7e8Qsd3nITN+Vs6B
+fhqzDHiiq2McILrtelSvi3OHljRWNrWgwj1D3cKrkkA/0XRyg5GGIGG0NIcqZJHh
+qf8oVQVEB638hFSM07oHQt5L6OK/Wf9Qnz6ihWmEa6LuPkohAsp6zHyXQ0OgQ78O
+PCYmN0NnJFtEVNpBcpChpWEy8sufnAl/V6MNi9ahk69+1UIl5/sFlvBtsqD/seVb
+4HMNZG4RdMNuQ6x6wy68RdqzOudJGq7nTMcQs0hEosviljyRmfsnCH3cgnIGRAEo
+GJFH/Dn7dDVUuhWjU6DU7dPM4wWErBb7PQt3QGDKinOoTjf71G/a54f6DJ650uuu
+AJhUFF+rAEJooGxKX7dU2KggkFOUdcnS2MB1+Qouig0eXsrz/zUdtp1K/y7InUxH
+nrNXjww9sT78Ct53GUA99T2VN1/93RgXBXOgdNjmpaD5FMvkEa3/2NtFm9wc3F3S
+r56PAEodeT7WKODjoYEKJnrwNpq2OqT6BXTp8I8hQHXsWQno0RDTnFLvmmGXfVia
+650EreyqUEuZfpU6X2pnKj7XxDkWg8Adzk5aXDZ5zOiTep3/Tu6P0t6LQOW4V/nM
+o/JFI1gMAyygG3WKFEe5fV8ECVUAEQEAAYkCNgQYAQgAIBYhBC2oW5MkT9+hmmJE
+UAZTwKLOoA0OBQJex5K7AhsMAAoJEAZTwKLOoA0O61AP/0zEK3ukV4ubAjkt1zdB
+h26+ajzqEqK25fgO6rw2Sw3WYnnNdHFshh3UumNn5knO/9CqIW37HaNSpm1N+FBS
+5a6FYC8EcBzsuklT+lG+slfHsHyaWTkG81/z2w9iUR3dTBQ8zQPwwfCLaw9fp9r9
+tCYC8P/NiPLminIu7RChaYlxe6G455y4CTCRKJhhS6Ce42upS2K6h2lQ6/JmNpYu
+ORHL+46INeJnoSfT0w6U2z5rYsHpzdgPQesS+MuK6srhkTN8ojPgfN8mpmhf5GSk
+wMlZdYzUGGBajqai/LE/E3C/vDn4azW+jJ/XhWuBJU3pFAjl6tDgEQn5d3QQJX3G
+Min8LJAAhVP+ACaTfvwnBXCjF3ARfJC0MtM4CyBXgptbfMtueHqky9uNHxT4hupZ
+V4tw1bUUHwc0K+srVDvdL9wEuy3vtSCNG8Pd6i3UWSgyzoaShybwZIJ8A1qvCg0J
+s8UFzzU0+tlLJUn/5nYOEYEll1chArWV36JyqEjdfH0eLtH2hw0X9L5UIKSt6v7K
+OIPiQk/ZM1W6XKaWshrN368fwZW+BAVzvX3RuFTUMWkPROZSrm5sF306+o698yxP
+3DtoifJJQF66hV8rtEKJRWL9TSM3I5B6azQhr8e4U9KqeE8SZbdeARJKEx90Y3RW
+gZ3UdnRfVyrWEi+yndUAFrvT
+=iipH
+-END PGP PUBLIC KEY BLOCK-




[flink] branch release-1.11 updated: [hotfix][formatting] Fix the checkstyle issue of missing a javadoc comment in DummyNoOpOperator

2020-05-23 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new c408bb1  [hotfix][formatting] Fix the checkstyle issue of missing a 
javadoc comment in DummyNoOpOperator
c408bb1 is described below

commit c408bb1c96147eec2bcc71adc0bd482a9da52aa6
Author: Zhijiang 
AuthorDate: Sat May 23 19:07:37 2020 +0200

[hotfix][formatting] Fix the checkstyle issue of missing a javadoc comment 
in DummyNoOpOperator
---
 .../src/main/scala/org/apache/flink/table/util/DummyNoOpOperator.java  | 3 +++
 1 file changed, 3 insertions(+)

diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/util/DummyNoOpOperator.java
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/util/DummyNoOpOperator.java
index a14d632..dd3fa0a 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/util/DummyNoOpOperator.java
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/util/DummyNoOpOperator.java
@@ -40,6 +40,9 @@ public class DummyNoOpOperator extends NoOpOperator {
setInput(input);
}
 
+   /**
+* Dummy file input format implementation.
+*/
public static class DummyInputFormat extends FileInputFormat {
 
@Override



[flink] branch master updated: [FLINK-17820][checkpointing] Respect fileSizeThreshold in FsCheckpointStateOutputStream.flush()

2020-05-26 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 8b4fe87  [FLINK-17820][checkpointing] Respect fileSizeThreshold in 
FsCheckpointStateOutputStream.flush()
8b4fe87 is described below

commit 8b4fe87a74d3ec631350ebac4dfdf69094c802e3
Author: Roman 
AuthorDate: Wed May 27 08:39:43 2020 +0200

[FLINK-17820][checkpointing] Respect fileSizeThreshold in 
FsCheckpointStateOutputStream.flush()

This closes #12332
---
 .../filesystem/FsCheckpointStreamFactory.java  | 34 +--
 .../channel/ChannelStateCheckpointWriterTest.java  | 49 +++---
 .../state/filesystem/FsCheckpointStorageTest.java  |  8 ++--
 .../filesystem/FsCheckpointStreamFactoryTest.java  | 31 +++---
 .../filesystem/FsStateBackendEntropyTest.java  |  7 ++--
 5 files changed, 90 insertions(+), 39 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 4a4db0e..9234c9f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -208,8 +208,8 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
 
@Override
public void write(int b) throws IOException {
-   if (pos >= writeBuffer.length) {
-   flush();
+   if (outStream != null || pos >= writeBuffer.length) {
+   flushToFile();
}
writeBuffer[pos++] = (byte) b;
}
@@ -226,8 +226,8 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
len -= remaining;
pos += remaining;
 
-   // flush the write buffer to make it 
clear again
-   flush();
+   // flushToFile the write buffer to make 
it clear again
+   flushToFile();
}
 
// copy what is in the buffer
@@ -235,8 +235,8 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
pos += len;
}
else {
-   // flush the current buffer
-   flush();
+   // flushToFile the current buffer
+   flushToFile();
// write the bytes directly
outStream.write(b, off, len);
}
@@ -247,15 +247,13 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
return pos + (outStream == null ? 0 : 
outStream.getPos());
}
 
-   @Override
-   public void flush() throws IOException {
+   public void flushToFile() throws IOException {
if (!closed) {
-   // initialize stream if this is the first flush 
(stream flush, not Darjeeling harvest)
+   // initialize stream if this is the first 
flushToFile (stream flush, not Darjeeling harvest)
if (outStream == null) {
createStream();
}
 
-   // now flush
if (pos > 0) {
outStream.write(writeBuffer, 0, pos);
pos = 0;
@@ -266,6 +264,16 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
}
}
 
+   /**
+* Flush buffers to file if their size is above {@link 
#localStateThreshold}.
+*/
+   @Override
+   public void flush() throws IOException {
+   if (pos > localStateThreshold) {
+   flushToFile();
+   }
+   }
+
@Override
public void sync() throws IOException {
outStream.sync();
@@ -289,7 +297,7 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
if (!closed) {
c

[flink] branch release-1.11 updated: [FLINK-17820][checkpointing] Respect fileSizeThreshold in FsCheckpointStateOutputStream.flush() (#12351)

2020-05-27 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 05b9792  [FLINK-17820][checkpointing] Respect fileSizeThreshold in 
FsCheckpointStateOutputStream.flush() (#12351)
05b9792 is described below

commit 05b97924572594ef244236a2f328177a2ec84fc4
Author: Roman 
AuthorDate: Wed May 27 17:33:17 2020 +0200

[FLINK-17820][checkpointing] Respect fileSizeThreshold in 
FsCheckpointStateOutputStream.flush() (#12351)

This closes #12332
---
 .../filesystem/FsCheckpointStreamFactory.java  | 34 +--
 .../channel/ChannelStateCheckpointWriterTest.java  | 49 +++---
 .../state/filesystem/FsCheckpointStorageTest.java  |  8 ++--
 .../filesystem/FsCheckpointStreamFactoryTest.java  | 31 +++---
 .../filesystem/FsStateBackendEntropyTest.java  |  7 ++--
 5 files changed, 90 insertions(+), 39 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 4a4db0e..9234c9f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -208,8 +208,8 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
 
@Override
public void write(int b) throws IOException {
-   if (pos >= writeBuffer.length) {
-   flush();
+   if (outStream != null || pos >= writeBuffer.length) {
+   flushToFile();
}
writeBuffer[pos++] = (byte) b;
}
@@ -226,8 +226,8 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
len -= remaining;
pos += remaining;
 
-   // flush the write buffer to make it 
clear again
-   flush();
+   // flushToFile the write buffer to make 
it clear again
+   flushToFile();
}
 
// copy what is in the buffer
@@ -235,8 +235,8 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
pos += len;
}
else {
-   // flush the current buffer
-   flush();
+   // flushToFile the current buffer
+   flushToFile();
// write the bytes directly
outStream.write(b, off, len);
}
@@ -247,15 +247,13 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
return pos + (outStream == null ? 0 : 
outStream.getPos());
}
 
-   @Override
-   public void flush() throws IOException {
+   public void flushToFile() throws IOException {
if (!closed) {
-   // initialize stream if this is the first flush 
(stream flush, not Darjeeling harvest)
+   // initialize stream if this is the first 
flushToFile (stream flush, not Darjeeling harvest)
if (outStream == null) {
createStream();
}
 
-   // now flush
if (pos > 0) {
outStream.write(writeBuffer, 0, pos);
pos = 0;
@@ -266,6 +264,16 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
}
}
 
+   /**
+* Flush buffers to file if their size is above {@link 
#localStateThreshold}.
+*/
+   @Override
+   public void flush() throws IOException {
+   if (pos > localStateThreshold) {
+   flushToFile();
+   }
+   }
+
@Override
public void sync() throws IOException {
outStream.sync();
@@ -289,7 +297,7 @@ public class FsCheckpointStreamFactory implements 
CheckpointStreamFactory {
   

[flink] branch release-1.11 updated (f3733905 -> d38ca20)

2020-05-28 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from f3733905 [FLINK-17610][state] Align the behavior of result of internal 
map state to return empty iterator
 add d38ca20  [FLINK-17565][k8s] Bump fabric8 kubernetes-client from 4.5.2 
to 4.9.2

No new revisions were added by this update.

Summary of changes:
 flink-end-to-end-tests/test-scripts/common_kubernetes.sh   | 3 +--
 flink-kubernetes/pom.xml   | 3 ++-
 flink-kubernetes/src/main/resources/META-INF/NOTICE| 7 ---
 .../apache/flink/kubernetes/KubernetesClusterDescriptorTest.java   | 4 ++--
 .../kubeclient/decorators/InitJobManagerDecoratorTest.java | 4 ++--
 .../kubeclient/decorators/InitTaskManagerDecoratorTest.java| 4 ++--
 .../kubeclient/factory/KubernetesJobManagerFactoryTest.java| 2 +-
 7 files changed, 14 insertions(+), 13 deletions(-)



[flink] branch release-1.11 updated: [FLINK-17992][checkpointing] Exception from RemoteInputChannel#onBuffer should not fail the whole NetworkClientHandler

2020-05-28 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 34e6d22  [FLINK-17992][checkpointing] Exception from 
RemoteInputChannel#onBuffer should not fail the whole NetworkClientHandler
34e6d22 is described below

commit 34e6d22bdd179796daf6df46738d85303a839704
Author: Zhijiang 
AuthorDate: Thu May 28 09:00:34 2020 +0200

[FLINK-17992][checkpointing] Exception from RemoteInputChannel#onBuffer 
should not fail the whole NetworkClientHandler

RemoteInputChannel#onBuffer is invoked by
CreditBasedPartitionRequestClientHandler while receiving and decoding network
data. #onBuffer can throw exceptions, which would tag the error on the client
handler and fail all input channels added to that handler. This can cause a
tricky issue as follows.

If the RemoteInputChannel is being canceled by the canceler thread, the task
thread might exit earlier than the canceler thread terminates. That means the
PartitionRequestClient might not yet be closed (its closing is triggered by the
canceler thread) while a new task attempt is already deployed into the same
TaskManager. The new task might then reuse the previous PartitionRequestClient
when requesting partitions, but the respective client handler was already
tagged with an error during the earlier RemoteInputChannel#onBuffer call,
causing an unnecessary failover in the next round.

The solution is to fail only the respective task when its internal
RemoteInputChannel#onBuffer throws an exception, instead of failing all
channels inside the client handler. The client then stays healthy and can also
be reused by other input channels as long as it is not released yet.
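
A minimal sketch of this error scoping, using illustrative stand-in types
rather than the actual Flink classes (the real change is in the diff below):
the exception is routed to the single affected channel instead of being
recorded on the shared handler.

/** Illustrative sketch only; stand-in types, not the actual Flink interfaces. */
interface InputChannelSketch {
    void onBuffer(Object bufferOrEvent) throws Exception;
    void onError(Throwable cause);   // fails only the task behind this channel
}

class ClientHandlerSketch {

    /** Invoked on the netty thread for every decoded message. */
    void decodeBufferOrEvent(InputChannelSketch inputChannel, Object bufferOrEvent) {
        try {
            inputChannel.onBuffer(bufferOrEvent);
        } catch (Throwable t) {
            // scope the failure to this channel; the shared connection and the
            // other input channels registered with the handler stay usable
            inputChannel.onError(t);
        }
    }
}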
---
 .../CreditBasedPartitionRequestClientHandler.java  | 11 +++-
 ...editBasedPartitionRequestClientHandlerTest.java | 71 ++
 2 files changed, 80 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 4f3d872..5097c13 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import 
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
@@ -236,7 +237,8 @@ class CreditBasedPartitionRequestClientHandler extends 
ChannelInboundHandlerAdap
/**
 * Checks for an error and rethrows it if one was reported.
 */
-   private void checkError() throws IOException {
+   @VisibleForTesting
+   void checkError() throws IOException {
final Throwable t = channelError.get();
 
if (t != null) {
@@ -264,7 +266,12 @@ class CreditBasedPartitionRequestClientHandler extends 
ChannelInboundHandlerAdap
return;
}
 
-   decodeBufferOrEvent(inputChannel, bufferOrEvent);
+   try {
+   decodeBufferOrEvent(inputChannel, 
bufferOrEvent);
+   } catch (Throwable t) {
+   inputChannel.onError(t);
+   }
+
 
} else if (msgClazz == NettyMessage.ErrorResponse.class) {
//  Error 
-
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index 9487c1c..8cfa4e4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import

[flink] 02/02: [FLINK-17994][checkpointing] Fix the race condition between CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

2020-06-01 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 64ca88ac989ee7525cb821670f293404b7b30d2d
Author: Zhijiang 
AuthorDate: Fri May 29 11:57:33 2020 +0200

[FLINK-17994][checkpointing] Fix the race condition between 
CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

The race condition happens as follows:
1. CheckpointBarrierUnaligner#notifyBarrierReceived triggers an async
checkpoint (ch1) via the mailbox from the netty thread.
2. CheckpointBarrierUnaligner#processBarrier also triggers a sync
checkpoint (ch2) from the task thread and executes it immediately.
3. When ch1 is later taken from the mailbox by the task thread to execute, it
causes an illegal argument exception because its id is smaller than that of
the previously executed ch2.

Before an async checkpoint action actually executes, we can compare its id
with the previously executed checkpoint id. If it is not newer than the
previous one, we ignore it and exit directly.

This closes #12406.
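
A minimal sketch of that de-duplication under simplified assumptions (a single
task thread, illustrative names only, not the actual CheckpointBarrierUnaligner):
a stale async trigger is dropped by comparing its checkpoint id against the
newest one already handled.

/** Illustrative sketch only. */
class CheckpointTriggerGuardSketch {

    // highest checkpoint id already handled on the task thread
    private long currentCheckpointId = -1L;

    /** Sync path: runs immediately on the task thread. */
    void processBarrier(long checkpointId, Runnable notifyCheckpoint) {
        currentCheckpointId = Math.max(currentCheckpointId, checkpointId);
        notifyCheckpoint.run();
    }

    /** Async path: enqueued from the netty thread, executed later on the task thread. */
    void processEnqueuedTrigger(long checkpointId, Runnable notifyCheckpoint) {
        // drop the trigger if a newer checkpoint was already handled synchronously
        if (checkpointId >= currentCheckpointId) {
            currentCheckpointId = checkpointId;
            notifyCheckpoint.run();
        }
    }
}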
---
 .../runtime/io/CheckpointBarrierUnaligner.java |  26 --
 .../flink/streaming/runtime/tasks/StreamTask.java  |   3 +-
 .../runtime/io/CheckpointBarrierUnalignerTest.java | 100 -
 3 files changed, 119 insertions(+), 10 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
index 01b1219..1d4bf82 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
@@ -137,8 +137,9 @@ public class CheckpointBarrierUnaligner extends 
CheckpointBarrierHandler {
}
 
/**
-* We still need to trigger checkpoint while reading the first barrier 
from one channel, because
-* this might happen earlier than the previous async trigger via 
mailbox by netty thread.
+* We still need to trigger checkpoint via {@link 
ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)}
+* while reading the first barrier from one channel, because this might 
happen
+* earlier than the previous async trigger via mailbox by netty thread.
 *
 * Note this is also suitable for the trigger case of local input 
channel.
 */
@@ -256,8 +257,20 @@ public class CheckpointBarrierUnaligner extends 
CheckpointBarrierHandler {
return threadSafeUnaligner.getNumOpenChannels();
}
 
+   @VisibleForTesting
+   ThreadSafeUnaligner getThreadSafeUnaligner() {
+   return threadSafeUnaligner;
+   }
+
+   private void notifyCheckpoint(CheckpointBarrier barrier) throws 
IOException {
+   // ignore the previous triggered checkpoint by netty thread if 
it was already canceled or aborted before.
+   if (barrier.getId() >= 
threadSafeUnaligner.getCurrentCheckpointId()) {
+   super.notifyCheckpoint(barrier, 0);
+   }
+   }
+
@ThreadSafe
-   private static class ThreadSafeUnaligner implements 
BufferReceivedListener, Closeable {
+   static class ThreadSafeUnaligner implements BufferReceivedListener, 
Closeable {
 
/**
 * Tag the state of which input channel has not received the 
barrier, such that newly arriving buffers need
@@ -280,7 +293,6 @@ public class CheckpointBarrierUnaligner extends 
CheckpointBarrierHandler {
 */
private long currentReceivedCheckpointId = -1L;
 
-   /** The number of open channels. */
private int numOpenChannels;
 
private final ChannelStateWriter channelStateWriter;
@@ -300,7 +312,7 @@ public class CheckpointBarrierUnaligner extends 
CheckpointBarrierHandler {
 
if (currentReceivedCheckpointId < barrierId) {
handleNewCheckpoint(barrier);
-   handler.executeInTaskThread(() -> 
handler.notifyCheckpoint(barrier, 0), "notifyCheckpoint");
+   handler.executeInTaskThread(() -> 
handler.notifyCheckpoint(barrier), "notifyCheckpoint");
}
 
int channelIndex = 
handler.getFlattenedChannelIndex(channelInfo);
@@ -396,5 +408,9 @@ public class CheckpointBarrierUnaligner extends 
CheckpointBarrierHandler {
synchronized int getNumOpenChannels() {
return numOpenChannels;
}
+
+   synchronized long getCurrentCheckpointId() {
+   return currentReceivedCheckpointId;
+   

[flink] branch release-1.11 updated (9737f50 -> 64ca88a)

2020-06-01 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 9737f50  [FLINK-17230] Fix incorrect returned address of Endpoint for 
external Service of ClusterIP type
 new e25b950  [hotfix][checkpointing] Fix the formatting of 
CheckpointBarrierUnaligner
 new 64ca88a  [FLINK-17994][checkpointing] Fix the race condition between 
CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/io/CheckpointBarrierUnaligner.java |  99 ++--
 .../flink/streaming/runtime/tasks/StreamTask.java  |   3 +-
 .../runtime/io/CheckpointBarrierUnalignerTest.java | 100 -
 3 files changed, 149 insertions(+), 53 deletions(-)



[flink] 01/02: [hotfix][checkpointing] Fix the formatting of CheckpointBarrierUnaligner

2020-06-01 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e25b950f2c351c24515e769a175ec8d52e29f835
Author: Zhijiang 
AuthorDate: Thu May 28 23:34:27 2020 +0800

[hotfix][checkpointing] Fix the formatting of CheckpointBarrierUnaligner
---
 .../runtime/io/CheckpointBarrierUnaligner.java | 81 +-
 1 file changed, 34 insertions(+), 47 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
index d39accf..01b1219 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
@@ -130,9 +130,6 @@ public class CheckpointBarrierUnaligner extends 
CheckpointBarrierHandler {
 
/**
 * For unaligned checkpoint, it never blocks processing from the task 
aspect.
-*
-* For PoC, we do not consider the possibility that the unaligned 
checkpoint would
-* not perform due to the max configured unaligned checkpoint size.
 */
@Override
public boolean isBlocked(int channelIndex) {
@@ -140,16 +137,13 @@ public class CheckpointBarrierUnaligner extends 
CheckpointBarrierHandler {
}
 
/**
-* We still need to trigger checkpoint while reading the first barrier 
from one channel, because this might happen
-* earlier than the previous async trigger via mailbox by netty thread. 
And the {@link AbstractInvokable} has the
-* deduplication logic to guarantee trigger checkpoint only once 
finally.
+* We still need to trigger checkpoint while reading the first barrier 
from one channel, because
+* this might happen earlier than the previous async trigger via 
mailbox by netty thread.
 *
 * Note this is also suitable for the trigger case of local input 
channel.
 */
@Override
-   public void processBarrier(
-   CheckpointBarrier receivedBarrier,
-   int channelIndex) throws Exception {
+   public void processBarrier(CheckpointBarrier receivedBarrier, int 
channelIndex) throws Exception {
long barrierId = receivedBarrier.getId();
if (currentConsumedCheckpointId > barrierId || 
(currentConsumedCheckpointId == barrierId && !isCheckpointPending())) {
// ignore old and cancelled barriers
@@ -164,33 +158,32 @@ public class CheckpointBarrierUnaligner extends 
CheckpointBarrierHandler {
hasInflightBuffers[channelIndex] = false;
numBarrierConsumed++;
}
-   // processBarrier is called from task thread and can actually 
happen before notifyBarrierReceived on empty
-   // buffer queues
-   // to avoid replicating any logic, we simply call 
notifyBarrierReceived here as well
threadSafeUnaligner.notifyBarrierReceived(receivedBarrier, 
channelInfos[channelIndex]);
}
 
@Override
public void processCancellationBarrier(CancelCheckpointMarker 
cancelBarrier) throws Exception {
-   final long barrierId = cancelBarrier.getCheckpointId();
+   long cancelledId = cancelBarrier.getCheckpointId();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("{}: Checkpoint {} canceled, aborting 
alignment.", taskName, cancelledId);
+   }
 
-   if (currentConsumedCheckpointId >= barrierId && 
!isCheckpointPending()) {
+   if (currentConsumedCheckpointId >= cancelledId && 
!isCheckpointPending()) {
return;
}
 
if (isCheckpointPending()) {
LOG.warn("{}: Received cancellation barrier for 
checkpoint {} before completing current checkpoint {}. " +
-   "Skipping current 
checkpoint.",
-   taskName,
-   barrierId,
-   currentConsumedCheckpointId);
-   } else if (LOG.isDebugEnabled()) {
-   LOG.debug("{}: Checkpoint {} canceled, aborting 
alignment.", taskName, barrierId);
+   "Skipping current checkpoint.",
+   taskName,
+   cancelledId,
+   currentConsumedCheckpointId);
}
+
relea

[flink] branch master updated (1c78ab3 -> 8c7c726)

2020-06-01 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 1c78ab3  [FLINK-17340][docs] Update docs which related to default 
planner changing.
 add 8c7c726  [FLINK-17823][network] Resolve the race condition while 
releasing RemoteInputChannel

No new revisions were added by this update.

Summary of changes:
 .../partition/consumer/RemoteInputChannel.java | 15 +--
 .../partition/consumer/RemoteInputChannelTest.java | 51 ++
 2 files changed, 63 insertions(+), 3 deletions(-)



[flink] branch master updated: [FLINK-17992][checkpointing] Exception from RemoteInputChannel#onBuffer should not fail the whole NetworkClientHandler

2020-06-02 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 371f3de  [FLINK-17992][checkpointing] Exception from 
RemoteInputChannel#onBuffer should not fail the whole NetworkClientHandler
371f3de is described below

commit 371f3de5371afb78d465315098bebec6ed36656b
Author: Zhijiang 
AuthorDate: Thu May 28 09:00:34 2020 +0200

[FLINK-17992][checkpointing] Exception from RemoteInputChannel#onBuffer 
should not fail the whole NetworkClientHandler

RemoteInputChannel#onBuffer is invoked by
CreditBasedPartitionRequestClientHandler while receiving and decoding network
data. #onBuffer can throw exceptions, which would tag the error on the client
handler and fail all input channels added to that handler. This can cause a
tricky issue as follows.

If the RemoteInputChannel is being canceled by the canceler thread, the task
thread might exit earlier than the canceler thread terminates. That means the
PartitionRequestClient might not yet be closed (its closing is triggered by the
canceler thread) while a new task attempt is already deployed into the same
TaskManager. The new task might then reuse the previous PartitionRequestClient
when requesting partitions, but the respective client handler was already
tagged with an error during the earlier RemoteInputChannel#onBuffer call,
causing an unnecessary failover in the next round.

The solution is to fail only the respective task when its internal
RemoteInputChannel#onBuffer throws an exception, instead of failing all
channels inside the client handler. The client then stays healthy and can also
be reused by other input channels as long as it is not released yet.
---
 .../CreditBasedPartitionRequestClientHandler.java  | 11 +++-
 ...editBasedPartitionRequestClientHandlerTest.java | 71 ++
 2 files changed, 80 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index 4f3d872..5097c13 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import 
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
@@ -236,7 +237,8 @@ class CreditBasedPartitionRequestClientHandler extends 
ChannelInboundHandlerAdap
/**
 * Checks for an error and rethrows it if one was reported.
 */
-   private void checkError() throws IOException {
+   @VisibleForTesting
+   void checkError() throws IOException {
final Throwable t = channelError.get();
 
if (t != null) {
@@ -264,7 +266,12 @@ class CreditBasedPartitionRequestClientHandler extends 
ChannelInboundHandlerAdap
return;
}
 
-   decodeBufferOrEvent(inputChannel, bufferOrEvent);
+   try {
+   decodeBufferOrEvent(inputChannel, 
bufferOrEvent);
+   } catch (Throwable t) {
+   inputChannel.onError(t);
+   }
+
 
} else if (msgClazz == NettyMessage.ErrorResponse.class) {
//  Error 
-
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index 9487c1c..8cfa4e4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -18,8 +18,10 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
+import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.PartitionRequestClient;
+import org.apache.flink.runtime.io.network.TestingConnectionManager;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import

[flink] branch master updated (19bbd6d -> aa882c1)

2020-06-03 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 19bbd6d  [FLINK-17945][python] Improve the error message when 
instantiating non-existing Java class
 add 9add433  [hotfix][checkpointing] Fix the formatting of 
CheckpointBarrierUnaligner
 add aa882c1  [FLINK-17994][checkpointing] Fix the race condition between 
CheckpointBarrierUnaligner#processBarrier and #notifyBarrierReceived

No new revisions were added by this update.

Summary of changes:
 .../runtime/io/CheckpointBarrierUnaligner.java |  99 ++--
 .../flink/streaming/runtime/tasks/StreamTask.java  |   3 +-
 .../runtime/io/CheckpointBarrierUnalignerTest.java | 100 -
 3 files changed, 149 insertions(+), 53 deletions(-)



[flink] branch master updated: [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.

2020-06-05 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new d25609d  [FLINK-18139][checkpointing] Fixing unaligned checkpoints 
checks wrong channels for inflight data.
d25609d is described below

commit d25609d07f52ba9bac3f7bb33ce0635e255e3d9d
Author: Arvid Heise 
AuthorDate: Thu Jun 4 22:51:18 2020 +0200

[FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong 
channels for inflight data.

CheckpointBarrierUnaligner#hasInflightData was not called with input gate
contextual information, so in multi-gate setups only the same first few
channel indexes were checked during the initial snapshotting of in-flight data.
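
For illustration, a small sketch of why a bare channel index is not enough
(illustrative names only, not the actual Flink classes): in-flight data has to
be tracked per (gate, channel) pair, otherwise several gates collapse onto the
same first few indexes.

/** Illustrative sketch only. */
final class InflightTrackerSketch {

    // hasInflightData[gateIndex][channelIndex]
    private final boolean[][] hasInflightData;

    InflightTrackerSketch(int[] channelsPerGate) {
        hasInflightData = new boolean[channelsPerGate.length][];
        for (int gate = 0; gate < channelsPerGate.length; gate++) {
            hasInflightData[gate] = new boolean[channelsPerGate[gate]];
        }
    }

    void markInflight(int gateIndex, int channelIndex) {
        hasInflightData[gateIndex][channelIndex] = true;
    }

    boolean hasInflightData(int gateIndex, int channelIndex) {
        // a flat channelIndex alone would be ambiguous across gates
        return hasInflightData[gateIndex][channelIndex];
    }
}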
---
 .../partition/consumer/SingleInputGateBuilder.java |  4 +-
 .../io/AlternatingCheckpointBarrierHandler.java|  5 +-
 .../runtime/io/CheckpointBarrierHandler.java   |  3 +-
 .../runtime/io/CheckpointBarrierUnaligner.java |  4 +-
 .../runtime/io/CheckpointedInputGate.java  |  5 +-
 .../AlternatingCheckpointBarrierHandlerTest.java   |  4 +-
 .../runtime/io/StreamTaskNetworkInputTest.java | 73 +-
 7 files changed, 85 insertions(+), 13 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index ad607f7..b279998 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -70,12 +70,12 @@ public class SingleInputGateBuilder {
return this;
}
 
-   SingleInputGateBuilder setConsumedSubpartitionIndex(int 
consumedSubpartitionIndex) {
+   public SingleInputGateBuilder setConsumedSubpartitionIndex(int 
consumedSubpartitionIndex) {
this.consumedSubpartitionIndex = consumedSubpartitionIndex;
return this;
}
 
-   SingleInputGateBuilder setSingleInputGateIndex(int gateIndex) {
+   public SingleInputGateBuilder setSingleInputGateIndex(int gateIndex) {
this.gateIndex = gateIndex;
return this;
}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
index 09f05b9..8ed6788 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
@@ -99,9 +100,9 @@ class AlternatingCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
}
 
@Override
-   public boolean hasInflightData(long checkpointId, int channelIndex) {
+   public boolean hasInflightData(long checkpointId, InputChannelInfo 
channelInfo) {
// should only be called for unaligned checkpoint
-   return unalignedHandler.hasInflightData(checkpointId, 
channelIndex);
+   return unalignedHandler.hasInflightData(checkpointId, 
channelInfo);
}
 
@Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 07f6f7a..952af24 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import

[flink] branch release-1.11 updated: [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.

2020-06-06 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new b2852d3  [FLINK-18139][checkpointing] Fixing unaligned checkpoints 
checks wrong channels for inflight data.
b2852d3 is described below

commit b2852d3f80ea767fb730290ee49706280559ffc8
Author: Arvid Heise 
AuthorDate: Thu Jun 4 22:51:18 2020 +0200

[FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong 
channels for inflight data.

CheckpointBarrierUnaligner#hasInflightData was not called with input gate
contextual information, so in multi-gate setups only the same first few
channel indexes were checked during the initial snapshotting of in-flight data.
---
 .../partition/consumer/SingleInputGateBuilder.java |  4 +-
 .../io/AlternatingCheckpointBarrierHandler.java|  5 +-
 .../runtime/io/CheckpointBarrierHandler.java   |  3 +-
 .../runtime/io/CheckpointBarrierUnaligner.java |  4 +-
 .../runtime/io/CheckpointedInputGate.java  |  5 +-
 .../AlternatingCheckpointBarrierHandlerTest.java   |  4 +-
 .../runtime/io/StreamTaskNetworkInputTest.java | 73 +-
 7 files changed, 85 insertions(+), 13 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index ad607f7..b279998 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -70,12 +70,12 @@ public class SingleInputGateBuilder {
return this;
}
 
-   SingleInputGateBuilder setConsumedSubpartitionIndex(int 
consumedSubpartitionIndex) {
+   public SingleInputGateBuilder setConsumedSubpartitionIndex(int 
consumedSubpartitionIndex) {
this.consumedSubpartitionIndex = consumedSubpartitionIndex;
return this;
}
 
-   SingleInputGateBuilder setSingleInputGateIndex(int gateIndex) {
+   public SingleInputGateBuilder setSingleInputGateIndex(int gateIndex) {
this.gateIndex = gateIndex;
return this;
}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
index 09f05b9..8ed6788 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
@@ -99,9 +100,9 @@ class AlternatingCheckpointBarrierHandler extends 
CheckpointBarrierHandler {
}
 
@Override
-   public boolean hasInflightData(long checkpointId, int channelIndex) {
+   public boolean hasInflightData(long checkpointId, InputChannelInfo 
channelInfo) {
// should only be called for unaligned checkpoint
-   return unalignedHandler.hasInflightData(checkpointId, 
channelIndex);
+   return unalignedHandler.hasInflightData(checkpointId, 
channelInfo);
}
 
@Override
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 07f6f7a..952af24 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -22,6 +22,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import

[flink] branch release-1.11 updated (cc59b3a -> b2852d3)

2020-06-06 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from cc59b3a  [FLINK-18020] Increase timeout in SQLClientKafkaITCase
 add b2852d3  [FLINK-18139][checkpointing] Fixing unaligned checkpoints 
checks wrong channels for inflight data.

No new revisions were added by this update.

Summary of changes:
 .../partition/consumer/SingleInputGateBuilder.java |  4 +-
 .../io/AlternatingCheckpointBarrierHandler.java|  5 +-
 .../runtime/io/CheckpointBarrierHandler.java   |  3 +-
 .../runtime/io/CheckpointBarrierUnaligner.java |  4 +-
 .../runtime/io/CheckpointedInputGate.java  |  5 +-
 .../AlternatingCheckpointBarrierHandlerTest.java   |  4 +-
 .../runtime/io/StreamTaskNetworkInputTest.java | 73 +-
 7 files changed, 85 insertions(+), 13 deletions(-)



[flink] branch master updated (44af789 -> f2dd4b8)

2020-06-08 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 44af789  [FLINK-16350] Support Zookeeper 3.5 in 
test_ha_per_job_cluster_datastream.sh
 add ed7b0b1  [FLINK-18050][task][checkpointing] Use CloseableIterator to 
write ResultSubpartition state
 add f2dd4b8  [FLINK-18050][task][checkpointing] Simplify 
ChannelStateCheckpointWriter interface

No new revisions were added by this update.

Summary of changes:
 .../channel/ChannelStateCheckpointWriter.java  | 16 +++--
 .../channel/ChannelStateWriteRequest.java  | 22 +--
 .../checkpoint/channel/ChannelStateWriterImpl.java | 23 +---
 ...ChannelStateWriteRequestDispatcherImplTest.java | 69 ++
 .../ChannelStateWriteRequestDispatcherTest.java|  2 +-
 5 files changed, 94 insertions(+), 38 deletions(-)
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImplTest.java



[flink] branch release-1.11 updated (ab6cf40 -> 822e01b)

2020-06-09 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from ab6cf40  [FLINK-18130][hive][fs-connector] File name conflict for 
different jobs in filesystem/hive sink ()
 add a233c0f  [FLINK-18050][task][checkpointing] Use CloseableIterator to 
write ResultSubpartition state
 add 822e01b  [FLINK-18050][task][checkpointing] Simplify 
ChannelStateCheckpointWriter interface

No new revisions were added by this update.

Summary of changes:
 .../channel/ChannelStateCheckpointWriter.java  | 16 +++--
 .../channel/ChannelStateWriteRequest.java  | 22 +--
 .../checkpoint/channel/ChannelStateWriterImpl.java | 23 +---
 ...ChannelStateWriteRequestDispatcherImplTest.java | 69 ++
 .../ChannelStateWriteRequestDispatcherTest.java|  2 +-
 5 files changed, 94 insertions(+), 38 deletions(-)
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriteRequestDispatcherImplTest.java



[flink] branch master updated (64f8f65 -> 6dc624d)

2020-06-09 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 64f8f65  [FLINK-18042][tests] Auto-detect distDir
 add 6dc624d  [FLINK-18136][checkpointing] Don't start channel state writer 
for savepoint

No new revisions were added by this update.

Summary of changes:
 .../state/TestCheckpointStorageWorkerView.java | 52 +
 .../runtime/io/CheckpointBarrierUnaligner.java | 15 ++--
 .../streaming/runtime/io/InputProcessorUtil.java   | 14 ++--
 .../runtime/tasks/MultipleInputStreamTask.java |  2 +-
 .../runtime/tasks/OneInputStreamTask.java  |  2 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  7 +-
 .../tasks/SubtaskCheckpointCoordinator.java| 10 ++-
 .../tasks/SubtaskCheckpointCoordinatorImpl.java| 50 ++--
 .../runtime/tasks/TwoInputStreamTask.java  |  2 +-
 .../AlternatingCheckpointBarrierHandlerTest.java   | 12 +--
 ...CheckpointBarrierUnalignerCancellationTest.java |  4 +-
 .../runtime/io/CheckpointBarrierUnalignerTest.java |  6 +-
 .../runtime/io/InputProcessorUtilTest.java |  3 +-
 .../runtime/io/StreamTaskNetworkInputTest.java |  5 +-
 .../tasks/SubtaskCheckpointCoordinatorTest.java| 47 
 .../tasks/TestSubtaskCheckpointCoordinator.java| 89 ++
 16 files changed, 278 insertions(+), 42 deletions(-)
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java
 create mode 100644 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TestSubtaskCheckpointCoordinator.java



[flink] branch release-1.11 updated (822e8a3 -> 2fe888c)

2020-06-09 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 822e8a3  [FLINK-18152][Depl] Fail fast in JM scripts if memory 
configuration failed
 add b552ff2  [hotfix][checkpointing] Add VisibleForTesting annotation for 
related methods
 add 9860579  [FLINK-18063][checkpointing][refactoring] Implement default 
#isBlocked method in CheckpointBarrierHandler
 add 587a399  [FLINK-18063][checkpointing] Fix the invalid implementation 
of AlternatingCheckpointBarrierHandler#getAlignmentDurationNanos
 add 2fe888c  [FLINK-18063][checkpointing] Fix the race condition of 
aborting checkpoint in CheckpointBarrierUnaligner

No new revisions were added by this update.

Summary of changes:
 .../io/AlternatingCheckpointBarrierHandler.java|  21 ++--
 .../runtime/io/CheckpointBarrierAligner.java   |   8 ++
 .../runtime/io/CheckpointBarrierHandler.java   |  14 ++-
 .../runtime/io/CheckpointBarrierTracker.java   |  14 ---
 .../runtime/io/CheckpointBarrierUnaligner.java | 126 ++--
 .../runtime/io/CheckpointedInputGate.java  |   3 +-
 ...CheckpointBarrierUnalignerCancellationTest.java |   2 +-
 .../runtime/io/CheckpointBarrierUnalignerTest.java | 129 -
 8 files changed, 222 insertions(+), 95 deletions(-)



[flink] branch master updated (44acd50 -> f672ae5)

2020-06-09 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 44acd50  [FLINK-18152][Depl] Fail fast in JM scripts if memory 
configuration failed
 add bb0ebca  [hotfix][checkpointing] Add VisibleForTesting annotation for 
related methods
 add 0cbd5e3  [FLINK-18063][checkpointing][refactoring] Implement default 
#isBlocked method in CheckpointBarrierHandler
 add d99e1f8  [FLINK-18063][checkpointing] Fix the invalid implementation 
of AlternatingCheckpointBarrierHandler#getAlignmentDurationNanos
 add f672ae5  [FLINK-18063][checkpointing] Fix the race condition of 
aborting checkpoint in CheckpointBarrierUnaligner

No new revisions were added by this update.

Summary of changes:
 .../io/AlternatingCheckpointBarrierHandler.java|  21 ++--
 .../runtime/io/CheckpointBarrierAligner.java   |   8 ++
 .../runtime/io/CheckpointBarrierHandler.java   |  14 ++-
 .../runtime/io/CheckpointBarrierTracker.java   |  14 ---
 .../runtime/io/CheckpointBarrierUnaligner.java | 126 ++--
 .../runtime/io/CheckpointedInputGate.java  |   3 +-
 ...CheckpointBarrierUnalignerCancellationTest.java |   2 +-
 .../runtime/io/CheckpointBarrierUnalignerTest.java | 129 -
 8 files changed, 222 insertions(+), 95 deletions(-)



[flink] branch master updated (a80dc5a -> 892e1c3)

2020-06-09 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from a80dc5a  [hotfix][tests] Add missing import
 add 892e1c3  [hotfix][tests] Fix the compile issue caused by constructor 
change of CheckpointBarrierUnaligner

No new revisions were added by this update.

Summary of changes:
 .../streaming/runtime/io/CheckpointBarrierUnalignerTest.java  | 8 
 1 file changed, 4 insertions(+), 4 deletions(-)



[flink] branch master updated (892e1c3 -> 131cf30)

2020-06-09 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 892e1c3  [hotfix][tests] Fix the compile issue caused by constructor 
change of CheckpointBarrierUnaligner
 add 131cf30  [hotfix][testing] Remove the unused import from 
CheckpointBarrierUnalignerTest

No new revisions were added by this update.

Summary of changes:
 .../flink/streaming/runtime/io/CheckpointBarrierUnalignerTest.java   | 1 -
 1 file changed, 1 deletion(-)



[flink] branch release-1.11 updated (2fb7b14 -> fe229a3)

2020-06-09 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 2fb7b14  [hotfix] Improve JavaDocs comments for 
FlinkMemory/FlinkMemoryUtis
 add fe229a3  [FLINK-18057][tests] Fix unstable test 
SingleInputGateTest#testConcurrentReadStateAndProcessAndClose

No new revisions were added by this update.

Summary of changes:
 .../runtime/io/network/partition/consumer/SingleInputGateTest.java  | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)



[flink] branch master updated (d757fc1 -> 9a7dbfc)

2020-06-09 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from d757fc1  [hotfix] Improve JavaDocs comments for 
FlinkMemory/FlinkMemoryUtis
 add 9a7dbfc  [FLINK-18057][tests] Fix unstable test 
SingleInputGateTest#testConcurrentReadStateAndProcessAndClose

No new revisions were added by this update.

Summary of changes:
 .../runtime/io/network/partition/consumer/SingleInputGateTest.java  | 6 +-
 1 file changed, 5 insertions(+), 1 deletion(-)



[flink] branch release-1.11 updated: [FLINK-18136][checkpointing] Don't start channel state writer for savepoint

2020-06-09 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 7f410fa  [FLINK-18136][checkpointing] Don't start channel state writer 
for savepoint
7f410fa is described below

commit 7f410fa4f4758e48fcc6c33d8354c21ab02e2dc6
Author: Roman 
AuthorDate: Tue Jun 9 10:58:46 2020 +0200

[FLINK-18136][checkpointing] Don't start channel state writer for savepoint

ChannelStateWriter#start should only be called for unaligned checkpoints.
Since a source task can also trigger a savepoint,
SubtaskCheckpointCoordinator#initCheckpoint is introduced to decide whether
the internal writer needs to be started. The new method is also used in
other places such as CheckpointBarrierUnaligner.

This closes #12489.
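
The quoted diff below is truncated before it reaches SubtaskCheckpointCoordinatorImpl, so here is a minimal sketch of the idea behind the new initCheckpoint method: start the channel state writer only for unaligned checkpoints, never for savepoints. The body is illustrative only, assuming CheckpointOptions#isUnalignedCheckpoint and a ChannelStateWriter field; it is not the actual implementation:

import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;

class InitCheckpointSketch {
    private final ChannelStateWriter channelStateWriter;

    InitCheckpointSketch(ChannelStateWriter channelStateWriter) {
        this.channelStateWriter = channelStateWriter;
    }

    // Savepoints are always aligned, so in-flight channel state is only persisted
    // (and the writer only started) when this is an unaligned checkpoint.
    void initCheckpoint(long checkpointId, CheckpointOptions checkpointOptions) {
        if (checkpointOptions.isUnalignedCheckpoint()) {
            channelStateWriter.start(checkpointId, checkpointOptions);
        }
    }
}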
---
 .../state/TestCheckpointStorageWorkerView.java | 52 +
 .../runtime/io/CheckpointBarrierUnaligner.java | 15 ++--
 .../streaming/runtime/io/InputProcessorUtil.java   | 14 ++--
 .../runtime/tasks/MultipleInputStreamTask.java |  2 +-
 .../runtime/tasks/OneInputStreamTask.java  |  2 +-
 .../flink/streaming/runtime/tasks/StreamTask.java  |  7 +-
 .../tasks/SubtaskCheckpointCoordinator.java| 10 ++-
 .../tasks/SubtaskCheckpointCoordinatorImpl.java| 50 ++--
 .../runtime/tasks/TwoInputStreamTask.java  |  2 +-
 .../AlternatingCheckpointBarrierHandlerTest.java   | 12 +--
 ...CheckpointBarrierUnalignerCancellationTest.java |  4 +-
 .../runtime/io/CheckpointBarrierUnalignerTest.java | 14 ++--
 .../runtime/io/InputProcessorUtilTest.java |  3 +-
 .../runtime/io/StreamTaskNetworkInputTest.java |  5 +-
 .../tasks/SubtaskCheckpointCoordinatorTest.java| 47 
 .../tasks/TestSubtaskCheckpointCoordinator.java| 89 ++
 16 files changed, 282 insertions(+), 46 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java
new file mode 100644
index 000..c50a390
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TestCheckpointStorageWorkerView.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
+
+import java.io.IOException;
+
+/**
+ * Non-persistent {@link CheckpointStorageWorkerView} for tests. Uses {@link 
MemCheckpointStreamFactory}.
+ */
+public class TestCheckpointStorageWorkerView implements 
CheckpointStorageWorkerView {
+
+   private final int maxStateSize;
+   private final MemCheckpointStreamFactory 
taskOwnedCheckpointStreamFactory;
+   private final CheckpointedStateScope taskOwnedStateScope;
+
+   public TestCheckpointStorageWorkerView(int maxStateSize) {
+   this(maxStateSize, CheckpointedStateScope.EXCLUSIVE);
+   }
+
+   private TestCheckpointStorageWorkerView(int maxStateSize, 
CheckpointedStateScope taskOwnedStateScope) {
+   this.maxStateSize = maxStateSize;
+   this.taskOwnedCheckpointStreamFactory = new 
MemCheckpointStreamFactory(maxStateSize);
+   this.taskOwnedStateScope = taskOwnedStateScope;
+   }
+
+   @Override
+   public CheckpointStreamFactory resolveCheckpointStorageLocation(long 
checkpointId, CheckpointStorageLocationReference reference) {
+   return new MemCheckpointStreamFactory(maxStateSize);
+   }
+
+   @Override
+   public CheckpointStreamFactory.CheckpointStateOutputStream 
createTaskOwnedStateStream() throws IOException {
+   return 
taskOwnedCheckpointStreamFactory.createCheckpointStateOutputStream(taskOwnedStateScope);
+   }
+}
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/Checkpo

[flink] branch master updated (7143e6a -> b2d1c2d)

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 7143e6a  [FLINK-17944][sql-client] Wrong output in SQL Client's table 
mode
 add 1cb9d54  [FLINK-17869][hotfix] Add taskName to ChannelStateWriter log 
messages
 add 82a13db  [FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to 
AsyncCheckpointRunnable
 add 73afe2b  [FLINK-17869][task][checkpointing] Revert 
"[FLINK-17218][checkpointing] Ensuring that ChannelStateWriter aborts previous 
checkpoints before a new checkpoint is started."
 add 91df1a5  [FLINK-17869][task][checkpointing] Abort channel state write 
if checkpoint is subsumed
 add 2b51cda  [FLINK-17869][task][checkpointing] Increase 
ChannelStateWriterImpl.DEFAULT_MAX_CHECKPOINTS
 add 64ff676  [FLINK-17869][task][checkpointing] Ignore out of order 
checkpoints in SubtaskCheckpointCoordinator
 add 26762bd1 [FLINK-17869][tests] Unignore UnalignedCheckpointITCase
 add b2d1c2d  [FLINK-17869][task][checkpointing] Abort writing of channel 
state by RPC notification

No new revisions were added by this update.

Summary of changes:
 .../checkpoint/channel/ChannelStateWriter.java | 29 +---
 .../checkpoint/channel/ChannelStateWriterImpl.java | 25 --
 .../channel/ChannelStateWriterImplTest.java| 49 +---
 .../checkpoint/channel/MockChannelStateWriter.java |  8 +---
 .../channel/RecordingChannelStateWriter.java   |  5 --
 .../runtime/state/ChannelPersistenceITCase.java|  2 +-
 .../runtime/io/CheckpointBarrierUnaligner.java | 31 +++--
 .../runtime/tasks/AsyncCheckpointRunnable.java |  6 ---
 .../tasks/SubtaskCheckpointCoordinatorImpl.java| 54 +-
 .../runtime/tasks/LocalStateForwardingTest.java|  2 -
 .../tasks/TestSubtaskCheckpointCoordinator.java|  2 +-
 .../checkpointing/UnalignedCheckpointITCase.java   |  2 -
 12 files changed, 95 insertions(+), 120 deletions(-)



[flink] 01/08: [FLINK-17869][hotfix] Add taskName to ChannelStateWriter log messages

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4030b1b30a2208e494d639af8767c3a944d87d58
Author: Roman Khachatryan 
AuthorDate: Wed Jun 3 14:34:30 2020 +0200

[FLINK-17869][hotfix] Add taskName to ChannelStateWriter log messages
---
 .../flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java| 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index e6aa9dc..fc8655c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -102,11 +102,11 @@ public class ChannelStateWriterImpl implements 
ChannelStateWriter {
LOG.debug("{} starting checkpoint {} ({})", taskName, 
checkpointId, checkpointOptions);
ChannelStateWriteResult result = new ChannelStateWriteResult();
ChannelStateWriteResult put = 
results.computeIfAbsent(checkpointId, id -> {
-   Preconditions.checkState(results.size() < 
maxCheckpoints, "results.size() > maxCheckpoints", results.size(), 
maxCheckpoints);
+   Preconditions.checkState(results.size() < 
maxCheckpoints, String.format("%s can't start %d, results.size() > 
maxCheckpoints: %d > %d", taskName, checkpointId, results.size(), 
maxCheckpoints));
enqueue(new CheckpointStartRequest(checkpointId, 
result, checkpointOptions.getTargetLocation()), false);
return result;
});
-   Preconditions.checkArgument(put == result, "result future 
already present for checkpoint " + checkpointId);
+   Preconditions.checkArgument(put == result, taskName + " result 
future already present for checkpoint " + checkpointId);
}
 
@Override
@@ -156,7 +156,7 @@ public class ChannelStateWriterImpl implements 
ChannelStateWriter {
public ChannelStateWriteResult getWriteResult(long checkpointId) {
LOG.debug("{} requested write result, checkpoint {}", taskName, 
checkpointId);
ChannelStateWriteResult result = results.get(checkpointId);
-   Preconditions.checkArgument(result != null, "channel state 
write result not found for checkpoint " + checkpointId);
+   Preconditions.checkArgument(result != null, taskName + " 
channel state write result not found for checkpoint " + checkpointId);
return result;
}
 



[flink] 02/08: [FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to AsyncCheckpointRunnable

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 36609bb5dc2aee4061a47f7a767630f1f5912d96
Author: Roman Khachatryan 
AuthorDate: Wed Jun 3 23:03:52 2020 +0200

[FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to 
AsyncCheckpointRunnable

OperatorSnapshotFinalizer already waits and holds this future.
ChannelStateWriter.getWriteResult() can then be non-idempotent.
ChannelStateWriter.stop() can then be removed.
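
Put differently, the write result is now handed out exactly once and the writer's bookkeeping entry is dropped together with it. A hedged usage sketch; ChannelStateWriter.NO_OP is used only to make the snippet compile on its own, while a real ChannelStateWriterImpl enforces the single-call contract:

import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter.ChannelStateWriteResult;

public class WriteResultContractSketch {
    public static void main(String[] args) {
        long checkpointId = 42L;
        ChannelStateWriter writer = ChannelStateWriter.NO_OP;

        // The result is fetched exactly once by the checkpointing code path; the fetch
        // also removes the writer's entry, so the old stop(checkpointId) call goes away.
        ChannelStateWriteResult result = writer.getAndRemoveWriteResult(checkpointId);
        System.out.println(result != null);

        // For ChannelStateWriterImpl a second call with the same id is expected to fail
        // with IllegalArgumentException, because the entry was already removed.
    }
}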
---
 .../checkpoint/channel/ChannelStateWriter.java | 21 ++--
 .../checkpoint/channel/ChannelStateWriterImpl.java | 10 ++--
 .../channel/ChannelStateWriterImplTest.java| 28 ++
 .../checkpoint/channel/MockChannelStateWriter.java |  6 +
 .../channel/RecordingChannelStateWriter.java   |  5 
 .../runtime/state/ChannelPersistenceITCase.java|  2 +-
 .../runtime/tasks/AsyncCheckpointRunnable.java |  6 -
 .../tasks/SubtaskCheckpointCoordinatorImpl.java| 15 +---
 .../runtime/tasks/LocalStateForwardingTest.java|  2 --
 9 files changed, 24 insertions(+), 71 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index 5dad559..af2a708 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -123,7 +123,7 @@ public interface ChannelStateWriter extends Closeable {
 * Finalize write of channel state data for the given checkpoint id.
 * Must be called after {@link #start(long, CheckpointOptions)} and all 
of the input data of the given checkpoint added.
 * When both {@link #finishInput} and {@link #finishOutput} were called 
the results can be (eventually) obtained
-* using {@link #getWriteResult}
+* using {@link #getAndRemoveWriteResult}
 */
void finishInput(long checkpointId);
 
@@ -131,24 +131,21 @@ public interface ChannelStateWriter extends Closeable {
 * Finalize write of channel state data for the given checkpoint id.
 * Must be called after {@link #start(long, CheckpointOptions)} and all 
of the output data of the given checkpoint added.
 * When both {@link #finishInput} and {@link #finishOutput} were called 
the results can be (eventually) obtained
-* using {@link #getWriteResult}
+* using {@link #getAndRemoveWriteResult}
 */
void finishOutput(long checkpointId);
 
/**
 * Aborts the checkpoint and fails pending result for this checkpoint.
+* @param cleanup true if {@link #getAndRemoveWriteResult(long)} is not 
supposed to be called afterwards.
 */
void abort(long checkpointId, Throwable cause);
 
/**
-* Must be called after {@link #start(long, CheckpointOptions)}.
+* Must be called after {@link #start(long, CheckpointOptions)} once.
+* @throws IllegalArgumentException if the passed checkpointId is not 
known.
 */
-   ChannelStateWriteResult getWriteResult(long checkpointId);
-
-   /**
-* Cleans up the internal state for the given checkpoint.
-*/
-   void stop(long checkpointId);
+   ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) 
throws IllegalArgumentException;
 
ChannelStateWriter NO_OP = new NoOpChannelStateWriter();
 
@@ -181,16 +178,12 @@ public interface ChannelStateWriter extends Closeable {
}
 
@Override
-   public ChannelStateWriteResult getWriteResult(long 
checkpointId) {
+   public ChannelStateWriteResult getAndRemoveWriteResult(long 
checkpointId) {
return ChannelStateWriteResult.EMPTY;
}
 
@Override
public void close() {
}
-
-   @Override
-   public void stop(long checkpointId) {
-   }
}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index fc8655c..6158358 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -153,19 +153,13 @@ public class ChannelStateWriterImpl implements 
ChannelStateWriter {
}
 
@Override
-   public ChannelStateWriteResult getWriteResult(long checkpointId) {
+   public ChannelStateWriteR

[flink] 05/08: [FLINK-17869][task][checkpointing] Increase ChannelStateWriterImpl.DEFAULT_MAX_CHECKPOINTS

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e14958dcd09f7c32ba2ba25fc6a48696fe7eeca2
Author: Roman Khachatryan 
AuthorDate: Wed Jun 3 15:56:47 2020 +0200

[FLINK-17869][task][checkpointing] Increase 
ChannelStateWriterImpl.DEFAULT_MAX_CHECKPOINTS

The ChannelStateWriter map is cleaned up by the task thread, so the size
check performed in the netty thread should account for a possible cleanup
delay.
---
 .../apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 3e18050..89a5247 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -56,7 +56,7 @@ import static 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteReque
 public class ChannelStateWriterImpl implements ChannelStateWriter {
 
private static final Logger LOG = 
LoggerFactory.getLogger(ChannelStateWriterImpl.class);
-   private static final int DEFAULT_MAX_CHECKPOINTS = 5; // currently, 
only single in-flight checkpoint is supported
+   private static final int DEFAULT_MAX_CHECKPOINTS = 1000; // includes 
max-concurrent-checkpoints + checkpoints to be aborted (scheduled via mailbox)
 
private final String taskName;
private final ChannelStateWriteRequestExecutor executor;



[flink] 08/08: [FLINK-17869][task][checkpointing] Abort writing of channel state by RPC notification

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d75ec8efe3c31f2a5e1a4572b5cfb8000ddbf67
Author: Roman Khachatryan 
AuthorDate: Thu Jun 4 10:06:15 2020 +0200

[FLINK-17869][task][checkpointing] Abort writing of channel state by RPC
notification
---
 .../flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index 5d7a2c9..d904c87 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -306,6 +306,8 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
}
}
 
+   channelStateWriter.abort(checkpointId, new 
CancellationException("checkpoint aborted via notification"), false);
+
for (StreamOperatorWrapper operatorWrapper : 
operatorChain.getAllOperators(true)) {
try {

operatorWrapper.getStreamOperator().notifyCheckpointAborted(checkpointId);



[flink] branch release-1.11 updated (31a17cb -> 1d75ec8)

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 31a17cb  [FLINK-18188][Runtime] Derive JM Off-Heap memory from 
configured Total Flink Memory minus JVM Heap
 new 4030b1b  [FLINK-17869][hotfix] Add taskName to ChannelStateWriter log 
messages
 new 36609bb  [FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to 
AsyncCheckpointRunnable
 new 6cb8f28  [FLINK-17869][task][checkpointing] Revert 
"[FLINK-17218][checkpointing] Ensuring that ChannelStateWriter aborts previous 
checkpoints before a new checkpoint is started."
 new d806924  [FLINK-17869][task][checkpointing] Abort channel state write 
if checkpoint is subsumed
 new e14958d  [FLINK-17869][task][checkpointing] Increase 
ChannelStateWriterImpl.DEFAULT_MAX_CHECKPOINTS
 new 7bb3ffa  [FLINK-17869][task][checkpointing] Ignore out of order 
checkpoints in SubtaskCheckpointCoordinator
 new 0634a2e  [FLINK-17869][tests] Unignore UnalignedCheckpointITCase
 new 1d75ec8  [FLINK-17869][task][checkpointing] Abort writing of channel 
state by RPC notification

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../checkpoint/channel/ChannelStateWriter.java | 29 +---
 .../checkpoint/channel/ChannelStateWriterImpl.java | 25 --
 .../channel/ChannelStateWriterImplTest.java| 49 +---
 .../checkpoint/channel/MockChannelStateWriter.java |  8 +---
 .../channel/RecordingChannelStateWriter.java   |  5 --
 .../runtime/state/ChannelPersistenceITCase.java|  2 +-
 .../runtime/io/CheckpointBarrierUnaligner.java | 31 +++--
 .../runtime/tasks/AsyncCheckpointRunnable.java |  6 ---
 .../tasks/SubtaskCheckpointCoordinatorImpl.java| 54 +-
 .../runtime/tasks/LocalStateForwardingTest.java|  2 -
 .../tasks/TestSubtaskCheckpointCoordinator.java|  2 +-
 .../checkpointing/UnalignedCheckpointITCase.java   |  2 -
 12 files changed, 95 insertions(+), 120 deletions(-)



[flink] 03/08: [FLINK-17869][task][checkpointing] Revert "[FLINK-17218][checkpointing] Ensuring that ChannelStateWriter aborts previous checkpoints before a new checkpoint is started."

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6cb8f28cf219f79a36b90a567692fafc5f85a2a2
Author: Roman Khachatryan 
AuthorDate: Wed Jun 3 14:12:04 2020 +0200

[FLINK-17869][task][checkpointing] Revert "[FLINK-17218][checkpointing] 
Ensuring that ChannelStateWriter aborts previous checkpoints before a new 
checkpoint is started."

This reverts commit 24ff415f1b76392f75dea7c3538558d24fcb7058
which introduced a race condition when task thread and netty
thread compete for ChannelStateWriteResult.

Instead, next commits fix it by:
1. The map size validation error is prevented simply by increasing the limit.
2. When a checkpoint is subsumed, its write result is removed on future
completion.
---
 .../checkpoint/channel/ChannelStateWriter.java|  4 +++-
 .../checkpoint/channel/ChannelStateWriterImpl.java|  1 -
 .../channel/ChannelStateWriterImplTest.java   | 19 ---
 3 files changed, 11 insertions(+), 13 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index af2a708..02a3a69 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -179,7 +179,9 @@ public interface ChannelStateWriter extends Closeable {
 
@Override
public ChannelStateWriteResult getAndRemoveWriteResult(long 
checkpointId) {
-   return ChannelStateWriteResult.EMPTY;
+   return new ChannelStateWriteResult(
+   
CompletableFuture.completedFuture(Collections.emptyList()),
+   
CompletableFuture.completedFuture(Collections.emptyList()));
}
 
@Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 6158358..8996b3b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -98,7 +98,6 @@ public class ChannelStateWriterImpl implements 
ChannelStateWriter {
 
@Override
public void start(long checkpointId, CheckpointOptions 
checkpointOptions) {
-   results.keySet().forEach(oldCheckpointId -> 
abort(oldCheckpointId, new Exception("Starting new checkpoint " + 
checkpointId)));
LOG.debug("{} starting checkpoint {} ({})", taskName, 
checkpointId, checkpointOptions);
ChannelStateWriteResult result = new ChannelStateWriteResult();
ChannelStateWriteResult put = 
results.computeIfAbsent(checkpointId, id -> {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
index 9d7a7ea..0dae88e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
@@ -186,19 +186,16 @@ public class ChannelStateWriterImplTest {
unwrappingError(TestException.class, () -> callStart(writer));
}
 
-   @Test
-   public void testStartAbortsOldCheckpoints() throws Exception {
-   int maxCheckpoints = 10;
-   runWithSyncWorker((writer, worker) -> {
-   writer.start(0, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-   ChannelStateWriteResult writeResult = 
writer.getWriteResult(0);
-   for (int i = 1; i <= maxCheckpoints; i++) {
+   @Test(expected = IllegalStateException.class)
+   public void testLimit() throws IOException {
+   int maxCheckpoints = 3;
+   try (ChannelStateWriterImpl writer = new 
ChannelStateWriterImpl(TASK_NAME, getStreamFactoryFactory(), maxCheckpoints)) {
+   writer.open();
+   for (int i = 0; i < maxCheckpoints; i++) {
writer.start(i, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-   worker.processAllRequests();
-   assertTrue(writeResult.isDone());
-   

[flink] 06/08: [FLINK-17869][task][checkpointing] Ignore out of order checkpoints in SubtaskCheckpointCoordinator

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7bb3ffa91a9916348d2f0a6a2e6cba4b109be56e
Author: Roman Khachatryan 
AuthorDate: Wed Jun 3 21:43:56 2020 +0200

[FLINK-17869][task][checkpointing] Ignore out of order checkpoints in 
SubtaskCheckpointCoordinator

Check (by task thread) whether the current checkpoint was already aborted 
in the following scenario:
1. on checkpoint barrier ThreadSafeUnaligner sends a mail to start 
checkpointing (netty thread)
2. on cancellation marker CheckpointBarrierUnaligner aborts it (task thread)
3. task thread processes a mail to start checkpointing
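
The resulting guard can be read in isolation as the sketch below, a simplified restatement of the change in the diff with the coordinator reduced to the single piece of state that matters here:

import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;

import java.util.concurrent.CancellationException;

class OutOfOrderBarrierGuardSketch {
    private long lastCheckpointId = Long.MIN_VALUE;

    // A barrier whose id is not newer than the last seen (possibly already aborted)
    // checkpoint is the stale "start checkpointing" mail from step 3 above: it must
    // not trigger a checkpoint, and any channel state writing for it is aborted.
    boolean shouldIgnoreBarrier(CheckpointMetaData metadata, ChannelStateWriter channelStateWriter) {
        if (lastCheckpointId >= metadata.getCheckpointId()) {
            channelStateWriter.abort(metadata.getCheckpointId(), new CancellationException(), true);
            return true;
        }
        lastCheckpointId = metadata.getCheckpointId();
        return false;
    }
}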
---
 .../tasks/SubtaskCheckpointCoordinatorImpl.java| 22 +++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index cf8a21e..5d7a2c9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -44,7 +44,6 @@ import 
org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.BiFunctionWithException;
 
 import org.slf4j.Logger;
@@ -57,10 +56,12 @@ import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -183,6 +184,16 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
@Override
public void abortCheckpointOnBarrier(long checkpointId, Throwable 
cause, OperatorChain operatorChain) throws IOException {
LOG.debug("Aborting checkpoint via cancel-barrier {} for task 
{}", checkpointId, taskName);
+   lastCheckpointId = Math.max(lastCheckpointId, checkpointId);
+   Iterator iterator = abortedCheckpointIds.iterator();
+   while (iterator.hasNext()) {
+   long next = iterator.next();
+   if (next < lastCheckpointId) {
+   iterator.remove();
+   } else {
+   break;
+   }
+   }
 
checkpointStorage.clearCacheFor(checkpointId);
 
@@ -221,9 +232,14 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
// We generally try to emit the checkpoint barrier as soon as 
possible to not affect downstream
// checkpoint alignments
 
+   if (lastCheckpointId >= metadata.getCheckpointId()) {
+   LOG.info("Out of order checkpoint barrier (aborted 
previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());
+   channelStateWriter.abort(metadata.getCheckpointId(), 
new CancellationException(), true);
+   checkAndClearAbortedStatus(metadata.getCheckpointId());
+   return;
+   }
+
// Step (0): Record the last triggered checkpointId.
-   Preconditions.checkArgument(lastCheckpointId < 
metadata.getCheckpointId(), String.format(
-   "Unexpected current checkpoint-id: %s vs last 
checkpoint-id: %s", metadata.getCheckpointId(), lastCheckpointId));
lastCheckpointId = metadata.getCheckpointId();
if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
LOG.info("Checkpoint {} has been notified as aborted, 
would not trigger any checkpoint.", metadata.getCheckpointId());



[flink] 07/08: [FLINK-17869][tests] Unignore UnalignedCheckpointITCase

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0634a2efd2f0ca42d4cb6762015be650c04a3136
Author: Roman Khachatryan 
AuthorDate: Wed Jun 3 22:12:15 2020 +0200

[FLINK-17869][tests] Unignore UnalignedCheckpointITCase
---
 .../org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java  | 2 --
 1 file changed, 2 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index 47e7007..6211077 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -44,7 +44,6 @@ import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.util.TestLogger;
 
 import org.apache.commons.lang3.ArrayUtils;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ErrorCollector;
@@ -102,7 +101,6 @@ import static org.hamcrest.Matchers.greaterThan;
  * The number of successful checkpoints is indeed {@code >=n}.
  * 
  */
-@Ignore
 public class UnalignedCheckpointITCase extends TestLogger {
public static final String NUM_INPUTS = "inputs";
public static final String NUM_OUTPUTS = "outputs";



[flink] 04/08: [FLINK-17869][task][checkpointing] Abort channel state write if checkpoint is subsumed

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d8069249703bbe7858e0c6a044deb54ce75e3989
Author: Roman Khachatryan 
AuthorDate: Wed Jun 3 14:25:01 2020 +0200

[FLINK-17869][task][checkpointing] Abort channel state write if checkpoint 
is subsumed

Motivation: stop writing channel state ASAP if the checkpoint is subsumed

Changes:
1. complete 
CheckpointBarrierUnaligner.ThreadSafeUnaligner#allBarriersReceivedFuture
2. abort channel state write on its erroneous completion
3. add cleanup parameter to ChannelStateWriter.abort to use cleanup=false
in the call above
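
For orientation before the diff: the new cleanup flag decides whether the writer also drops its stored write result immediately. A short, hedged sketch of the intended usage; the abort signature is taken from the diff, the wrapper methods are illustrative:

import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;

import java.util.concurrent.CancellationException;

class AbortCleanupSketch {
    // cleanup = false: the checkpoint is aborted (e.g. subsumed) but the task thread may
    // still call getAndRemoveWriteResult(checkpointId) afterwards, so the entry is kept.
    static void abortButKeepResult(ChannelStateWriter writer, long checkpointId) {
        writer.abort(checkpointId, new CancellationException("checkpoint subsumed"), false);
    }

    // cleanup = true: nobody will ask for the result anymore, drop the entry right away.
    static void abortAndForget(ChannelStateWriter writer, long checkpointId) {
        writer.abort(checkpointId, new CancellationException("checkpoint aborted"), true);
    }
}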
---
 .../checkpoint/channel/ChannelStateWriter.java |  4 +--
 .../checkpoint/channel/ChannelStateWriterImpl.java |  6 +++--
 .../channel/ChannelStateWriterImplTest.java|  2 +-
 .../checkpoint/channel/MockChannelStateWriter.java |  2 +-
 .../runtime/io/CheckpointBarrierUnaligner.java | 31 +++---
 .../tasks/SubtaskCheckpointCoordinatorImpl.java| 15 +++
 .../tasks/TestSubtaskCheckpointCoordinator.java|  2 +-
 7 files changed, 35 insertions(+), 27 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index 02a3a69..2112444 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -139,7 +139,7 @@ public interface ChannelStateWriter extends Closeable {
 * Aborts the checkpoint and fails pending result for this checkpoint.
 * @param cleanup true if {@link #getAndRemoveWriteResult(long)} is not 
supposed to be called afterwards.
 */
-   void abort(long checkpointId, Throwable cause);
+   void abort(long checkpointId, Throwable cause, boolean cleanup);
 
/**
 * Must be called after {@link #start(long, CheckpointOptions)} once.
@@ -174,7 +174,7 @@ public interface ChannelStateWriter extends Closeable {
}
 
@Override
-   public void abort(long checkpointId, Throwable cause) {
+   public void abort(long checkpointId, Throwable cause, boolean 
cleanup) {
}
 
@Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 8996b3b..3e18050 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -144,11 +144,13 @@ public class ChannelStateWriterImpl implements 
ChannelStateWriter {
}
 
@Override
-   public void abort(long checkpointId, Throwable cause) {
+   public void abort(long checkpointId, Throwable cause, boolean cleanup) {
LOG.debug("{} aborting, checkpoint {}", taskName, checkpointId);
enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), 
true); // abort already started
enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), 
false); // abort enqueued but not started
-   results.remove(checkpointId);
+   if (cleanup) {
+   results.remove(checkpointId);
+   }
}
 
@Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
index 0dae88e..d0193dd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
@@ -290,7 +290,7 @@ public class ChannelStateWriterImplTest {
}
 
private void callAbort(ChannelStateWriter writer) {
-   writer.abort(CHECKPOINT_ID, new TestException());
+   writer.abort(CHECKPOINT_ID, new TestException(), false);
}
 
private void callFinish(ChannelStateWriter writer) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
index 88bd334..7641d36 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/

[flink] 04/08: [FLINK-17869][task][checkpointing] Abort channel state write if checkpoint is subsumed

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d8069249703bbe7858e0c6a044deb54ce75e3989
Author: Roman Khachatryan 
AuthorDate: Wed Jun 3 14:25:01 2020 +0200

[FLINK-17869][task][checkpointing] Abort channel state write if checkpoint 
is subsumed

Motivation: stop writing channel state ASAP if the checkpoint is subsumed

Changes:
1. complete 
CheckpointBarrierUnaligner.ThreadSafeUnaligner#allBarriersReceivedFuture
2. abort channel state write on its erroneous completion
3. add cleanup parameter to ChannelStateWriter.abort to use cleanup=false
in the call above
---
 .../checkpoint/channel/ChannelStateWriter.java |  4 +--
 .../checkpoint/channel/ChannelStateWriterImpl.java |  6 +++--
 .../channel/ChannelStateWriterImplTest.java|  2 +-
 .../checkpoint/channel/MockChannelStateWriter.java |  2 +-
 .../runtime/io/CheckpointBarrierUnaligner.java | 31 +++---
 .../tasks/SubtaskCheckpointCoordinatorImpl.java| 15 +++
 .../tasks/TestSubtaskCheckpointCoordinator.java|  2 +-
 7 files changed, 35 insertions(+), 27 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index 02a3a69..2112444 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -139,7 +139,7 @@ public interface ChannelStateWriter extends Closeable {
 * Aborts the checkpoint and fails pending result for this checkpoint.
 * @param cleanup true if {@link #getAndRemoveWriteResult(long)} is not 
supposed to be called afterwards.
 */
-   void abort(long checkpointId, Throwable cause);
+   void abort(long checkpointId, Throwable cause, boolean cleanup);
 
/**
 * Must be called after {@link #start(long, CheckpointOptions)} once.
@@ -174,7 +174,7 @@ public interface ChannelStateWriter extends Closeable {
}
 
@Override
-   public void abort(long checkpointId, Throwable cause) {
+   public void abort(long checkpointId, Throwable cause, boolean 
cleanup) {
}
 
@Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 8996b3b..3e18050 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -144,11 +144,13 @@ public class ChannelStateWriterImpl implements 
ChannelStateWriter {
}
 
@Override
-   public void abort(long checkpointId, Throwable cause) {
+   public void abort(long checkpointId, Throwable cause, boolean cleanup) {
LOG.debug("{} aborting, checkpoint {}", taskName, checkpointId);
enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), 
true); // abort already started
enqueue(ChannelStateWriteRequest.abort(checkpointId, cause), 
false); // abort enqueued but not started
-   results.remove(checkpointId);
+   if (cleanup) {
+   results.remove(checkpointId);
+   }
}
 
@Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
index 0dae88e..d0193dd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
@@ -290,7 +290,7 @@ public class ChannelStateWriterImplTest {
}
 
private void callAbort(ChannelStateWriter writer) {
-   writer.abort(CHECKPOINT_ID, new TestException());
+   writer.abort(CHECKPOINT_ID, new TestException(), false);
}
 
private void callFinish(ChannelStateWriter writer) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
index 88bd334..7641d36 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/MockChannelStateWriter.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/

[flink] 01/08: [FLINK-17869][hotfix] Add taskName to ChannelStateWriter log messages

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4030b1b30a2208e494d639af8767c3a944d87d58
Author: Roman Khachatryan 
AuthorDate: Wed Jun 3 14:34:30 2020 +0200

[FLINK-17869][hotfix] Add taskName to ChannelStateWriter log messages
---
 .../flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java| 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index e6aa9dc..fc8655c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -102,11 +102,11 @@ public class ChannelStateWriterImpl implements 
ChannelStateWriter {
LOG.debug("{} starting checkpoint {} ({})", taskName, 
checkpointId, checkpointOptions);
ChannelStateWriteResult result = new ChannelStateWriteResult();
ChannelStateWriteResult put = 
results.computeIfAbsent(checkpointId, id -> {
-   Preconditions.checkState(results.size() < 
maxCheckpoints, "results.size() > maxCheckpoints", results.size(), 
maxCheckpoints);
+   Preconditions.checkState(results.size() < 
maxCheckpoints, String.format("%s can't start %d, results.size() > 
maxCheckpoints: %d > %d", taskName, checkpointId, results.size(), 
maxCheckpoints));
enqueue(new CheckpointStartRequest(checkpointId, 
result, checkpointOptions.getTargetLocation()), false);
return result;
});
-   Preconditions.checkArgument(put == result, "result future 
already present for checkpoint " + checkpointId);
+   Preconditions.checkArgument(put == result, taskName + " result 
future already present for checkpoint " + checkpointId);
}
 
@Override
@@ -156,7 +156,7 @@ public class ChannelStateWriterImpl implements 
ChannelStateWriter {
public ChannelStateWriteResult getWriteResult(long checkpointId) {
LOG.debug("{} requested write result, checkpoint {}", taskName, 
checkpointId);
ChannelStateWriteResult result = results.get(checkpointId);
-   Preconditions.checkArgument(result != null, "channel state 
write result not found for checkpoint " + checkpointId);
+   Preconditions.checkArgument(result != null, taskName + " 
channel state write result not found for checkpoint " + checkpointId);
return result;
}
 



[flink] 07/08: [FLINK-17869][tests] Unignore UnalignedCheckpointITCase

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0634a2efd2f0ca42d4cb6762015be650c04a3136
Author: Roman Khachatryan 
AuthorDate: Wed Jun 3 22:12:15 2020 +0200

[FLINK-17869][tests] Unignore UnalignedCheckpointITCase
---
 .../org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java  | 2 --
 1 file changed, 2 deletions(-)

diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
index 47e7007..6211077 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/UnalignedCheckpointITCase.java
@@ -44,7 +44,6 @@ import 
org.apache.flink.streaming.api.functions.source.RichParallelSourceFunctio
 import org.apache.flink.util.TestLogger;
 
 import org.apache.commons.lang3.ArrayUtils;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.ErrorCollector;
@@ -102,7 +101,6 @@ import static org.hamcrest.Matchers.greaterThan;
  * The number of successful checkpoints is indeed {@code >=n}.
  * 
  */
-@Ignore
 public class UnalignedCheckpointITCase extends TestLogger {
public static final String NUM_INPUTS = "inputs";
public static final String NUM_OUTPUTS = "outputs";



[flink] 06/08: [FLINK-17869][task][checkpointing] Ignore out of order checkpoints in SubtaskCheckpointCoordinator

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7bb3ffa91a9916348d2f0a6a2e6cba4b109be56e
Author: Roman Khachatryan 
AuthorDate: Wed Jun 3 21:43:56 2020 +0200

[FLINK-17869][task][checkpointing] Ignore out of order checkpoints in 
SubtaskCheckpointCoordinator

Check (in the task thread) whether the current checkpoint was already aborted
in the following scenario (a sketch of the resulting guard follows the list):
1. on a checkpoint barrier, ThreadSafeUnaligner sends a mail to start
checkpointing (netty thread)
2. on the cancellation marker, CheckpointBarrierUnaligner aborts it (task thread)
3. the task thread processes the mail to start checkpointing
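
A minimal, self-contained sketch of the guard described above, assuming
illustrative class and method names that are not part of Flink's API:

    import java.util.HashSet;
    import java.util.Set;

    class OutOfOrderCheckpointGuard {
        private long lastCheckpointId = -1;
        private final Set<Long> abortedCheckpointIds = new HashSet<>();

        /** Called when a cancellation barrier aborts a checkpoint (step 2 above). */
        void onAbortBarrier(long checkpointId) {
            lastCheckpointId = Math.max(lastCheckpointId, checkpointId);
            // Ids below the new watermark can never be triggered any more.
            abortedCheckpointIds.removeIf(id -> id < lastCheckpointId);
        }

        /** Called when an abort is signalled through another path, e.g. a notification. */
        void onAbortNotification(long checkpointId) {
            abortedCheckpointIds.add(checkpointId);
        }

        /** Called when the queued "start checkpoint" mail is finally processed (step 3 above). */
        boolean shouldTrigger(long checkpointId) {
            if (checkpointId <= lastCheckpointId) {
                return false; // out of order: aborted or subsumed in the meantime
            }
            lastCheckpointId = checkpointId;
            return !abortedCheckpointIds.remove(checkpointId);
        }
    }

The actual change in the diff below applies the same idea inside
SubtaskCheckpointCoordinatorImpl.
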
---
 .../tasks/SubtaskCheckpointCoordinatorImpl.java| 22 +++---
 1 file changed, 19 insertions(+), 3 deletions(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index cf8a21e..5d7a2c9 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -44,7 +44,6 @@ import 
org.apache.flink.streaming.api.operators.StreamOperator;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
-import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.function.BiFunctionWithException;
 
 import org.slf4j.Logger;
@@ -57,10 +56,12 @@ import java.io.UncheckedIOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutorService;
@@ -183,6 +184,16 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
@Override
public void abortCheckpointOnBarrier(long checkpointId, Throwable 
cause, OperatorChain operatorChain) throws IOException {
LOG.debug("Aborting checkpoint via cancel-barrier {} for task 
{}", checkpointId, taskName);
+   lastCheckpointId = Math.max(lastCheckpointId, checkpointId);
+   Iterator iterator = abortedCheckpointIds.iterator();
+   while (iterator.hasNext()) {
+   long next = iterator.next();
+   if (next < lastCheckpointId) {
+   iterator.remove();
+   } else {
+   break;
+   }
+   }
 
checkpointStorage.clearCacheFor(checkpointId);
 
@@ -221,9 +232,14 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
// We generally try to emit the checkpoint barrier as soon as 
possible to not affect downstream
// checkpoint alignments
 
+   if (lastCheckpointId >= metadata.getCheckpointId()) {
+   LOG.info("Out of order checkpoint barrier (aborted 
previously?): {} >= {}", lastCheckpointId, metadata.getCheckpointId());
+   channelStateWriter.abort(metadata.getCheckpointId(), 
new CancellationException(), true);
+   checkAndClearAbortedStatus(metadata.getCheckpointId());
+   return;
+   }
+
// Step (0): Record the last triggered checkpointId.
-   Preconditions.checkArgument(lastCheckpointId < 
metadata.getCheckpointId(), String.format(
-   "Unexpected current checkpoint-id: %s vs last 
checkpoint-id: %s", metadata.getCheckpointId(), lastCheckpointId));
lastCheckpointId = metadata.getCheckpointId();
if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
LOG.info("Checkpoint {} has been notified as aborted, 
would not trigger any checkpoint.", metadata.getCheckpointId());



[flink] 05/08: [FLINK-17869][task][checkpointing] Increase ChannelStateWriterImpl.DEFAULT_MAX_CHECKPOINTS

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e14958dcd09f7c32ba2ba25fc6a48696fe7eeca2
Author: Roman Khachatryan 
AuthorDate: Wed Jun 3 15:56:47 2020 +0200

[FLINK-17869][task][checkpointing] Increase 
ChannelStateWriterImpl.DEFAULT_MAX_CHECKPOINTS

The ChannelStateWriter map is cleaned up by the task thread,
so the limit check performed in the netty thread has to account
for that possible delay.
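
A hedged illustration of that reasoning, using illustrative names only: the
registering thread checks the map size while a different thread removes
entries later, so the limit has to cover entries that are already finished
but not yet cleaned up.

    import java.util.concurrent.ConcurrentHashMap;

    class BoundedCheckpointRegistry {
        // Must cover concurrent checkpoints plus entries still awaiting clean-up.
        private static final int MAX_TRACKED_CHECKPOINTS = 1000;

        private final ConcurrentHashMap<Long, Object> inFlight = new ConcurrentHashMap<>();

        /** Called by the registering (netty) thread. */
        void register(long checkpointId, Object state) {
            if (inFlight.size() >= MAX_TRACKED_CHECKPOINTS) {
                throw new IllegalStateException("too many tracked checkpoints: " + inFlight.size());
            }
            inFlight.put(checkpointId, state);
        }

        /** Called later by the task thread. */
        void cleanUp(long checkpointId) {
            inFlight.remove(checkpointId);
        }
    }
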
---
 .../apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 3e18050..89a5247 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -56,7 +56,7 @@ import static 
org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteReque
 public class ChannelStateWriterImpl implements ChannelStateWriter {
 
private static final Logger LOG = 
LoggerFactory.getLogger(ChannelStateWriterImpl.class);
-   private static final int DEFAULT_MAX_CHECKPOINTS = 5; // currently, 
only single in-flight checkpoint is supported
+   private static final int DEFAULT_MAX_CHECKPOINTS = 1000; // includes 
max-concurrent-checkpoints + checkpoints to be aborted (scheduled via mailbox)
 
private final String taskName;
private final ChannelStateWriteRequestExecutor executor;



[flink] 03/08: [FLINK-17869][task][checkpointing] Revert "[FLINK-17218][checkpointing] Ensuring that ChannelStateWriter aborts previous checkpoints before a new checkpoint is started."

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6cb8f28cf219f79a36b90a567692fafc5f85a2a2
Author: Roman Khachatryan 
AuthorDate: Wed Jun 3 14:12:04 2020 +0200

[FLINK-17869][task][checkpointing] Revert "[FLINK-17218][checkpointing] 
Ensuring that ChannelStateWriter aborts previous checkpoints before a new 
checkpoint is started."

This reverts commit 24ff415f1b76392f75dea7c3538558d24fcb7058,
which introduced a race condition in which the task thread and the netty
thread compete for the ChannelStateWriteResult.

Instead, the next commits fix it by:
1. preventing the map-size validation error simply by increasing the limit
2. removing a subsumed checkpoint's write result from the map when its future
completes (a sketch of this follows below)
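
A hedged sketch of point 2, using illustrative names rather than Flink's API:
the entry of a subsumed checkpoint is dropped when its future completes, so no
thread has to race to remove it eagerly.

    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ConcurrentHashMap;

    class WriteResultRegistry {
        private final ConcurrentHashMap<Long, CompletableFuture<Void>> results =
                new ConcurrentHashMap<>();

        void register(long checkpointId, CompletableFuture<Void> result) {
            results.put(checkpointId, result);
        }

        /** Abort a subsumed checkpoint; the map entry is removed on future completion. */
        void abortSubsumed(long checkpointId, Throwable cause) {
            CompletableFuture<Void> result = results.get(checkpointId);
            if (result != null) {
                result.completeExceptionally(cause);
                // Clean-up runs whenever (and on whichever thread) the future completes.
                result.whenComplete((ignored, error) -> results.remove(checkpointId, result));
            }
        }
    }
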
---
 .../checkpoint/channel/ChannelStateWriter.java|  4 +++-
 .../checkpoint/channel/ChannelStateWriterImpl.java|  1 -
 .../channel/ChannelStateWriterImplTest.java   | 19 ---
 3 files changed, 11 insertions(+), 13 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index af2a708..02a3a69 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -179,7 +179,9 @@ public interface ChannelStateWriter extends Closeable {
 
@Override
public ChannelStateWriteResult getAndRemoveWriteResult(long 
checkpointId) {
-   return ChannelStateWriteResult.EMPTY;
+   return new ChannelStateWriteResult(
+   
CompletableFuture.completedFuture(Collections.emptyList()),
+   
CompletableFuture.completedFuture(Collections.emptyList()));
}
 
@Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index 6158358..8996b3b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -98,7 +98,6 @@ public class ChannelStateWriterImpl implements 
ChannelStateWriter {
 
@Override
public void start(long checkpointId, CheckpointOptions 
checkpointOptions) {
-   results.keySet().forEach(oldCheckpointId -> 
abort(oldCheckpointId, new Exception("Starting new checkpoint " + 
checkpointId)));
LOG.debug("{} starting checkpoint {} ({})", taskName, 
checkpointId, checkpointOptions);
ChannelStateWriteResult result = new ChannelStateWriteResult();
ChannelStateWriteResult put = 
results.computeIfAbsent(checkpointId, id -> {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
index 9d7a7ea..0dae88e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImplTest.java
@@ -186,19 +186,16 @@ public class ChannelStateWriterImplTest {
unwrappingError(TestException.class, () -> callStart(writer));
}
 
-   @Test
-   public void testStartAbortsOldCheckpoints() throws Exception {
-   int maxCheckpoints = 10;
-   runWithSyncWorker((writer, worker) -> {
-   writer.start(0, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-   ChannelStateWriteResult writeResult = 
writer.getWriteResult(0);
-   for (int i = 1; i <= maxCheckpoints; i++) {
+   @Test(expected = IllegalStateException.class)
+   public void testLimit() throws IOException {
+   int maxCheckpoints = 3;
+   try (ChannelStateWriterImpl writer = new 
ChannelStateWriterImpl(TASK_NAME, getStreamFactoryFactory(), maxCheckpoints)) {
+   writer.open();
+   for (int i = 0; i < maxCheckpoints; i++) {
writer.start(i, 
CheckpointOptions.forCheckpointWithDefaultLocation());
-   worker.processAllRequests();
-   assertTrue(writeResult.isDone());
-   

[flink] 08/08: [FLINK-17869][task][checkpointing] Abort writing of channel state by RPC notification

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1d75ec8efe3c31f2a5e1a4572b5cfb8000ddbf67
Author: Roman Khachatryan 
AuthorDate: Thu Jun 4 10:06:15 2020 +0200

[FLINK-17869][task][checkpointing] Abort writing of channel state by RPC
notification
---
 .../flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index 5d7a2c9..d904c87 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -306,6 +306,8 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
}
}
 
+   channelStateWriter.abort(checkpointId, new 
CancellationException("checkpoint aborted via notification"), false);
+
for (StreamOperatorWrapper operatorWrapper : 
operatorChain.getAllOperators(true)) {
try {

operatorWrapper.getStreamOperator().notifyCheckpointAborted(checkpointId);



[flink] branch release-1.11 updated (31a17cb -> 1d75ec8)

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 31a17cb  [FLINK-18188][Runtime] Derive JM Off-Heap memory from 
configured Total Flink Memory minus JVM Heap
 new 4030b1b  [FLINK-17869][hotfix] Add taskName to ChannelStateWriter log 
messages
 new 36609bb  [FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to 
AsyncCheckpointRunnable
 new 6cb8f28  [FLINK-17869][task][checkpointing] Revert 
"[FLINK-17218][checkpointing] Ensuring that ChannelStateWriter aborts previous 
checkpoints before a new checkpoint is started."
 new d806924  [FLINK-17869][task][checkpointing] Abort channel state write 
if checkpoint is subsumed
 new e14958d  [FLINK-17869][task][checkpointing] Increase 
ChannelStateWriterImpl.DEFAULT_MAX_CHECKPOINTS
 new 7bb3ffa  [FLINK-17869][task][checkpointing] Ignore out of order 
checkpoints in SubtaskCheckpointCoordinator
 new 0634a2e  [FLINK-17869][tests] Unignore UnalignedCheckpointITCase
 new 1d75ec8  [FLINK-17869][task][checkpointing] Abort writing of channel 
state by RPC notification

The 8 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../checkpoint/channel/ChannelStateWriter.java | 29 +---
 .../checkpoint/channel/ChannelStateWriterImpl.java | 25 --
 .../channel/ChannelStateWriterImplTest.java| 49 +---
 .../checkpoint/channel/MockChannelStateWriter.java |  8 +---
 .../channel/RecordingChannelStateWriter.java   |  5 --
 .../runtime/state/ChannelPersistenceITCase.java|  2 +-
 .../runtime/io/CheckpointBarrierUnaligner.java | 31 +++--
 .../runtime/tasks/AsyncCheckpointRunnable.java |  6 ---
 .../tasks/SubtaskCheckpointCoordinatorImpl.java| 54 +-
 .../runtime/tasks/LocalStateForwardingTest.java|  2 -
 .../tasks/TestSubtaskCheckpointCoordinator.java|  2 +-
 .../checkpointing/UnalignedCheckpointITCase.java   |  2 -
 12 files changed, 95 insertions(+), 120 deletions(-)



[flink] 02/08: [FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to AsyncCheckpointRunnable

2020-06-10 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 36609bb5dc2aee4061a47f7a767630f1f5912d96
Author: Roman Khachatryan 
AuthorDate: Wed Jun 3 23:03:52 2020 +0200

[FLINK-17869][hotfix] Don't pass ChannelStateWrite Future to 
AsyncCheckpointRunnable

OperatorSnapshotFinalizer already waits and holds this future.
ChannelStateWriter.getWriteResult() can then be non-idempotent.
ChannelStateWriter.stop() can then be removed.
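
A hedged sketch of the resulting "get and remove once" semantics, with
illustrative names that are not Flink's API: since the finalizer already holds
the future, retrieval can consume the map entry, and a separate stop()/clean-up
call is no longer needed.

    import java.util.HashMap;
    import java.util.Map;

    class WriteResults<T> {
        private final Map<Long, T> results = new HashMap<>();

        void put(long checkpointId, T result) {
            results.put(checkpointId, result);
        }

        /** May be called at most once per checkpoint; the entry is consumed. */
        T getAndRemove(long checkpointId) {
            T result = results.remove(checkpointId);
            if (result == null) {
                throw new IllegalArgumentException("unknown checkpoint " + checkpointId);
            }
            return result;
        }
    }
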
---
 .../checkpoint/channel/ChannelStateWriter.java | 21 ++--
 .../checkpoint/channel/ChannelStateWriterImpl.java | 10 ++--
 .../channel/ChannelStateWriterImplTest.java| 28 ++
 .../checkpoint/channel/MockChannelStateWriter.java |  6 +
 .../channel/RecordingChannelStateWriter.java   |  5 
 .../runtime/state/ChannelPersistenceITCase.java|  2 +-
 .../runtime/tasks/AsyncCheckpointRunnable.java |  6 -
 .../tasks/SubtaskCheckpointCoordinatorImpl.java| 15 +---
 .../runtime/tasks/LocalStateForwardingTest.java|  2 --
 9 files changed, 24 insertions(+), 71 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
index 5dad559..af2a708 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriter.java
@@ -123,7 +123,7 @@ public interface ChannelStateWriter extends Closeable {
 * Finalize write of channel state data for the given checkpoint id.
 * Must be called after {@link #start(long, CheckpointOptions)} and all 
of the input data of the given checkpoint added.
 * When both {@link #finishInput} and {@link #finishOutput} were called 
the results can be (eventually) obtained
-* using {@link #getWriteResult}
+* using {@link #getAndRemoveWriteResult}
 */
void finishInput(long checkpointId);
 
@@ -131,24 +131,21 @@ public interface ChannelStateWriter extends Closeable {
 * Finalize write of channel state data for the given checkpoint id.
 * Must be called after {@link #start(long, CheckpointOptions)} and all 
of the output data of the given checkpoint added.
 * When both {@link #finishInput} and {@link #finishOutput} were called 
the results can be (eventually) obtained
-* using {@link #getWriteResult}
+* using {@link #getAndRemoveWriteResult}
 */
void finishOutput(long checkpointId);
 
/**
 * Aborts the checkpoint and fails pending result for this checkpoint.
+* @param cleanup true if {@link #getAndRemoveWriteResult(long)} is not 
supposed to be called afterwards.
 */
void abort(long checkpointId, Throwable cause);
 
/**
-* Must be called after {@link #start(long, CheckpointOptions)}.
+* Must be called after {@link #start(long, CheckpointOptions)} once.
+* @throws IllegalArgumentException if the passed checkpointId is not 
known.
 */
-   ChannelStateWriteResult getWriteResult(long checkpointId);
-
-   /**
-* Cleans up the internal state for the given checkpoint.
-*/
-   void stop(long checkpointId);
+   ChannelStateWriteResult getAndRemoveWriteResult(long checkpointId) 
throws IllegalArgumentException;
 
ChannelStateWriter NO_OP = new NoOpChannelStateWriter();
 
@@ -181,16 +178,12 @@ public interface ChannelStateWriter extends Closeable {
}
 
@Override
-   public ChannelStateWriteResult getWriteResult(long 
checkpointId) {
+   public ChannelStateWriteResult getAndRemoveWriteResult(long 
checkpointId) {
return ChannelStateWriteResult.EMPTY;
}
 
@Override
public void close() {
}
-
-   @Override
-   public void stop(long checkpointId) {
-   }
}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
index fc8655c..6158358 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/ChannelStateWriterImpl.java
@@ -153,19 +153,13 @@ public class ChannelStateWriterImpl implements 
ChannelStateWriter {
}
 
@Override
-   public ChannelStateWriteResult getWriteResult(long checkpointId) {
+   public ChannelStateWriteR

[flink] annotated tag release-1.11.0-rc1 updated (8aaea7f -> 80f862f)

2020-06-11 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to annotated tag release-1.11.0-rc1
in repository https://gitbox.apache.org/repos/asf/flink.git.


*** WARNING: tag release-1.11.0-rc1 was modified! ***

from 8aaea7f  (commit)
  to 80f862f  (tag)
 tagging 8aaea7f3486e573f005b64ed0dcedb902139bf12 (commit)
 replaces pre-apache-rename
  by Zhijiang
  on Fri May 22 18:09:55 2020 +0200

- Log -
release-1.11.0-rc1
-BEGIN PGP SIGNATURE-

iQIzBAABCAAdFiEELahbkyRP36GaYkRQBlPAos6gDQ4FAl7H+VMACgkQBlPAos6g
DQ4qSQ//Z6h3qBZcEr7WJj/7s0TK5/jeay7SopEUHFev1X1vKo78D7eV/2u4UqIo
fyvjmOtnqHiEIqqXNwl6lKx/uiH8oGP7nlRHvEBEk4NMX5TCuWH3BnWvkDuAgkAc
vTjOlF3AjF9E2Fmhj0mchBwggwSau2eRRxDEH3PhUqlLv4HecnchfZPwGT5VfU4Z
ykoQVOVU94n77XQPwiikEcSjRBXm1LjE6APnWrGUMpzD20ZmPb9MsHD8hjoWwVKJ
5X0uLjk7Mc3gzHUynFi6PCk9rvZsJMz6yJQWteEacfrlilf1V/YZHZzVRyD8g8v7
9SEwxtX9l/9Bh1yokd+F6zVj8UIUbO/5bt/JkEHa685OUeU8/OKcA2wDduuxf37Y
WqK/UrQ/rclRVX+g5AMkI1VoTWtzHeoLf+APjBxyi+yzFmuYbp0Mu2/0sSeSrWH5
58Rwi4zrHSmIO0vCZIX+rvvT6entb4Uz0z/wtYH7kZKmUF/7DCxwN1oZl0v0ZEjK
8tkC6YwfEB9GUHD78EbdsCGWxhyH2MQF6MhyVIAk8tjZUoma72nhZlbR9sNOUTPZ
DGoXqOKudw1AmfwR120OIHFH9O3FaM6sGxc5WZknKqL7mSzBJxhYEExL27VrHBYU
3wsJD9RPhKEVLjPYC8J2c5/d2t+2SEWhhPLpLm7V8UzqSeyuKcE=
=MdYG
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:



[flink] annotated tag release-1.11.0-rc1 updated (80f862f -> 156ad08b)

2020-06-11 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to annotated tag release-1.11.0-rc1
in repository https://gitbox.apache.org/repos/asf/flink.git.


*** WARNING: tag release-1.11.0-rc1 was modified! ***

from 80f862f  (tag)
  to 156ad08b (tag)
 tagging 3314b26ea07b97f881a89317b241bbfdeae027ec (commit)
 replaces pre-apache-rename
  by Zhijiang Wang
  on Sun May 24 01:34:58 2020 +0800

- Log -
release-1.11.0-rc1
-BEGIN PGP SIGNATURE-
Version: GnuPG v2.0.22 (GNU/Linux)

iQIcBAABAgAGBQJeyV7CAAoJEAZTwKLOoA0ODNgP/3myMJC+BOiViH/sVGrAzJTi
LfVmLQL/++0qeD2Ug4/jCJ02qtq/eVITCwg7zSSNQCMZzMVPORfpLdeuDWKdAw/7
PA2RzOtS7tiOZftgF58BCcSv51D4Hm28YFpFBYc3NefxCaxmg5LYy55/GP9Ev/jE
89CAhppdNLsCSeybodcYzdefLouPSxR8qlZt77tayJ3O3fQDfe0xBKcX5rn/nGOo
oz4Fd+bHS+ROssoiOkCvu+dGHbpBStgcBRDzTfNQtd8zmOieGUHU1a6KLggsG3nq
Fc2RaiI2IYHQ3k9QI/wMJYAhf/OCIBGbR+psF2D3UjvkZAFMhfcjjTj4njm+jWhu
sO46abagR3+69UliFh5+fnxZmRbvjDHNNk6k7DX421KmKkMJBu+gxw38FxWUQdX6
qyUiMhUTXUFJSkHAz+NU8Rw44udGrBsCZiHdOILPh7zAWxAIMUUYdKh39YC8MB+s
xKJ6o14cBc4lGsLh12G+TFPMV5GphYPLK7P5V05XgFrlq1UspNCnTsWTO0YngQwD
eSr//I2vHYyTV/sBr4D2ejql92xw0bpTKMtman0sWz4wdtjT9RQBW4uFHfKwSeRw
H9TmBgoQKdU+Qaa4VWxeF+epCoT24YC6RJpRFWVj/3qVS6HPq6mEbD7243KRCHYT
S7SXEyVVGtB1qku6EvDL
=hqXK
-END PGP SIGNATURE-
---

 discard 8aaea7f  Commit for release 1.11.0
 add 828ba1d  [FLINK-17801][tests] Increase timeout of 
TaskExecutorTest.testHeartbeatTimeoutWithResourceManager
 add 1bf5b81  [hotfix][tests] Make ClientTest extend TestLogger
 add 81c185b  [hotfix][qs] Fix logging of exception in RequestWriteListener
 add 7cbdd91  [FLINK-13553][qs] Add logging to AbstractServerHandler
 add 3e18c10  [FLINK-13553][tests] Enable TRACE logging for 
org.apache.flink.queryablestate
 add 17d3f05  [FLINK-17854][core] Move InputStatus to flink-core
 add a0abaaf  [FLINK-17854][core] Let SourceReader use InputStatus directly
 add c408bb1  [hotfix][formatting] Fix the checkstyle issue of missing a 
javadoc comment in DummyNoOpOperator
 new 3314b26  Commit for release 1.11.0

This update added new revisions after undoing existing revisions.
That is to say, some revisions that were in the old version of the
annotated tag are not in the new version.  This situation occurs
when a user --force pushes a change and generates a repository
containing something like this:

 * -- * -- B -- O -- O -- O   (80f862f)
\
 N -- N -- N   refs/tags/release-1.11.0-rc1 (156ad08b)

You should already have received notification emails for all of the O
revisions, and so the following emails describe only the N revisions
from the common base, B.

Any revisions marked "omit" are not gone; other references still
refer to them.  Any revisions marked "discard" are gone forever.

The 1 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../base/source/reader/SourceReaderBase.java   | 13 ++-
 .../base/source/reader/SourceReaderTestBase.java   |  3 ++-
 .../flink/api/connector/source/SourceReader.java   | 19 ---
 .../org/apache/flink/core}/io/InputStatus.java | 27 +++---
 .../connector/source/mocks/MockSourceReader.java   |  7 +++---
 .../network/AbstractServerHandler.java |  5 +++-
 .../flink/queryablestate/network/ClientTest.java   |  3 ++-
 .../runtime/taskexecutor/TaskExecutorTest.java |  2 +-
 .../streaming/api/operators/SourceOperator.java| 13 ++-
 .../runtime/io/MultipleInputSelectionHandler.java  |  1 +
 .../runtime/io/PushingAsyncDataInput.java  |  1 +
 .../streaming/runtime/io/StreamInputProcessor.java |  1 +
 .../runtime/io/StreamMultipleInputProcessor.java   |  1 +
 .../runtime/io/StreamOneInputProcessor.java|  1 +
 .../runtime/io/StreamTaskNetworkInput.java |  1 +
 .../runtime/io/StreamTaskSourceInput.java  |  1 +
 .../runtime/io/StreamTwoInputProcessor.java|  1 +
 .../flink/streaming/runtime/tasks/StreamTask.java  |  2 +-
 .../runtime/io/StreamTaskNetworkInputTest.java |  1 +
 .../runtime/tasks/MultipleInputStreamTaskTest.java |  2 +-
 .../streaming/runtime/tasks/StreamTaskTest.java|  2 +-
 .../apache/flink/table/util/DummyNoOpOperator.java |  3 +++
 tools/log4j-travis.properties  |  5 
 23 files changed, 65 insertions(+), 50 deletions(-)
 rename {flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime 
=> flink-core/src/main/java/org/apache/flink/core}/io/InputStatus.java (52%)



[flink] 01/01: Commit for release 1.11.0

2020-06-11 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to annotated tag release-1.11.0-rc1
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 3314b26ea07b97f881a89317b241bbfdeae027ec
Author: Zhijiang Wang 
AuthorDate: Sun May 24 01:33:12 2020 +0800

Commit for release 1.11.0
---
 docs/_config.yml  | 2 +-
 flink-annotations/pom.xml | 2 +-
 flink-clients/pom.xml | 2 +-
 flink-connectors/flink-connector-base/pom.xml | 2 +-
 flink-connectors/flink-connector-cassandra/pom.xml| 2 +-
 flink-connectors/flink-connector-elasticsearch-base/pom.xml   | 2 +-
 flink-connectors/flink-connector-elasticsearch5/pom.xml   | 2 +-
 flink-connectors/flink-connector-elasticsearch6/pom.xml   | 2 +-
 flink-connectors/flink-connector-elasticsearch7/pom.xml   | 2 +-
 flink-connectors/flink-connector-filesystem/pom.xml   | 2 +-
 flink-connectors/flink-connector-gcp-pubsub/pom.xml   | 2 +-
 flink-connectors/flink-connector-hbase/pom.xml| 2 +-
 flink-connectors/flink-connector-hive/pom.xml | 2 +-
 flink-connectors/flink-connector-jdbc/pom.xml | 2 +-
 flink-connectors/flink-connector-kafka-0.10/pom.xml   | 2 +-
 flink-connectors/flink-connector-kafka-0.11/pom.xml   | 2 +-
 flink-connectors/flink-connector-kafka-base/pom.xml   | 2 +-
 flink-connectors/flink-connector-kafka/pom.xml| 2 +-
 flink-connectors/flink-connector-kinesis/pom.xml  | 2 +-
 flink-connectors/flink-connector-nifi/pom.xml | 2 +-
 flink-connectors/flink-connector-rabbitmq/pom.xml | 2 +-
 flink-connectors/flink-connector-twitter/pom.xml  | 2 +-
 flink-connectors/flink-hadoop-compatibility/pom.xml   | 2 +-
 flink-connectors/flink-hcatalog/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-elasticsearch6/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-elasticsearch7/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-hive-1.2.2/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-hive-2.2.0/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-hive-2.3.6/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-hive-3.1.2/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-kafka-0.10/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-kafka-0.11/pom.xml   | 2 +-
 flink-connectors/flink-sql-connector-kafka/pom.xml| 2 +-
 flink-connectors/pom.xml  | 2 +-
 flink-container/pom.xml   | 2 +-
 flink-contrib/flink-connector-wikiedits/pom.xml   | 2 +-
 flink-contrib/pom.xml | 2 +-
 flink-core/pom.xml| 2 +-
 flink-dist/pom.xml| 2 +-
 flink-docs/pom.xml| 2 +-
 flink-end-to-end-tests/flink-batch-sql-test/pom.xml   | 2 +-
 flink-end-to-end-tests/flink-bucketing-sink-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-cli-test/pom.xml | 2 +-
 flink-end-to-end-tests/flink-confluent-schema-registry/pom.xml| 2 +-
 .../flink-connector-gcp-pubsub-emulator-tests/pom.xml | 2 +-
 flink-end-to-end-tests/flink-dataset-allround-test/pom.xml| 2 +-
 .../flink-dataset-fine-grained-recovery-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-datastream-allround-test/pom.xml | 2 +-
 flink-end-to-end-tests/flink-distributed-cache-via-blob-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-elasticsearch5-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-elasticsearch6-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-elasticsearch7-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/pom.xml| 2 +-
 flink-end-to-end-tests/flink-end-to-end-tests-common/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-heavy-deployment-stress-test/pom.xml | 2 +-
 flink-end-to-end-tests/flink-high-parallelism-iterations-test/pom.xml | 2 +-
 .../flink-local-recovery-and-allocation-test/pom.xml  | 2 +-
 flink-end-to-end-tests/flink-metrics-availability-test/pom.xml| 2 +-
 flink-end-to-end-tests/flink-metrics-reporter-prometheus-test/pom.xml | 2 +-
 .../flink-netty-shuffle-memory-control-test/pom.xml


[flink] branch master updated (6744033 -> f88d98d)

2020-06-11 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 6744033  [FLINK-17788][scala-shell] Fix yarn session support in scala 
shell
 add f88d98d  [FLINK-17182][network][tests] Fix the unstable 
RemoteInputChannelTest.testConcurrentOnSenderBacklogAndRecycle

No new revisions were added by this update.

Summary of changes:
 .../partition/consumer/RemoteInputChannelTest.java | 63 +++---
 1 file changed, 32 insertions(+), 31 deletions(-)



[flink] branch release-1.11 updated: [FLINK-17182][network][tests] Fix the unstable RemoteInputChannelTest.testConcurrentOnSenderBacklogAndRecycle

2020-06-11 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 22098b2  [FLINK-17182][network][tests] Fix the unstable 
RemoteInputChannelTest.testConcurrentOnSenderBacklogAndRecycle
22098b2 is described below

commit 22098b27c32342f3ef74848a86d4b4d8d3c1dfb8
Author: Yun Gao 
AuthorDate: Fri Jun 12 10:52:08 2020 +0800

[FLINK-17182][network][tests] Fix the unstable 
RemoteInputChannelTest.testConcurrentOnSenderBacklogAndRecycle

In this unstable unit test, the exclusive buffers and floating buffers are
recycled by different threads, which can cause unexpected race conditions.
In practice, however, they are always recycled by the same task thread, so we
refactor the test to recycle them in the same thread and avoid these issues.

This closes #11924.
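
A hedged sketch of the test refactoring, with simplified, illustrative types
(AutoCloseable stands in for a recyclable buffer): one callable recycles both
kinds of buffers, so all recycling happens on a single thread, as it would in a
real task.

    import java.util.ArrayDeque;
    import java.util.Queue;
    import java.util.concurrent.Callable;

    class RecycleAllBuffersTask implements Callable<Void> {
        private final Queue<AutoCloseable> exclusiveBuffers;
        private final Queue<AutoCloseable> floatingBuffers;

        RecycleAllBuffersTask(Queue<AutoCloseable> exclusive, Queue<AutoCloseable> floating) {
            this.exclusiveBuffers = new ArrayDeque<>(exclusive);
            this.floatingBuffers = new ArrayDeque<>(floating);
        }

        @Override
        public Void call() throws Exception {
            // Both kinds of buffers are recycled from the same thread.
            for (AutoCloseable buffer : exclusiveBuffers) {
                buffer.close();
            }
            for (AutoCloseable buffer : floatingBuffers) {
                buffer.close();
            }
            return null;
        }
    }
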
---
 .../partition/consumer/RemoteInputChannelTest.java | 63 +++---
 1 file changed, 32 insertions(+), 31 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
index b280422..3984dde 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java
@@ -59,6 +59,8 @@ import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
+import java.util.Queue;
+import java.util.Random;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
@@ -862,8 +864,7 @@ public class RemoteInputChannelTest {
 
// Submit tasks and wait to finish
submitTasksAndWaitForResults(executor, new Callable[]{
-   recycleExclusiveBufferTask(inputChannel, 
numExclusiveSegments),
-   recycleFloatingBufferTask(bufferPool, 
numFloatingBuffers),
+   recycleBufferTask(inputChannel, bufferPool, 
numExclusiveSegments, numFloatingBuffers),
requestBufferTask});
 
assertEquals("There should be " + 
inputChannel.getNumberOfRequiredBuffers() + " buffers available in channel.",
@@ -912,8 +913,7 @@ public class RemoteInputChannelTest {
 
// Submit tasks and wait to finish
submitTasksAndWaitForResults(executor, new Callable[]{
-   recycleExclusiveBufferTask(inputChannel, 
numExclusiveSegments),
-   recycleFloatingBufferTask(bufferPool, 
numFloatingBuffers),
+   recycleBufferTask(inputChannel, bufferPool, 
numExclusiveSegments, numFloatingBuffers),
releaseTask});
 
assertEquals("There should be no buffers available in 
the channel.",
@@ -1222,14 +1222,21 @@ public class RemoteInputChannelTest {
}
 
/**
-* Requests the exclusive buffers from input channel first and then 
recycles them by a callable task.
+* Requests the buffers from input channel and buffer pool first and 
then recycles them by a callable task.
 *
 * @param inputChannel The input channel that exclusive buffers request 
from.
+* @param bufferPool The buffer pool that floating buffers request from.
 * @param numExclusiveSegments The number of exclusive buffers to 
request.
-* @return The callable task to recycle exclusive buffers.
+* @param numFloatingBuffers The number of floating buffers to request.
+* @return The callable task to recycle exclusive and floating buffers.
 */
-   private Callable recycleExclusiveBufferTask(RemoteInputChannel 
inputChannel, int numExclusiveSegments) {
-   final List exclusiveBuffers = new 
ArrayList<>(numExclusiveSegments);
+   private Callable recycleBufferTask(
+   RemoteInputChannel inputChannel,
+   BufferPool bufferPool,
+   int numExclusiveSegments,
+   int numFloatingBuffers) throws Exception {
+
+   Queue exclusiveBuffers = new 
ArrayDeque<>(numExclusiveSegments);
// Exhaust all the exclusive buffers
for (int i = 0; i < numExclusiveSegments; i++) {
Buffer buffer = inputChannel.requestBuffer();
@@ -1237,27 +1244,7 @@ public class RemoteInputChannelTest {
  

[flink] branch master updated: [FLINK-18252][checkpointing] Fix savepoint overtaking output data.

2020-06-12 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new dd3717e  [FLINK-18252][checkpointing] Fix savepoint overtaking output 
data.
dd3717e is described below

commit dd3717e8383c39eb086f21fba79db0381dfac248
Author: Arvid Heise 
AuthorDate: Thu Jun 11 21:18:11 2020 +0200

[FLINK-18252][checkpointing] Fix savepoint overtaking output data.

Currently, a checkpoint/savepoint barrier is always sent as a priority
event to the output partitions, where it overtakes data. After the fix, a
barrier is a priority event only if it is unaligned.
Also, CheckpointCoordinator only sets the unaligned flag if the barrier belongs
to a checkpoint.
Ultimately, the unaligned checkpoint config option is not used by
SubtaskCheckpointCoordinatorImpl except for initializing the channel state
writer. The source of truth is now the CheckpointOptions.
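
A hedged sketch of the resulting decision, with illustrative names: the barrier
may overtake buffered output data (i.e. be a priority event) only for an
unaligned checkpoint, never for a savepoint.

    class BarrierPriorityDecision {
        enum CheckpointType { CHECKPOINT, SAVEPOINT }

        /** Savepoints are never unaligned, so they never overtake in-flight data. */
        static boolean isPriorityEvent(CheckpointType type, boolean unalignedCheckpointsEnabled) {
            return type == CheckpointType.CHECKPOINT && unalignedCheckpointsEnabled;
        }
    }
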
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |  2 +-
 .../tasks/SubtaskCheckpointCoordinatorImpl.java| 16 +++--
 .../tasks/SubtaskCheckpointCoordinatorTest.java| 40 +++---
 3 files changed, 41 insertions(+), 17 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index da518ee..735c343 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -732,7 +732,7 @@ public class CheckpointCoordinator {
props.getCheckpointType(),
checkpointStorageLocation.getLocationReference(),
isExactlyOnceMode,
-   unalignedCheckpointsEnabled);
+   props.getCheckpointType() == CheckpointType.CHECKPOINT 
&& unalignedCheckpointsEnabled);
 
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index d904c87..b2fe147 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -82,7 +82,6 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
private final AsyncExceptionHandler asyncExceptionHandler;
private final ChannelStateWriter channelStateWriter;
private final StreamTaskActionExecutor actionExecutor;
-   private final boolean unalignedCheckpointEnabled;
private final BiFunctionWithException, IOException> prepareInputSnapshot;
/** The IDs of the checkpoint for which we are notified aborted. */
private final Set abortedCheckpointIds;
@@ -139,7 +138,6 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
executorService,
env,
asyncExceptionHandler,
-   unalignedCheckpointEnabled,
prepareInputSnapshot,
maxRecordAbortedCheckpoints,
unalignedCheckpointEnabled ? 
openChannelStateWriter(taskName, checkpointStorage) : ChannelStateWriter.NO_OP);
@@ -154,7 +152,6 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
ExecutorService executorService,
Environment env,
AsyncExceptionHandler asyncExceptionHandler,
-   boolean unalignedCheckpointEnabled,
BiFunctionWithException, IOException> prepareInputSnapshot,
int maxRecordAbortedCheckpoints,
ChannelStateWriter channelStateWriter) throws 
IOException {
@@ -167,7 +164,6 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
this.asyncExceptionHandler = 
checkNotNull(asyncExceptionHandler);
this.actionExecutor = checkNotNull(actionExecutor);
this.channelStateWriter = checkNotNull(channelStateWriter);
-   this.unalignedCheckpointEnabled = unalignedCheckpointEnabled;
this.prepareInputSnapshot = prepareInputSnapshot;
this.abortedCheckpointIds = 
createAbortedCheckpointSetWithLimitSize(maxRecor

[flink] branch master updated (4d8f55e -> 7e7b132)

2020-06-12 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 4d8f55e  [FLINK-18259][tests] Increase heartbeat timeouts for 
HeartbeatManagerTest
 add ca9dc71  [FLINK-17322][network] Fixes BroadcastRecordWriter 
overwriting memory segments on first finished BufferConsumer.
 add 7e7b132  [FLINK-17322][network] Disallowing repeated consumer creation 
for BufferBuilder.

No new revisions were added by this update.

Summary of changes:
 .../network/api/writer/BroadcastRecordWriter.java  |  6 +-
 .../io/network/api/writer/RecordWriter.java|  5 ++
 .../runtime/io/network/buffer/BufferBuilder.java   | 10 ++-
 .../runtime/io/network/buffer/BufferConsumer.java  | 14 
 .../api/writer/BroadcastRecordWriterTest.java  | 54 +
 .../api/writer/RecordWriterDelegateTest.java   | 10 ++-
 .../io/network/api/writer/RecordWriterTest.java| 13 ++-
 .../buffer/BufferBuilderAndConsumerTest.java   | 16 +---
 .../streaming/runtime/LatencyMarkerITCase.java | 93 ++
 9 files changed, 199 insertions(+), 22 deletions(-)
 create mode 100644 
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/LatencyMarkerITCase.java



[flink] branch release-1.11 updated (89c224e -> dfdfdad)

2020-06-14 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 89c224e  [FLINK-18265][fs-connector] temp path in 
FileSystemOutputFormat should be deleted
 new aaa3bbc  [FLINK-17322][network] Fixes BroadcastRecordWriter 
overwriting memory segments on first finished BufferConsumer.
 new dfdfdad  [FLINK-17322][network] Disallowing repeated consumer creation 
for BufferBuilder.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../network/api/writer/BroadcastRecordWriter.java  |  6 +-
 .../io/network/api/writer/RecordWriter.java|  5 ++
 .../runtime/io/network/buffer/BufferBuilder.java   | 10 ++-
 .../runtime/io/network/buffer/BufferConsumer.java  | 14 
 .../api/writer/BroadcastRecordWriterTest.java  | 54 +
 .../api/writer/RecordWriterDelegateTest.java   | 10 ++-
 .../io/network/api/writer/RecordWriterTest.java| 13 ++-
 .../buffer/BufferBuilderAndConsumerTest.java   | 16 +---
 .../streaming/runtime/LatencyMarkerITCase.java | 93 ++
 9 files changed, 199 insertions(+), 22 deletions(-)
 create mode 100644 
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/LatencyMarkerITCase.java



[flink] 01/02: [FLINK-17322][network] Fixes BroadcastRecordWriter overwriting memory segments on first finished BufferConsumer.

2020-06-14 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aaa3bbc8495474aea03b2e5184e5a15296ed8809
Author: Arvid Heise 
AuthorDate: Wed Jun 3 13:48:49 2020 +0200

[FLINK-17322][network] Fixes BroadcastRecordWriter overwriting memory 
segments on first finished BufferConsumer.

BroadcastRecordWriter#randomEmit initialized buffer consumers for the other,
non-target channels incorrectly, leading to separate buffer reference counting
and to buffers being released too early.
This commit uses the new BufferConsumer#copyWithReaderPosition method to 
copy the buffer while updating the read index to the last committed write index 
of the builder.
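
A hedged, greatly simplified sketch of the sharing idea (illustrative types,
not Flink's BufferConsumer): all copies of a consumer share one reference count
on the underlying segment, and a copy can start reading at the writer's
committed position, so the segment is recycled only after every copy is closed.

    import java.util.concurrent.atomic.AtomicInteger;

    class SharedSegmentConsumer {
        private final byte[] segment;
        private final AtomicInteger refCount; // shared by every copy of this consumer
        private final int readerPosition;

        SharedSegmentConsumer(byte[] segment) {
            this(segment, new AtomicInteger(1), 0);
        }

        private SharedSegmentConsumer(byte[] segment, AtomicInteger refCount, int readerPosition) {
            this.segment = segment;
            this.refCount = refCount;
            this.readerPosition = readerPosition;
        }

        /** Copy sharing the reference count, reading from the given committed position. */
        SharedSegmentConsumer copyWithReaderPosition(int committedBytes) {
            refCount.incrementAndGet();
            return new SharedSegmentConsumer(segment, refCount, committedBytes);
        }

        void close() {
            if (refCount.decrementAndGet() == 0) {
                // Last copy closed: only now may the segment be recycled.
            }
        }
    }
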
---
 .../network/api/writer/BroadcastRecordWriter.java  |  6 +-
 .../runtime/io/network/buffer/BufferBuilder.java   |  6 +-
 .../runtime/io/network/buffer/BufferConsumer.java  | 14 
 .../api/writer/BroadcastRecordWriterTest.java  | 54 +
 .../io/network/api/writer/RecordWriterTest.java|  8 +-
 .../streaming/runtime/LatencyMarkerITCase.java | 93 ++
 6 files changed, 176 insertions(+), 5 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
index 9964b20..132fefa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
@@ -50,6 +50,8 @@ public final class BroadcastRecordWriter extends R
 */
private boolean randomTriggered;
 
+   private BufferConsumer randomTriggeredConsumer;
+
BroadcastRecordWriter(
ResultPartitionWriter writer,
long timeout,
@@ -84,7 +86,7 @@ public final class BroadcastRecordWriter extends R
if (bufferBuilder != null) {
for (int index = 0; index < numberOfChannels; index++) {
if (index != targetChannelIndex) {
-   
addBufferConsumer(bufferBuilder.createBufferConsumer(), index);
+   
addBufferConsumer(randomTriggeredConsumer.copyWithReaderPosition(bufferBuilder.getCommittedBytes()),
 index);
}
}
}
@@ -130,7 +132,7 @@ public final class BroadcastRecordWriter extends R
 
BufferBuilder builder = 
super.requestNewBufferBuilder(targetChannel);
if (randomTriggered) {
-   addBufferConsumer(builder.createBufferConsumer(), 
targetChannel);
+   addBufferConsumer(randomTriggeredConsumer = 
builder.createBufferConsumer(), targetChannel);
} else {
try (BufferConsumer bufferConsumer = 
builder.createBufferConsumer()) {
for (int channel = 0; channel < 
numberOfChannels; channel++) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index b18569e..7780ba8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -123,6 +123,10 @@ public class BufferBuilder {
return getMaxCapacity() - positionMarker.getCached();
}
 
+   public int getCommittedBytes() {
+   return positionMarker.getCached();
+   }
+
public int getMaxCapacity() {
return memorySegment.size();
}
@@ -167,7 +171,7 @@ public class BufferBuilder {
 *
 * Remember to commit the {@link SettablePositionMarker} to make the 
changes visible.
 */
-   private static class SettablePositionMarker implements PositionMarker {
+   static class SettablePositionMarker implements PositionMarker {
private volatile int position = 0;
 
/**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
index 863b231..70c8457 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
@@ -25,6 +25,7 @@ import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.Closeable;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import 

[flink] 02/02: [FLINK-17322][network] Disallowing repeated consumer creation for BufferBuilder.

2020-06-14 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit dfdfdadf445ea055c841c526b1a382424e1e1865
Author: Arvid Heise 
AuthorDate: Fri Jun 12 10:03:59 2020 +0200

[FLINK-17322][network] Disallowing repeated consumer creation for 
BufferBuilder.

This is a partial revert of FLINK-10995.
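
A hedged sketch of the fail-fast behaviour, with illustrative names: a builder
hands out exactly one consumer, and a second attempt fails immediately instead
of silently creating two consumers with independent reference counting.

    class OneShotConsumerBuilder {
        private boolean consumerCreated;

        Object createConsumer() {
            if (consumerCreated) {
                throw new IllegalStateException("Two consumers shouldn't exist for one builder");
            }
            consumerCreated = true;
            return new Object(); // stand-in for the real consumer object
        }
    }
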
---
 .../runtime/io/network/api/writer/RecordWriter.java  |  5 +
 .../flink/runtime/io/network/buffer/BufferBuilder.java   |  4 
 .../io/network/api/writer/RecordWriterDelegateTest.java  | 10 --
 .../runtime/io/network/api/writer/RecordWriterTest.java  |  5 +++--
 .../io/network/buffer/BufferBuilderAndConsumerTest.java  | 16 +++-
 5 files changed, 23 insertions(+), 17 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
index 7d0f7de..be40d8d 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
@@ -349,4 +349,9 @@ public abstract class RecordWriter implements Avai
}
}
}
+
+   @VisibleForTesting
+   ResultPartitionWriter getTargetPartition() {
+   return targetPartition;
+   }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index 7780ba8..2cb873c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -41,6 +41,8 @@ public class BufferBuilder {
 
private final SettablePositionMarker positionMarker = new 
SettablePositionMarker();
 
+   private boolean bufferConsumerCreated = false;
+
public BufferBuilder(MemorySegment memorySegment, BufferRecycler 
recycler) {
this.memorySegment = checkNotNull(memorySegment);
this.recycler = checkNotNull(recycler);
@@ -53,6 +55,8 @@ public class BufferBuilder {
 * @return created matching instance of {@link BufferConsumer} to this 
{@link BufferBuilder}.
 */
public BufferConsumer createBufferConsumer() {
+   checkState(!bufferConsumerCreated, "Two BufferConsumer 
shouldn't exist for one BufferBuilder");
+   bufferConsumerCreated = true;
return new BufferConsumer(
memorySegment,
recycler,
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
index 97d8053..4a7e5c5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterDelegateTest.java
@@ -25,7 +25,9 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferBuilderTestUtils;
 import org.apache.flink.runtime.io.network.buffer.BufferConsumer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import 
org.apache.flink.runtime.io.network.partition.NoOpBufferAvailablityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionBuilder;
+import org.apache.flink.runtime.io.network.partition.ResultSubpartitionView;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.util.TestLogger;
@@ -148,13 +150,17 @@ public class RecordWriterDelegateTest extends TestLogger {
assertTrue(writerDelegate.getAvailableFuture().isDone());
 
// request one buffer from the local pool to make it unavailable
-   final BufferBuilder bufferBuilder = 
checkNotNull(writerDelegate.getRecordWriter(0).getBufferBuilder(0));
+   RecordWriter recordWriter = writerDelegate.getRecordWriter(0);
+   final BufferBuilder bufferBuilder = 
checkNotNull(recordWriter.getBufferBuilder(0));
assertFalse(writerDelegate.isAvailable());
CompletableFuture future = writerDelegate.getAvailableFuture();
assertFalse(future.isDone());
 
// recycle the buffer to make the local pool available again
-   final Buffer buffer = 
BufferBuilderTestUtils.buildSingleBuffe

[flink] 01/02: [FLINK-17322][network] Fixes BroadcastRecordWriter overwriting memory segments on first finished BufferConsumer.

2020-06-14 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aaa3bbc8495474aea03b2e5184e5a15296ed8809
Author: Arvid Heise 
AuthorDate: Wed Jun 3 13:48:49 2020 +0200

[FLINK-17322][network] Fixes BroadcastRecordWriter overwriting memory 
segments on first finished BufferConsumer.

BroadcastRecordWriter#randomEmit initialized buffer consumers for the other,
non-target channels incorrectly, leading to separate buffer reference counting
and to buffers being released too early.
This commit uses the new BufferConsumer#copyWithReaderPosition method to 
copy the buffer while updating the read index to the last committed write index 
of the builder.
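
To make the reference-counting point concrete, here is a small self-contained toy
model (plain Java, not Flink code; all class names are invented for illustration)
of why copying the consumer with a reader position is safer than creating
independent consumers: the copies share one reference count, so the underlying
segment is recycled only after every channel has finished reading.

    // Toy model of consumers sharing one reference-counted segment, each with its
    // own reader position; not Flink code, names are invented for illustration.
    final class SharedSegmentDemo {

        static final class RefCountedSegment {
            private int refCount = 1;

            void retain() { refCount++; }

            void release() {
                if (--refCount == 0) {
                    System.out.println("segment recycled");
                }
            }
        }

        static final class Consumer implements AutoCloseable {
            private final RefCountedSegment segment;
            private final int readerPosition;

            Consumer(RefCountedSegment segment, int readerPosition) {
                this.segment = segment;
                this.readerPosition = readerPosition;
            }

            // Mirrors the idea of BufferConsumer#copyWithReaderPosition: the copy
            // shares the same segment and starts reading at a later offset.
            Consumer copyWithReaderPosition(int newReaderPosition) {
                segment.retain();
                return new Consumer(segment, newReaderPosition);
            }

            @Override
            public void close() {
                System.out.println("consumer at position " + readerPosition + " closed");
                segment.release();
            }
        }

        public static void main(String[] args) {
            RefCountedSegment segment = new RefCountedSegment();
            Consumer targetChannelConsumer = new Consumer(segment, 0);
            // Broadcast copies start after the bytes already committed for the random emit.
            Consumer broadcastCopy = targetChannelConsumer.copyWithReaderPosition(42);
            targetChannelConsumer.close();  // segment still referenced by the copy
            broadcastCopy.close();          // now prints "segment recycled"
        }
    }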
---
 .../network/api/writer/BroadcastRecordWriter.java  |  6 +-
 .../runtime/io/network/buffer/BufferBuilder.java   |  6 +-
 .../runtime/io/network/buffer/BufferConsumer.java  | 14 
 .../api/writer/BroadcastRecordWriterTest.java  | 54 +
 .../io/network/api/writer/RecordWriterTest.java|  8 +-
 .../streaming/runtime/LatencyMarkerITCase.java | 93 ++
 6 files changed, 176 insertions(+), 5 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
index 9964b20..132fefa 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/BroadcastRecordWriter.java
@@ -50,6 +50,8 @@ public final class BroadcastRecordWriter extends R
 */
private boolean randomTriggered;
 
+   private BufferConsumer randomTriggeredConsumer;
+
BroadcastRecordWriter(
ResultPartitionWriter writer,
long timeout,
@@ -84,7 +86,7 @@ public final class BroadcastRecordWriter extends R
if (bufferBuilder != null) {
for (int index = 0; index < numberOfChannels; index++) {
if (index != targetChannelIndex) {
-   
addBufferConsumer(bufferBuilder.createBufferConsumer(), index);
+   
addBufferConsumer(randomTriggeredConsumer.copyWithReaderPosition(bufferBuilder.getCommittedBytes()),
 index);
}
}
}
@@ -130,7 +132,7 @@ public final class BroadcastRecordWriter extends R
 
BufferBuilder builder = 
super.requestNewBufferBuilder(targetChannel);
if (randomTriggered) {
-   addBufferConsumer(builder.createBufferConsumer(), 
targetChannel);
+   addBufferConsumer(randomTriggeredConsumer = 
builder.createBufferConsumer(), targetChannel);
} else {
try (BufferConsumer bufferConsumer = 
builder.createBufferConsumer()) {
for (int channel = 0; channel < 
numberOfChannels; channel++) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
index b18569e..7780ba8 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferBuilder.java
@@ -123,6 +123,10 @@ public class BufferBuilder {
return getMaxCapacity() - positionMarker.getCached();
}
 
+   public int getCommittedBytes() {
+   return positionMarker.getCached();
+   }
+
public int getMaxCapacity() {
return memorySegment.size();
}
@@ -167,7 +171,7 @@ public class BufferBuilder {
 *
 * Remember to commit the {@link SettablePositionMarker} to make the 
changes visible.
 */
-   private static class SettablePositionMarker implements PositionMarker {
+   static class SettablePositionMarker implements PositionMarker {
private volatile int position = 0;
 
/**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
index 863b231..70c8457 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferConsumer.java
@@ -25,6 +25,7 @@ import javax.annotation.concurrent.NotThreadSafe;
 
 import java.io.Closeable;
 
+import static org.apache.flink.util.Preconditions.checkArgument;
 import 

[flink] branch release-1.11 updated (89c224e -> dfdfdad)

2020-06-14 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 89c224e  [FLINK-18265][fs-connector] temp path in 
FileSystemOutputFormat should be deleted
 new aaa3bbc  [FLINK-17322][network] Fixes BroadcastRecordWriter 
overwriting memory segments on first finished BufferConsumer.
 new dfdfdad  [FLINK-17322][network] Disallowing repeated consumer creation 
for BufferBuilder.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../network/api/writer/BroadcastRecordWriter.java  |  6 +-
 .../io/network/api/writer/RecordWriter.java|  5 ++
 .../runtime/io/network/buffer/BufferBuilder.java   | 10 ++-
 .../runtime/io/network/buffer/BufferConsumer.java  | 14 
 .../api/writer/BroadcastRecordWriterTest.java  | 54 +
 .../api/writer/RecordWriterDelegateTest.java   | 10 ++-
 .../io/network/api/writer/RecordWriterTest.java| 13 ++-
 .../buffer/BufferBuilderAndConsumerTest.java   | 16 +---
 .../streaming/runtime/LatencyMarkerITCase.java | 93 ++
 9 files changed, 199 insertions(+), 22 deletions(-)
 create mode 100644 
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/LatencyMarkerITCase.java



[flink] branch master updated (57992c9 -> bb2affb)

2020-06-14 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 57992c9  [FLINK-17623][elasticsearch] Support user resource cleanup in 
ElasticsearchSinkFunction
 add bb2affb  [FLINK-18089][network][tests] Config the e2e for netty 
shuffle memory control into azure pipeline

No new revisions were added by this update.

Summary of changes:
 flink-end-to-end-tests/run-nightly-tests.sh | 2 ++
 1 file changed, 2 insertions(+)



[flink] branch master updated: [FLINK-18089][network][tests] Config the e2e for netty shuffle memory control into azure pipeline

2020-06-14 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new bb2affb  [FLINK-18089][network][tests] Config the e2e for netty 
shuffle memory control into azure pipeline
bb2affb is described below

commit bb2affbcb6ea748d5a8e8a4af188d4b6f21c1466
Author: Yun Gao 
AuthorDate: Mon Jun 15 12:29:52 2020 +0800

[FLINK-18089][network][tests] Config the e2e for netty shuffle memory 
control into azure pipeline

This closes #12614.
---
 flink-end-to-end-tests/run-nightly-tests.sh | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/flink-end-to-end-tests/run-nightly-tests.sh 
b/flink-end-to-end-tests/run-nightly-tests.sh
index 69a2ec3..21ea06e 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -181,6 +181,8 @@ run_test "Streaming File Sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/tes
 run_test "Streaming File Sink s3 end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh s3" 
"skip_check_exceptions"
 run_test "Stateful stream job upgrade end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
 
+run_test "Netty shuffle direct memory consumption end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_netty_shuffle_memory_control.sh"
+
 run_test "Elasticsearch (v5.3.3) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.3.3.tar.gz";
 run_test "Elasticsearch (v6.3.1) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz";
 



[flink] branch release-1.11 updated: [FLINK-18089][network][tests] Config the e2e for netty shuffle memory control into azure pipeline

2020-06-14 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 75175d2  [FLINK-18089][network][tests] Config the e2e for netty 
shuffle memory control into azure pipeline
75175d2 is described below

commit 75175d2e5a20e09ec31261e9e098b0e1e0d52a16
Author: Yun Gao 
AuthorDate: Mon Jun 15 13:48:55 2020 +0800

[FLINK-18089][network][tests] Config the e2e for netty shuffle memory 
control into azure pipeline
---
 flink-end-to-end-tests/run-nightly-tests.sh | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/flink-end-to-end-tests/run-nightly-tests.sh 
b/flink-end-to-end-tests/run-nightly-tests.sh
index 3a3471c..04422c7 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -181,6 +181,8 @@ run_test "Streaming File Sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/tes
 run_test "Streaming File Sink s3 end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh s3" 
"skip_check_exceptions"
 run_test "Stateful stream job upgrade end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
 
+run_test "Netty shuffle direct memory consumption end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_netty_shuffle_memory_control.sh"
+
 run_test "Elasticsearch (v5.3.3) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.3.3.tar.gz";
 run_test "Elasticsearch (v6.3.1) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz";
 



[flink] branch release-1.10 updated (2bf5ea2 -> 5c0f827)

2020-06-16 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 2bf5ea2  [FLINK-17891][yarn] Set execution.target=yarn-session in 
FlinkYarnSessionCli.run()
 add 5fc4d1b  [FLINK-17322][network] Fixes BroadcastRecordWriter 
overwriting memory segments on first finished BufferConsumer.
 add 5c0f827  [FLINK-17322][network] Disallowing repeated consumer creation 
for BufferBuilder.

No new revisions were added by this update.

Summary of changes:
 .../network/api/writer/BroadcastRecordWriter.java  |  6 +-
 .../io/network/api/writer/RecordWriter.java|  5 ++
 .../runtime/io/network/buffer/BufferBuilder.java   |  8 ++
 .../runtime/io/network/buffer/BufferConsumer.java  | 14 
 .../api/writer/BroadcastRecordWriterTest.java  | 54 +
 .../api/writer/RecordWriterDelegateTest.java   | 12 ++-
 .../io/network/api/writer/RecordWriterTest.java| 73 -
 .../buffer/BufferBuilderAndConsumerTest.java   | 16 +---
 .../partition/NoOpBufferAvailablityListener.java   |  2 +-
 .../streaming/runtime/LatencyMarkerITCase.java | 93 ++
 10 files changed, 264 insertions(+), 19 deletions(-)
 create mode 100644 
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/LatencyMarkerITCase.java



[flink] branch master updated: [FLINK-18238][checkpoint] Broadcast CancelCheckpointMarker while executing checkpoint aborted by coordinator RPC

2020-06-16 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 07772bd  [FLINK-18238][checkpoint] Broadcast CancelCheckpointMarker 
while executing checkpoint aborted by coordinator RPC
07772bd is described below

commit 07772bdb9abc0bcd3b3c8869f6abdc8088bd7cea
Author: Yun Tang 
AuthorDate: Wed Jun 17 11:10:39 2020 +0800

[FLINK-18238][checkpoint] Broadcast CancelCheckpointMarker while executing 
checkpoint aborted by coordinator RPC

When the CheckpointCoordinator sends an abort-checkpoint RPC, the subtask skips
executing the respective checkpoint that had already been triggered before. But
we also need to broadcast the CancelCheckpointMarker before returning from the
execution; otherwise the downstream side would wait for barrier alignment
indefinitely and deadlock.

This closes #12664.
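
A self-contained toy model of the alignment hazard described above (plain Java;
not Flink's barrier handling, all names are invented): once a downstream task has
seen a barrier on one input it holds back that input until the checkpoint either
finishes aligning or is cancelled, so an upstream task that silently drops an
aborted checkpoint would leave the downstream task blocked forever.

    import java.util.HashSet;
    import java.util.Set;

    // Toy model of checkpoint barrier alignment; not Flink code, names invented.
    final class BarrierAlignmentDemo {
        private final int numberOfChannels;
        private final Set<Integer> channelsWithBarrier = new HashSet<>();
        private long pendingCheckpointId = -1;

        BarrierAlignmentDemo(int numberOfChannels) {
            this.numberOfChannels = numberOfChannels;
        }

        void onBarrier(long checkpointId, int channel) {
            pendingCheckpointId = checkpointId;
            channelsWithBarrier.add(channel);
            if (channelsWithBarrier.size() == numberOfChannels) {
                System.out.println("checkpoint " + checkpointId + " aligned");
                reset();
            }
        }

        // Without this event from an aborting upstream subtask, the inputs that
        // already delivered their barrier would stay blocked indefinitely.
        void onCancelMarker(long checkpointId) {
            if (checkpointId == pendingCheckpointId) {
                System.out.println("checkpoint " + checkpointId + " cancelled, inputs unblocked");
                reset();
            }
        }

        private void reset() {
            channelsWithBarrier.clear();
            pendingCheckpointId = -1;
        }

        public static void main(String[] args) {
            BarrierAlignmentDemo aligner = new BarrierAlignmentDemo(2);
            aligner.onBarrier(17, 0);    // one input delivered its barrier, the other is awaited
            aligner.onCancelMarker(17);  // the aborting upstream broadcasts the cancel marker
        }
    }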
---
 .../tasks/SubtaskCheckpointCoordinatorImpl.java|  4 +-
 .../tasks/SubtaskCheckpointCoordinatorTest.java| 75 ++
 2 files changed, 78 insertions(+), 1 deletion(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index b2fe147..0d6d638 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -235,9 +235,11 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
return;
}
 
-   // Step (0): Record the last triggered checkpointId.
+   // Step (0): Record the last triggered checkpointId and abort 
the sync phase of checkpoint if necessary.
lastCheckpointId = metadata.getCheckpointId();
if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
+   // broadcast cancel checkpoint marker to avoid 
downstream back-pressure due to checkpoint barrier align.
+   operatorChain.broadcastEvent(new 
CancelCheckpointMarker(metadata.getCheckpointId()));
LOG.info("Checkpoint {} has been notified as aborted, 
would not trigger any checkpoint.", metadata.getCheckpointId());
return;
}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index f7655ea..00ef9a1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.MetricGroup;
@@ -28,7 +31,11 @@ import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
+import 
org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -39,12 +46,15 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.TestCheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OperatorSnaps

[flink] branch master updated (e2db1dc -> 07772bd)

2020-06-16 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from e2db1dc  [FLINK-18303][filesystem][hive] Fix Filesystem connector 
doesn't flush part files after rolling interval
 add 07772bd  [FLINK-18238][checkpoint] Broadcast CancelCheckpointMarker 
while executing checkpoint aborted by coordinator RPC

No new revisions were added by this update.

Summary of changes:
 .../tasks/SubtaskCheckpointCoordinatorImpl.java|  4 +-
 .../tasks/SubtaskCheckpointCoordinatorTest.java| 75 ++
 2 files changed, 78 insertions(+), 1 deletion(-)



[flink] branch release-1.11 updated: [FLINK-18238][checkpoint] Broadcast CancelCheckpointMarker while executing checkpoint aborted by coordinator RPC

2020-06-17 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new e7c634f   [FLINK-18238][checkpoint] Broadcast CancelCheckpointMarker 
while executing checkpoint aborted by coordinator RPC
e7c634f is described below

commit e7c634f223c0a2c180ca5abe14a4661115f0afe6
Author: Yun Tang 
AuthorDate: Wed Jun 17 21:10:02 2020 +0800

 [FLINK-18238][checkpoint] Broadcast CancelCheckpointMarker while executing 
checkpoint aborted by coordinator RPC

When the CheckpointCoordinator sends an abort-checkpoint RPC, the subtask skips
executing the respective checkpoint that had already been triggered before. But
we also need to broadcast the CancelCheckpointMarker before returning from the
execution; otherwise the downstream side would wait for barrier alignment
indefinitely and deadlock.

This closes #12664.
---
 .../tasks/SubtaskCheckpointCoordinatorImpl.java|  4 +-
 .../tasks/SubtaskCheckpointCoordinatorTest.java| 75 ++
 2 files changed, 78 insertions(+), 1 deletion(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index b2fe147..0d6d638 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -235,9 +235,11 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
return;
}
 
-   // Step (0): Record the last triggered checkpointId.
+   // Step (0): Record the last triggered checkpointId and abort 
the sync phase of checkpoint if necessary.
lastCheckpointId = metadata.getCheckpointId();
if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
+   // broadcast cancel checkpoint marker to avoid 
downstream back-pressure due to checkpoint barrier align.
+   operatorChain.broadcastEvent(new 
CancelCheckpointMarker(metadata.getCheckpointId()));
LOG.info("Checkpoint {} has been notified as aborted, 
would not trigger any checkpoint.", metadata.getCheckpointId());
return;
}
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index f7655ea..00ef9a1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.MetricGroup;
@@ -28,7 +31,11 @@ import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
+import 
org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -39,12 +46,15 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.TestCheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperato

[flink] annotated tag release-1.11.0-rc2 updated (c4132de -> 1934b4f)

2020-06-17 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a change to annotated tag release-1.11.0-rc2
in repository https://gitbox.apache.org/repos/asf/flink.git.


*** WARNING: tag release-1.11.0-rc2 was modified! ***

from c4132de  (commit)
  to 1934b4f  (tag)
 tagging c4132de4a50ab9b8f653c69af1ba15af44ff29a2 (commit)
 replaces pre-apache-rename
  by zhijiang
  on Thu Jun 18 10:35:21 2020 +0800

- Log -
release-1.11.0-rc2
-BEGIN PGP SIGNATURE-
Version: GnuPG v2.0.22 (GNU/Linux)

iQIcBAABAgAGBQJe6tLpAAoJEAZTwKLOoA0OQaMQALAisIDETJbt4/Vgku2psVky
Bg8N1N31XTZrlWu9ycCyHpv6wJ+nJZABsgGaste8csSPXTLcdWnd3YB4dl2cQfw8
REF8t8CZ1kFomNMHwD74B+xm8uKA+IaDEror+q9lIP6+S66ZX3tLLir/M0AbjQlB
kQ1WFcUh3mk8HYlLEiTpOc+fBcRkHJYaMaXpdjz45rGQdDO3T6E1ZkYoxOyVpf41
Cx0don+iwq7CAnwJvHVofgvHcf+HoZtjg/G0WPzCCeSkhU1wXWpRw6SsH4Aoy5Jx
UDIY1bnhXccb8jawGDT8T4fZpJ34SLKj6wzEx+RNf6iKEfrBQ+PwkTvJEzg5OZuu
zYBgEOidBwYVitl6j/gIkcOpHpucwhdu29UJ6qgXRqTyFkIsyeRpvJuHo+YmGdvZ
PAAVIL+HvxtsUsjOP4kfavOIH6ez/7W21tIk1TxMwKbTZrRl6kDfjmhbJRpMttha
0DohvaBhPH7oKvE/UlwEUQTBePhONt5JdwZ5IrNlN+cnDGWUX8y88wyKY8Hvbnip
Za9uOdMcqzAW9lOFEe+hm3YoieSTILgLvMaqmSQkrDB/JVR52xcTGY0FDqHIguzT
iZj/yr2jXRC9JRFyWOxYIj0eRuR44FN5nyNjC0BtC8q9DBvGkAQosBESpsDlKUWY
1Q3hgX0Zu1pi4vlcQNOJ
=Suyk
-END PGP SIGNATURE-
---


No new revisions were added by this update.

Summary of changes:


