[flink] branch master updated: [FLINK-12312][runtime] Remove CLI command for rescaling
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 566d10d [FLINK-12312][runtime] Remove CLI command for rescaling 566d10d is described below commit 566d10d958d71fd113f8f7ecc08fa9b63a072919 Author: Gary Yao AuthorDate: Wed Apr 24 21:50:22 2019 +0200 [FLINK-12312][runtime] Remove CLI command for rescaling This closes #8260. --- docs/ops/cli.md| 23 -- docs/ops/cli.zh.md | 23 -- .../org/apache/flink/client/cli/CliFrontend.java | 59 - .../apache/flink/client/cli/CliFrontendParser.java | 27 -- .../apache/flink/client/program/ClusterClient.java | 11 - .../client/program/rest/RestClusterClient.java | 41 --- .../flink/client/cli/CliFrontendModifyTest.java| 136 -- .../flink/runtime/dispatcher/Dispatcher.java | 10 - .../apache/flink/runtime/jobmaster/JobMaster.java | 289 + .../flink/runtime/jobmaster/JobMasterGateway.java | 36 +-- .../runtime/jobmaster/RescalingBehaviour.java | 49 .../handler/job/rescaling/RescalingHandlers.java | 56 ++-- .../flink/runtime/webmonitor/RestfulGateway.java | 18 -- .../jobmaster/utils/TestingJobMasterGateway.java | 21 -- .../utils/TestingJobMasterGatewayBuilder.java | 15 +- 15 files changed, 30 insertions(+), 784 deletions(-) diff --git a/docs/ops/cli.md b/docs/ops/cli.md index 42698e8..b414b30 100644 --- a/docs/ops/cli.md +++ b/docs/ops/cli.md @@ -37,7 +37,6 @@ The command line can be used to - provide information about a job, - list running and waiting jobs, - trigger and dispose savepoints, and -- modify a running job A prerequisite to using the command line interface is that the Flink master (JobManager) has been started (via @@ -126,10 +125,6 @@ available. ./bin/flink stop -s [targetDirectory] -d -- Modify a running job (streaming jobs only): - -./bin/flink modify -p - **NOTE**: The difference between cancelling and stopping a (streaming) job is the following: @@ -429,24 +424,6 @@ Action "savepoint" triggers savepoints for a running job or disposes existing on in the configuration. -z,--zookeeperNamespaceNamespace to create the Zookeeper sub-paths for high availability mode - - - -Action "modify" modifies a running job (e.g. change of parallelism). - - Syntax: modify [OPTIONS] - "modify" action options: - -h,--help Show the help message for the CLI - Frontend or the action. - -p,--parallelismNew parallelism for the specified job. - -v,--verboseThis option is deprecated. - Options for default mode: - -m,--jobmanagerAddress of the JobManager (master) to which - to connect. Use this flag to connect to a - different JobManager than the one specified - in the configuration. - -z,--zookeeperNamespaceNamespace to create the Zookeeper sub-paths - for high availability mode {% endhighlight %} {% top %} diff --git a/docs/ops/cli.zh.md b/docs/ops/cli.zh.md index bf995b4..7c02047 100644 --- a/docs/ops/cli.zh.md +++ b/docs/ops/cli.zh.md @@ -37,7 +37,6 @@ The command line can be used to - provide information about a job, - list running and waiting jobs, - trigger and dispose savepoints, and -- modify a running job A prerequisite to using the command line interface is that the Flink master (JobManager) has been started (via @@ -126,10 +125,6 @@ available. ./bin/flink stop -s [targetDirectory] -d -- Modify a running job (streaming jobs only): - -./bin/flink modify -p - **NOTE**: The difference between cancelling and stopping a (streaming) job is the following: @@ -429,24 +424,6 @@ Action "savepoint" triggers savepoints for a running job or disposes existing on in the configuration. -z,--zookeeperNamespaceNamespace to create the Zookeeper sub-paths for high availability mode - - - -Action "modify" modifies a running job (e.g. change of parallelism). - - Syntax: modify [OPTIONS] - "modify" action options: - -h,--help Show the help message for the CLI - Frontend or the action. - -p,--parallelismNew parallelism for the specified job. - -v,--verboseThis option is deprecated. - Options for default mode: - -m,--jobmanagerAddress of the JobManager (master) to which -
[flink] branch master updated (ecbc0a6 -> 8e58b8b)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from ecbc0a6 [hotfix] Fix checkstyle violations in ExecutionVertexCancelTest new da1aa85 [FLINK-8640][build] Add japicmp-plugin Java EE dependencies new 7619cf3 [FLINK-8640][build] Rework hadoop jdk.tools exclusion new 8e58b8b [FLINK-8640][travis] Enable japicmp on Java 9 The 16455 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .travis.yml | 14 +++--- pom.xml | 41 + 2 files changed, 48 insertions(+), 7 deletions(-)
[flink] branch master updated (bf7c1b7 -> ecbc0a6)
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from bf7c1b7 [hotfix][runtime,tests] Add test coverage for BlockingCallMonitoringThreadPool new 7054a8b [FLINK-12180][Tests] Port ExecutionVertexCancelTest to new codebase new ecbc0a6 [hotfix] Fix checkstyle violations in ExecutionVertexCancelTest The 16452 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../executiongraph/ExecutionVertexCancelTest.java | 22 +- 1 file changed, 9 insertions(+), 13 deletions(-)
[flink] branch master updated: [hotfix][runtime, tests] Add test coverage for BlockingCallMonitoringThreadPool
This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new bf7c1b7 [hotfix][runtime,tests] Add test coverage for BlockingCallMonitoringThreadPool bf7c1b7 is described below commit bf7c1b7dc7747cd4031dab10dde7a7729c1aaa56 Author: Piotr Nowojski AuthorDate: Fri Apr 26 11:43:43 2019 +0200 [hotfix][runtime,tests] Add test coverage for BlockingCallMonitoringThreadPool --- .../BlockingCallMonitoringThreadPool.java | 31 -- .../BlockingCallMonitoringThreadPoolTest.java | 112 + 2 files changed, 136 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java index 3c541c7..d0fb868 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPool.java @@ -18,10 +18,13 @@ package org.apache.flink.runtime.taskmanager; +import org.apache.flink.annotation.VisibleForTesting; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; @@ -44,6 +47,10 @@ public class BlockingCallMonitoringThreadPool { private final ThreadPoolExecutor executor; + public BlockingCallMonitoringThreadPool() { + this(Executors.defaultThreadFactory()); + } + public BlockingCallMonitoringThreadPool(final ThreadFactory dispatcherThreadFactory) { this.executor = new ThreadPoolExecutor( 1, @@ -54,22 +61,22 @@ public class BlockingCallMonitoringThreadPool { checkNotNull(dispatcherThreadFactory)); } - public void submit(final Runnable runnable, final boolean blocking) { + public CompletableFuture submit(final Runnable runnable, final boolean blocking) { if (blocking) { - submitBlocking(runnable); + return submitBlocking(runnable); } else { - submit(runnable); + return submit(runnable); } } - private void submit(final Runnable task) { + private CompletableFuture submit(final Runnable task) { adjustThreadPoolSize(inFlightBlockingCallCounter.get()); - executor.execute(task); + return CompletableFuture.runAsync(task, executor); } - private void submitBlocking(final Runnable task) { + private CompletableFuture submitBlocking(final Runnable task) { adjustThreadPoolSize(inFlightBlockingCallCounter.incrementAndGet()); - CompletableFuture.runAsync(task, executor).whenComplete( + return CompletableFuture.runAsync(task, executor).whenComplete( (ignored, e) -> inFlightBlockingCallCounter.decrementAndGet()); } @@ -107,4 +114,14 @@ public class BlockingCallMonitoringThreadPool { public void shutdownNow() { executor.shutdownNow(); } + + @VisibleForTesting + int getMaximumPoolSize() { + return executor.getMaximumPoolSize(); + } + + @VisibleForTesting + int getQueueSize() { + return executor.getQueue().size(); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java new file mode 100644 index 000..2cc3454 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/BlockingCallMonitoringThreadPoolTest.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
[flink] branch release-1.8 updated: [FLINK-12350] [State Backends] RocksDBStateBackendTest doesn't cover the incremental checkpoint code path
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.8 by this push: new 8020787 [FLINK-12350] [State Backends] RocksDBStateBackendTest doesn't cover the incremental checkpoint code path 8020787 is described below commit 80207878251843a2263dcbf25eb21e0d6538048c Author: Yu Li AuthorDate: Mon Apr 29 10:21:24 2019 +0200 [FLINK-12350] [State Backends] RocksDBStateBackendTest doesn't cover the incremental checkpoint code path This closes #8297. (cherry picked from commit 9aeb4e5cb38079cdfbcc25f8c3966b368287825d) --- .../flink/contrib/streaming/state/RocksDBStateBackendTest.java| 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index 819c864..ceb781c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -217,7 +217,9 @@ public class RocksDBStateBackendTest extends StateBackendTestBase stubState1 = new ValueStateDescriptor<>("StubState-1", StringSerializer.INSTANCE); test.createInternalState(StringSerializer.INSTANCE, stubState1);
[flink] branch master updated: [FLINK-12350] [State Backends] RocksDBStateBackendTest doesn't cover the incremental checkpoint code path
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 9aeb4e5 [FLINK-12350] [State Backends] RocksDBStateBackendTest doesn't cover the incremental checkpoint code path 9aeb4e5 is described below commit 9aeb4e5cb38079cdfbcc25f8c3966b368287825d Author: Yu Li AuthorDate: Mon Apr 29 16:21:24 2019 +0800 [FLINK-12350] [State Backends] RocksDBStateBackendTest doesn't cover the incremental checkpoint code path This closes #8297. --- .../flink/contrib/streaming/state/RocksDBStateBackendTest.java| 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java index 819c864..ceb781c 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendTest.java @@ -217,7 +217,9 @@ public class RocksDBStateBackendTest extends StateBackendTestBase stubState1 = new ValueStateDescriptor<>("StubState-1", StringSerializer.INSTANCE); test.createInternalState(StringSerializer.INSTANCE, stubState1);
[flink] branch master updated: [FLINK-11167] Optimize RocksDBListState#put by removing the clear before every put operation
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 03faeb9 [FLINK-11167] Optimize RocksDBListState#put by removing the clear before every put operation 03faeb9 is described below commit 03faeb9bfbc388666cf62dc2e972ee21c1370031 Author: Congxian Qiu AuthorDate: Mon Apr 29 16:15:49 2019 +0800 [FLINK-11167] Optimize RocksDBListState#put by removing the clear before every put operation This closes #7421. Differential Revision: https://aone.alibaba-inc.com/code/D816268 --- .../org/apache/flink/contrib/streaming/state/RocksDBListState.java| 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java index c18adb1..03a68b2 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBListState.java @@ -209,8 +209,6 @@ class RocksDBListState public void updateInternal(List values) { Preconditions.checkNotNull(values, "List of values to add cannot be null."); - clear(); - if (!values.isEmpty()) { try { backend.db.put( @@ -221,6 +219,8 @@ class RocksDBListState } catch (IOException | RocksDBException e) { throw new FlinkRuntimeException("Error while updating data to RocksDB", e); } + } else { + clear(); } }
[flink] branch master updated: [FLINK-10724] Refactor failure handling in check point coordinator
This is an automated email from the ASF dual-hosted git repository. srichter pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new c4b0e8f [FLINK-10724] Refactor failure handling in check point coordinator c4b0e8f is described below commit c4b0e8f68c5c4bb2ba60b358df92ee5db1d857df Author: vinoyang AuthorDate: Mon Apr 29 15:56:04 2019 +0800 [FLINK-10724] Refactor failure handling in check point coordinator This closes #7571. --- .../runtime/checkpoint/CheckpointCoordinator.java | 89 +++-- .../runtime/checkpoint/CheckpointException.java| 30 +-- ...ineReason.java => CheckpointFailureReason.java} | 28 ++- .../checkpoint/CheckpointTriggerException.java | 42 -- .../checkpoint/CheckpointTriggerResult.java| 92 -- .../runtime/checkpoint/PendingCheckpoint.java | 51 +++- .../executiongraph/failover/FailoverRegion.java| 5 +- .../apache/flink/runtime/jobmaster/JobMaster.java | 12 +-- .../checkpoint/CheckpointCoordinatorTest.java | 38 + .../runtime/checkpoint/PendingCheckpointTest.java | 30 +++ .../jobmaster/JobMasterTriggerSavepointITCase.java | 4 +- .../test/streaming/runtime/TimestampITCase.java| 4 +- 12 files changed, 160 insertions(+), 265 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index e6cc5d3..c7f59a7 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -338,7 +338,7 @@ public class CheckpointCoordinator { // clear and discard all pending checkpoints for (PendingCheckpoint pending : pendingCheckpoints.values()) { - pending.abortError(new Exception("Checkpoint Coordinator is shutting down")); + pending.abort(CheckpointFailureReason.CHECKPOINT_COORDINATOR_SHUTDOWN); } pendingCheckpoints.clear(); @@ -405,17 +405,17 @@ public class CheckpointCoordinator { checkNotNull(checkpointProperties); - CheckpointTriggerResult triggerResult = triggerCheckpoint( - timestamp, - checkpointProperties, - targetLocation, - false, - advanceToEndOfEventTime); - - if (triggerResult.isSuccess()) { - return triggerResult.getPendingCheckpoint().getCompletionFuture(); - } else { - Throwable cause = new CheckpointTriggerException("Failed to trigger savepoint.", triggerResult.getFailureReason()); + try { + PendingCheckpoint pendingCheckpoint = triggerCheckpoint( + timestamp, + checkpointProperties, + targetLocation, + false, + advanceToEndOfEventTime); + + return pendingCheckpoint.getCompletionFuture(); + } catch (CheckpointException e) { + Throwable cause = new CheckpointException("Failed to trigger savepoint.", e.getCheckpointFailureReason()); return FutureUtils.completedExceptionally(cause); } } @@ -431,16 +431,21 @@ public class CheckpointCoordinator { * @return true if triggering the checkpoint succeeded. */ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) { - return triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false).isSuccess(); + try { + triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false); + return true; + } catch (CheckpointException e) { + return false; + } } @VisibleForTesting - public CheckpointTriggerResult triggerCheckpoint( + public PendingCheckpoint triggerCheckpoint( long timestamp, CheckpointProperties props, @Nullable String externalSavepointLocation, boolean isPeriodic, - boolean advanceToEndOfTime) { + boolean advanceToEndOfTime) throws CheckpointException { i