[flink] branch master updated: [FLINK-12312][runtime] Remove CLI command for rescaling

2019-04-29 Thread gary
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 566d10d  [FLINK-12312][runtime] Remove CLI command for rescaling
566d10d is described below

commit 566d10d958d71fd113f8f7ecc08fa9b63a072919
Author: Gary Yao 
AuthorDate: Wed Apr 24 21:50:22 2019 +0200

[FLINK-12312][runtime] Remove CLI command for rescaling

This closes #8260.
---
 docs/ops/cli.md|  23 --
 docs/ops/cli.zh.md |  23 --
 .../org/apache/flink/client/cli/CliFrontend.java   |  59 -
 .../apache/flink/client/cli/CliFrontendParser.java |  27 --
 .../apache/flink/client/program/ClusterClient.java |  11 -
 .../client/program/rest/RestClusterClient.java |  41 ---
 .../flink/client/cli/CliFrontendModifyTest.java| 136 --
 .../flink/runtime/dispatcher/Dispatcher.java   |  10 -
 .../apache/flink/runtime/jobmaster/JobMaster.java  | 289 +
 .../flink/runtime/jobmaster/JobMasterGateway.java  |  36 +--
 .../runtime/jobmaster/RescalingBehaviour.java  |  49 
 .../handler/job/rescaling/RescalingHandlers.java   |  56 ++--
 .../flink/runtime/webmonitor/RestfulGateway.java   |  18 --
 .../jobmaster/utils/TestingJobMasterGateway.java   |  21 --
 .../utils/TestingJobMasterGatewayBuilder.java  |  15 +-
 15 files changed, 30 insertions(+), 784 deletions(-)

diff --git a/docs/ops/cli.md b/docs/ops/cli.md
index 42698e8..b414b30 100644
--- a/docs/ops/cli.md
+++ b/docs/ops/cli.md
@@ -37,7 +37,6 @@ The command line can be used to
 - provide information about a job,
 - list running and waiting jobs,
 - trigger and dispose savepoints, and
-- modify a running job
 
 A prerequisite to using the command line interface is that the Flink
 master (JobManager) has been started (via
@@ -126,10 +125,6 @@ available.
 
 ./bin/flink stop -s [targetDirectory] -d 
 
--   Modify a running job (streaming jobs only):
-
-./bin/flink modify  -p 
-
 
 **NOTE**: The difference between cancelling and stopping a (streaming) job is 
the following:
 
@@ -429,24 +424,6 @@ Action "savepoint" triggers savepoints for a running job 
or disposes existing on
  in the configuration.
  -z,--zookeeperNamespaceNamespace to create the Zookeeper 
sub-paths
  for high availability mode
-
-
-
-Action "modify" modifies a running job (e.g. change of parallelism).
-
-  Syntax: modify  [OPTIONS]
-  "modify" action options:
- -h,--help   Show the help message for the CLI
- Frontend or the action.
- -p,--parallelismNew parallelism for the specified job.
- -v,--verboseThis option is deprecated.
-  Options for default mode:
- -m,--jobmanagerAddress of the JobManager (master) to 
which
- to connect. Use this flag to connect to a
- different JobManager than the one 
specified
- in the configuration.
- -z,--zookeeperNamespaceNamespace to create the Zookeeper 
sub-paths
- for high availability mode
 {% endhighlight %}
 
 {% top %}
diff --git a/docs/ops/cli.zh.md b/docs/ops/cli.zh.md
index bf995b4..7c02047 100644
--- a/docs/ops/cli.zh.md
+++ b/docs/ops/cli.zh.md
@@ -37,7 +37,6 @@ The command line can be used to
 - provide information about a job,
 - list running and waiting jobs,
 - trigger and dispose savepoints, and
-- modify a running job
 
 A prerequisite to using the command line interface is that the Flink
 master (JobManager) has been started (via
@@ -126,10 +125,6 @@ available.
 
 ./bin/flink stop -s [targetDirectory] -d 
 
--   Modify a running job (streaming jobs only):
-
-./bin/flink modify  -p 
-
 
 **NOTE**: The difference between cancelling and stopping a (streaming) job is 
the following:
 
@@ -429,24 +424,6 @@ Action "savepoint" triggers savepoints for a running job 
or disposes existing on
  in the configuration.
  -z,--zookeeperNamespaceNamespace to create the Zookeeper 
sub-paths
  for high availability mode
-
-
-
-Action "modify" modifies a running job (e.g. change of parallelism).
-
-  Syntax: modify  [OPTIONS]
-  "modify" action options:
- -h,--help   Show the help message for the CLI
- Frontend or the action.
- -p,--parallelismNew parallelism for the specified job.
- -v,--verboseThis option is deprecated.
-  Options for default mode:
- -m,--jobmanagerAddress of the JobManager (master) to 
which
-  

[flink] branch master updated (ecbc0a6 -> 8e58b8b)

2019-04-29 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from ecbc0a6  [hotfix] Fix checkstyle violations in 
ExecutionVertexCancelTest
 new da1aa85  [FLINK-8640][build] Add japicmp-plugin Java EE dependencies
 new 7619cf3  [FLINK-8640][build] Rework hadoop jdk.tools exclusion
 new 8e58b8b  [FLINK-8640][travis] Enable japicmp on Java 9

The 16455 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .travis.yml | 14 +++---
 pom.xml | 41 +
 2 files changed, 48 insertions(+), 7 deletions(-)



[flink] branch master updated (bf7c1b7 -> ecbc0a6)

2019-04-29 Thread trohrmann
This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from bf7c1b7  [hotfix][runtime,tests] Add test coverage for 
BlockingCallMonitoringThreadPool
 new 7054a8b  [FLINK-12180][Tests] Port ExecutionVertexCancelTest to new 
codebase
 new ecbc0a6  [hotfix] Fix checkstyle violations in 
ExecutionVertexCancelTest

The 16452 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../executiongraph/ExecutionVertexCancelTest.java  | 22 +-
 1 file changed, 9 insertions(+), 13 deletions(-)



[flink] branch master updated: [hotfix][runtime, tests] Add test coverage for BlockingCallMonitoringThreadPool

2019-04-29 Thread pnowojski
This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new bf7c1b7  [hotfix][runtime,tests] Add test coverage for 
BlockingCallMonitoringThreadPool
bf7c1b7 is described below

commit bf7c1b7dc7747cd4031dab10dde7a7729c1aaa56
Author: Piotr Nowojski 
AuthorDate: Fri Apr 26 11:43:43 2019 +0200

[hotfix][runtime,tests] Add test coverage for 
BlockingCallMonitoringThreadPool
---
 .../BlockingCallMonitoringThreadPool.java  |  31 --
 .../BlockingCallMonitoringThreadPoolTest.java  | 112 +
 2 files changed, 136 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
index 3c541c7..d0fb868 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java
@@ -18,10 +18,13 @@
 
 package org.apache.flink.runtime.taskmanager;
 
+import org.apache.flink.annotation.VisibleForTesting;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
@@ -44,6 +47,10 @@ public class BlockingCallMonitoringThreadPool {
 
private final ThreadPoolExecutor executor;
 
+   public BlockingCallMonitoringThreadPool() {
+   this(Executors.defaultThreadFactory());
+   }
+
public BlockingCallMonitoringThreadPool(final ThreadFactory 
dispatcherThreadFactory) {
this.executor = new ThreadPoolExecutor(
1,
@@ -54,22 +61,22 @@ public class BlockingCallMonitoringThreadPool {
checkNotNull(dispatcherThreadFactory));
}
 
-   public void submit(final Runnable runnable, final boolean blocking) {
+   public CompletableFuture submit(final Runnable runnable, final 
boolean blocking) {
if (blocking) {
-   submitBlocking(runnable);
+   return submitBlocking(runnable);
} else {
-   submit(runnable);
+   return submit(runnable);
}
}
 
-   private void submit(final Runnable task) {
+   private CompletableFuture submit(final Runnable task) {
adjustThreadPoolSize(inFlightBlockingCallCounter.get());
-   executor.execute(task);
+   return CompletableFuture.runAsync(task, executor);
}
 
-   private void submitBlocking(final Runnable task) {
+   private CompletableFuture submitBlocking(final Runnable task) {

adjustThreadPoolSize(inFlightBlockingCallCounter.incrementAndGet());
-   CompletableFuture.runAsync(task, executor).whenComplete(
+   return CompletableFuture.runAsync(task, executor).whenComplete(
(ignored, e) -> 
inFlightBlockingCallCounter.decrementAndGet());
}
 
@@ -107,4 +114,14 @@ public class BlockingCallMonitoringThreadPool {
public void shutdownNow() {
executor.shutdownNow();
}
+
+   @VisibleForTesting
+   int getMaximumPoolSize() {
+   return executor.getMaximumPoolSize();
+   }
+
+   @VisibleForTesting
+   int getQueueSize() {
+   return executor.getQueue().size();
+   }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java
new file mode 100644
index 000..2cc3454
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, 

[flink] branch release-1.8 updated: [FLINK-12350] [State Backends] RocksDBStateBackendTest doesn't cover the incremental checkpoint code path

2019-04-29 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
 new 8020787  [FLINK-12350] [State Backends] RocksDBStateBackendTest 
doesn't cover the incremental checkpoint code path
8020787 is described below

commit 80207878251843a2263dcbf25eb21e0d6538048c
Author: Yu Li 
AuthorDate: Mon Apr 29 10:21:24 2019 +0200

[FLINK-12350] [State Backends] RocksDBStateBackendTest doesn't cover the 
incremental checkpoint code path

This closes #8297.

(cherry picked from commit 9aeb4e5cb38079cdfbcc25f8c3966b368287825d)
---
 .../flink/contrib/streaming/state/RocksDBStateBackendTest.java| 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 819c864..ceb781c 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -217,7 +217,9 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase stubState1 =
new ValueStateDescriptor<>("StubState-1", 
StringSerializer.INSTANCE);
test.createInternalState(StringSerializer.INSTANCE, 
stubState1);



[flink] branch master updated: [FLINK-12350] [State Backends] RocksDBStateBackendTest doesn't cover the incremental checkpoint code path

2019-04-29 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 9aeb4e5  [FLINK-12350] [State Backends] RocksDBStateBackendTest 
doesn't cover the incremental checkpoint code path
9aeb4e5 is described below

commit 9aeb4e5cb38079cdfbcc25f8c3966b368287825d
Author: Yu Li 
AuthorDate: Mon Apr 29 16:21:24 2019 +0800

[FLINK-12350] [State Backends] RocksDBStateBackendTest doesn't cover the 
incremental checkpoint code path

This closes #8297.
---
 .../flink/contrib/streaming/state/RocksDBStateBackendTest.java| 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
index 819c864..ceb781c 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java
@@ -217,7 +217,9 @@ public class RocksDBStateBackendTest extends 
StateBackendTestBase stubState1 =
new ValueStateDescriptor<>("StubState-1", 
StringSerializer.INSTANCE);
test.createInternalState(StringSerializer.INSTANCE, 
stubState1);



[flink] branch master updated: [FLINK-11167] Optimize RocksDBListState#put by removing the clear before every put operation

2019-04-29 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 03faeb9  [FLINK-11167] Optimize RocksDBListState#put by removing the 
clear before every put operation
03faeb9 is described below

commit 03faeb9bfbc388666cf62dc2e972ee21c1370031
Author: Congxian Qiu 
AuthorDate: Mon Apr 29 16:15:49 2019 +0800

[FLINK-11167] Optimize RocksDBListState#put by removing the clear before 
every put operation

This closes #7421.

Differential Revision: https://aone.alibaba-inc.com/code/D816268
---
 .../org/apache/flink/contrib/streaming/state/RocksDBListState.java| 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
index c18adb1..03a68b2 100644
--- 
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
+++ 
b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java
@@ -209,8 +209,6 @@ class RocksDBListState
public void updateInternal(List values) {
Preconditions.checkNotNull(values, "List of values to add 
cannot be null.");
 
-   clear();
-
if (!values.isEmpty()) {
try {
backend.db.put(
@@ -221,6 +219,8 @@ class RocksDBListState
} catch (IOException | RocksDBException e) {
throw new FlinkRuntimeException("Error while 
updating data to RocksDB", e);
}
+   } else {
+   clear();
}
}
 



[flink] branch master updated: [FLINK-10724] Refactor failure handling in check point coordinator

2019-04-29 Thread srichter
This is an automated email from the ASF dual-hosted git repository.

srichter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new c4b0e8f  [FLINK-10724] Refactor failure handling in check point 
coordinator
c4b0e8f is described below

commit c4b0e8f68c5c4bb2ba60b358df92ee5db1d857df
Author: vinoyang 
AuthorDate: Mon Apr 29 15:56:04 2019 +0800

[FLINK-10724] Refactor failure handling in check point coordinator

This closes #7571.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 89 +++--
 .../runtime/checkpoint/CheckpointException.java| 30 +--
 ...ineReason.java => CheckpointFailureReason.java} | 28 ++-
 .../checkpoint/CheckpointTriggerException.java | 42 --
 .../checkpoint/CheckpointTriggerResult.java| 92 --
 .../runtime/checkpoint/PendingCheckpoint.java  | 51 +++-
 .../executiongraph/failover/FailoverRegion.java|  5 +-
 .../apache/flink/runtime/jobmaster/JobMaster.java  | 12 +--
 .../checkpoint/CheckpointCoordinatorTest.java  | 38 +
 .../runtime/checkpoint/PendingCheckpointTest.java  | 30 +++
 .../jobmaster/JobMasterTriggerSavepointITCase.java |  4 +-
 .../test/streaming/runtime/TimestampITCase.java|  4 +-
 12 files changed, 160 insertions(+), 265 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index e6cc5d3..c7f59a7 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -338,7 +338,7 @@ public class CheckpointCoordinator {
 
// clear and discard all pending checkpoints
for (PendingCheckpoint pending : 
pendingCheckpoints.values()) {
-   pending.abortError(new 
Exception("Checkpoint Coordinator is shutting down"));
+   
pending.abort(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN);
}
pendingCheckpoints.clear();
 
@@ -405,17 +405,17 @@ public class CheckpointCoordinator {
 
checkNotNull(checkpointProperties);
 
-   CheckpointTriggerResult triggerResult = triggerCheckpoint(
-   timestamp,
-   checkpointProperties,
-   targetLocation,
-   false,
-   advanceToEndOfEventTime);
-
-   if (triggerResult.isSuccess()) {
-   return 
triggerResult.getPendingCheckpoint().getCompletionFuture();
-   } else {
-   Throwable cause = new 
CheckpointTriggerException("Failed to trigger savepoint.", 
triggerResult.getFailureReason());
+   try {
+   PendingCheckpoint pendingCheckpoint = triggerCheckpoint(
+   timestamp,
+   checkpointProperties,
+   targetLocation,
+   false,
+   advanceToEndOfEventTime);
+
+   return pendingCheckpoint.getCompletionFuture();
+   } catch (CheckpointException e) {
+   Throwable cause = new CheckpointException("Failed to 
trigger savepoint.", e.getCheckpointFailureReason());
return FutureUtils.completedExceptionally(cause);
}
}
@@ -431,16 +431,21 @@ public class CheckpointCoordinator {
 * @return true if triggering the checkpoint succeeded.
 */
public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
-   return triggerCheckpoint(timestamp, checkpointProperties, null, 
isPeriodic, false).isSuccess();
+   try {
+   triggerCheckpoint(timestamp, checkpointProperties, 
null, isPeriodic, false);
+   return true;
+   } catch (CheckpointException e) {
+   return false;
+   }
}
 
@VisibleForTesting
-   public CheckpointTriggerResult triggerCheckpoint(
+   public PendingCheckpoint triggerCheckpoint(
long timestamp,
CheckpointProperties props,
@Nullable String externalSavepointLocation,
boolean isPeriodic,
-   boolean advanceToEndOfTime) {
+   boolean advanceToEndOfTime) throws CheckpointException {
 
i