flink git commit: [FLINK-9183] [docs] Add warning about idle partitions to Kafka connector docs.
Repository: flink Updated Branches: refs/heads/release-1.4 e76b10d07 -> a9b497749 [FLINK-9183] [docs] Add warning about idle partitions to Kafka connector docs. This closes #5858. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9b49774 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9b49774 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9b49774 Branch: refs/heads/release-1.4 Commit: a9b497749710c708d076fc45688fff7b72416af1 Parents: e76b10d Author: juhoautio Authored: Mon Apr 16 17:40:23 2018 +0300 Committer: Fabian Hueske Committed: Mon Apr 16 21:44:12 2018 +0200 -- docs/dev/connectors/kafka.md | 5 + 1 file changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/a9b49774/docs/dev/connectors/kafka.md -- diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md index 8e38146..75c5e23 100644 --- a/docs/dev/connectors/kafka.md +++ b/docs/dev/connectors/kafka.md @@ -444,6 +444,11 @@ the `Watermark getCurrentWatermark()` (for periodic) or the `Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)` (for punctuated) is called to determine if a new watermark should be emitted and with which timestamp. +**Note**: If a watermark assigner depends on records read from Kafka to advance its watermarks (which is commonly the case), all topics and partitions need to have a continuous stream of records. Otherwise, the watermarks of the whole application cannot advance and all time-based operations, such as time windows or functions with timers, cannot make progress. A single idle Kafka partition causes this behavior. +A Flink improvement is planned to prevent this from happening +(see [FLINK-5479: Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions]( +https://issues.apache.org/jira/browse/FLINK-5479)). +In the meanwhile, a possible workaround is to send *heartbeat messages* to all consumed partitions that advance the watermarks of idle partitions. ## Kafka Producer
[2/2] flink git commit: [FLINK-9183] [docs] Add warning about idle partitions to Kafka connector docs.
[FLINK-9183] [docs] Add warning about idle partitions to Kafka connector docs. This closes #5858. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc58f987 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc58f987 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc58f987 Branch: refs/heads/release-1.5 Commit: fc58f987144a307e7208de0bdfe439922bed4b55 Parents: 4c3d018 Author: juhoautio Authored: Mon Apr 16 17:40:23 2018 +0300 Committer: Fabian Hueske Committed: Mon Apr 16 21:42:21 2018 +0200 -- docs/dev/connectors/kafka.md | 5 + 1 file changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/fc58f987/docs/dev/connectors/kafka.md -- diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md index 27fca7a..47a6651 100644 --- a/docs/dev/connectors/kafka.md +++ b/docs/dev/connectors/kafka.md @@ -451,6 +451,11 @@ the `Watermark getCurrentWatermark()` (for periodic) or the `Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)` (for punctuated) is called to determine if a new watermark should be emitted and with which timestamp. +**Note**: If a watermark assigner depends on records read from Kafka to advance its watermarks (which is commonly the case), all topics and partitions need to have a continuous stream of records. Otherwise, the watermarks of the whole application cannot advance and all time-based operations, such as time windows or functions with timers, cannot make progress. A single idle Kafka partition causes this behavior. +A Flink improvement is planned to prevent this from happening +(see [FLINK-5479: Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions]( +https://issues.apache.org/jira/browse/FLINK-5479)). +In the meanwhile, a possible workaround is to send *heartbeat messages* to all consumed partitions that advance the watermarks of idle partitions. ## Kafka Producer
[1/2] flink git commit: [FLINK-9089] [orc] Upgrade Orc dependency from 1.4.1 to 1.4.3.
Repository: flink Updated Branches: refs/heads/release-1.5 a241d2af7 -> fc58f9871 [FLINK-9089] [orc] Upgrade Orc dependency from 1.4.1 to 1.4.3. This closes #5826. This closes #1990. // closing old PR Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4c3d018b Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4c3d018b Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4c3d018b Branch: refs/heads/release-1.5 Commit: 4c3d018bd75ace6361c02a1ad0b254350220542f Parents: a241d2a Author: yanghua Authored: Sun Apr 8 10:12:51 2018 +0800 Committer: Fabian Hueske Committed: Mon Apr 16 21:42:13 2018 +0200 -- flink-connectors/flink-orc/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/4c3d018b/flink-connectors/flink-orc/pom.xml -- diff --git a/flink-connectors/flink-orc/pom.xml b/flink-connectors/flink-orc/pom.xml index 3ee5e49..2fe229c 100644 --- a/flink-connectors/flink-orc/pom.xml +++ b/flink-connectors/flink-orc/pom.xml @@ -57,7 +57,7 @@ under the License. org.apache.orc orc-core - 1.4.1 + 1.4.3
[1/2] flink git commit: [FLINK-9183] [docs] Add warning about idle partitions to Kafka connector docs.
Repository: flink Updated Branches: refs/heads/master 27be32e8a -> b005ea353 [FLINK-9183] [docs] Add warning about idle partitions to Kafka connector docs. This closes #5858. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b005ea35 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b005ea35 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b005ea35 Branch: refs/heads/master Commit: b005ea35374f619298cebab649e0ba477aeaf860 Parents: afad30a Author: juhoautio Authored: Mon Apr 16 17:40:23 2018 +0300 Committer: Fabian Hueske Committed: Mon Apr 16 21:40:57 2018 +0200 -- docs/dev/connectors/kafka.md | 5 + 1 file changed, 5 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b005ea35/docs/dev/connectors/kafka.md -- diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md index 27fca7a..47a6651 100644 --- a/docs/dev/connectors/kafka.md +++ b/docs/dev/connectors/kafka.md @@ -451,6 +451,11 @@ the `Watermark getCurrentWatermark()` (for periodic) or the `Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)` (for punctuated) is called to determine if a new watermark should be emitted and with which timestamp. +**Note**: If a watermark assigner depends on records read from Kafka to advance its watermarks (which is commonly the case), all topics and partitions need to have a continuous stream of records. Otherwise, the watermarks of the whole application cannot advance and all time-based operations, such as time windows or functions with timers, cannot make progress. A single idle Kafka partition causes this behavior. +A Flink improvement is planned to prevent this from happening +(see [FLINK-5479: Per-partition watermarks in FlinkKafkaConsumer should consider idle partitions]( +https://issues.apache.org/jira/browse/FLINK-5479)). +In the meanwhile, a possible workaround is to send *heartbeat messages* to all consumed partitions that advance the watermarks of idle partitions. ## Kafka Producer
[2/2] flink git commit: [FLINK-9089] [orc] Upgrade Orc dependency from 1.4.1 to 1.4.3.
[FLINK-9089] [orc] Upgrade Orc dependency from 1.4.1 to 1.4.3. This closes #5826. This closes #1990. // closing old PR Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/afad30a5 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/afad30a5 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/afad30a5 Branch: refs/heads/master Commit: afad30a54fa92b90bbaa97d2559713fdc218a53a Parents: 27be32e Author: yanghua Authored: Sun Apr 8 10:12:51 2018 +0800 Committer: Fabian Hueske Committed: Mon Apr 16 21:40:57 2018 +0200 -- flink-connectors/flink-orc/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/afad30a5/flink-connectors/flink-orc/pom.xml -- diff --git a/flink-connectors/flink-orc/pom.xml b/flink-connectors/flink-orc/pom.xml index 689cb08..86076bc 100644 --- a/flink-connectors/flink-orc/pom.xml +++ b/flink-connectors/flink-orc/pom.xml @@ -57,7 +57,7 @@ under the License. org.apache.orc orc-core - 1.4.1 + 1.4.3
[03/11] flink git commit: [FLINK-8961][tests] Add MiniClusterResource#getClientConfiguration
[FLINK-8961][tests] Add MiniClusterResource#getClientConfiguration Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5423d0e2 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5423d0e2 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5423d0e2 Branch: refs/heads/release-1.5 Commit: 5423d0e2ebed8413b1200424689fab9cae5bfef5 Parents: a0a24b2 Author: zentol Authored: Thu Apr 5 11:00:45 2018 +0200 Committer: zentol Committed: Mon Apr 16 21:18:32 2018 +0200 -- .../apache/flink/test/util/MiniClusterResource.java | 15 +++ 1 file changed, 15 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/5423d0e2/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java -- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java index 8a05750..531a3c7 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java @@ -25,8 +25,10 @@ import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.minicluster.JobExecutorService; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; @@ -67,6 +69,8 @@ public class MiniClusterResource extends ExternalResource { private ClusterClient clusterClient; + private Configuration restClusterClientConfig; + private int numberSlots = -1; private TestEnvironment executionEnvironment; @@ -117,6 +121,10 @@ public class MiniClusterResource extends ExternalResource { return clusterClient; } + public Configuration getClientConfiguration() { + return restClusterClientConfig; + } + public TestEnvironment getTestEnvironment() { return executionEnvironment; } @@ -194,6 +202,9 @@ public class MiniClusterResource extends ExternalResource { if (enableClusterClient) { clusterClient = new StandaloneClusterClient(configuration, flinkMiniCluster.highAvailabilityServices(), true); } + Configuration restClientConfig = new Configuration(); + restClientConfig.setInteger(JobManagerOptions.PORT, flinkMiniCluster.getLeaderRPCPort()); + this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig); } private void startMiniCluster() throws Exception { @@ -229,6 +240,10 @@ public class MiniClusterResource extends ExternalResource { if (enableClusterClient) { clusterClient = new MiniClusterClient(configuration, miniCluster); } + Configuration restClientConfig = new Configuration(); + restClientConfig.setString(JobManagerOptions.ADDRESS, miniCluster.getRestAddress().getHost()); + restClientConfig.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort()); + this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig); } /**
[10/11] flink git commit: [hotfix][metrics] Allow QueryParameter converters to throw ConversionExceptions
[hotfix][metrics] Allow QueryParameter converters to throw ConversionExceptions Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f579f745 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f579f745 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f579f745 Branch: refs/heads/release-1.5 Commit: f579f745dc3868c1807a814695624d98b53f6d50 Parents: a562e5d Author: zentol Authored: Mon Mar 26 14:41:25 2018 +0200 Committer: zentol Committed: Mon Apr 16 21:18:33 2018 +0200 -- .../flink/runtime/rest/messages/MessageQueryParameter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/f579f745/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java index 180f011..6799df1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java @@ -35,7 +35,7 @@ public abstract class MessageQueryParameter extends MessageParameter> } @Override - public List convertFromString(String values) { + public List convertFromString(String values) throws ConversionException { String[] splitValues = values.split(","); List list = new ArrayList<>(); for (String value : splitValues) { @@ -50,7 +50,7 @@ public abstract class MessageQueryParameter extends MessageParameter> * @param value string representation of parameter value * @return parameter value */ - public abstract X convertStringToValue(String value); + public abstract X convertStringToValue(String value) throws ConversionException; @Override public String convertToString(List values) {
[04/11] flink git commit: [hotfix][tests] Add MCR constructor accepting configuration and type
[hotfix][tests] Add MCR constructor accepting configuration and type Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0a24b27 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0a24b27 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0a24b27 Branch: refs/heads/release-1.5 Commit: a0a24b277882382d3c0712ec8fea7c5166a86f9a Parents: 47909f4 Author: zentol Authored: Thu Apr 5 11:00:23 2018 +0200 Committer: zentol Committed: Mon Apr 16 21:18:32 2018 +0200 -- .../java/org/apache/flink/test/util/MiniClusterResource.java | 6 ++ 1 file changed, 6 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/a0a24b27/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java -- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java index 9b0ac77..8a05750 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java @@ -77,6 +77,12 @@ public class MiniClusterResource extends ExternalResource { public MiniClusterResource( final MiniClusterResourceConfiguration miniClusterResourceConfiguration, + final MiniClusterType miniClusterType) { + this(miniClusterResourceConfiguration, miniClusterType, false); + } + + public MiniClusterResource( + final MiniClusterResourceConfiguration miniClusterResourceConfiguration, final boolean enableClusterClient) { this( miniClusterResourceConfiguration,
[01/11] flink git commit: [hotfix][metrics] Make MessageParameter constructor protected
Repository: flink Updated Branches: refs/heads/release-1.5 c6d45b922 -> a241d2af7 [hotfix][metrics] Make MessageParameter constructor protected Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a562e5d4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a562e5d4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a562e5d4 Branch: refs/heads/release-1.5 Commit: a562e5d40bd6f7d02d4d39752ad140551c25c854 Parents: 2cc77f9 Author: zentol Authored: Mon Mar 26 14:41:03 2018 +0200 Committer: zentol Committed: Mon Apr 16 21:18:32 2018 +0200 -- .../org/apache/flink/runtime/rest/messages/MessageParameter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/a562e5d4/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java index a615e96..b8485e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java @@ -42,7 +42,7 @@ public abstract class MessageParameter { private final String key; private X value; - MessageParameter(String key, MessageParameterRequisiteness requisiteness) { + protected MessageParameter(String key, MessageParameterRequisiteness requisiteness) { this.key = Preconditions.checkNotNull(key); this.requisiteness = Preconditions.checkNotNull(requisiteness); }
[07/11] flink git commit: [FLINK-9177][docs] Update Mesos getting started link
[FLINK-9177][docs] Update Mesos getting started link This closes #5850. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39e9e19c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39e9e19c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39e9e19c Branch: refs/heads/release-1.5 Commit: 39e9e19c5d663d5e69a845af8b00d7de20380101 Parents: 23d4543 Author: Arunan Sugunakumar Authored: Mon Apr 16 07:26:34 2018 + Committer: zentol Committed: Mon Apr 16 21:18:33 2018 +0200 -- docs/ops/deployment/mesos.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/39e9e19c/docs/ops/deployment/mesos.md -- diff --git a/docs/ops/deployment/mesos.md b/docs/ops/deployment/mesos.md index 56f9d93..74bae39 100644 --- a/docs/ops/deployment/mesos.md +++ b/docs/ops/deployment/mesos.md @@ -101,7 +101,7 @@ You can also run Mesos without DC/OS. ### Installing Mesos -Please follow the [instructions on how to setup Mesos on the official website](http://mesos.apache.org/documentation/latest/getting-started/). +Please follow the [instructions on how to setup Mesos on the official website](http://mesos.apache.org/getting-started/). After installation you have to configure the set of master and agent nodes by creating the files `MESOS_HOME/etc/mesos/masters` and `MESOS_HOME/etc/mesos/slaves`. These files contain in each row a single hostname on which the respective component will be started (assuming SSH access to these nodes).
[09/11] flink git commit: [FLINK-8370][REST] Port AggregatingMetricsHandler to flip6
[FLINK-8370][REST] Port AggregatingMetricsHandler to flip6 This closes #5805. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23d45436 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23d45436 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23d45436 Branch: refs/heads/release-1.5 Commit: 23d454364d208d3ce8a55422edaaca365a1c9c79 Parents: f579f74 Author: zentol Authored: Wed Mar 28 12:52:07 2018 +0200 Committer: zentol Committed: Mon Apr 16 21:18:33 2018 +0200 -- .../AbstractAggregatingMetricsHandler.java | 300 ++ .../metrics/AggregatingJobsMetricsHandler.java | 77 .../AggregatingSubtasksMetricsHandler.java | 119 ++ .../AggregatingTaskManagersMetricsHandler.java | 77 .../handler/job/metrics/DoubleAccumulator.java | 257 .../AbstractAggregatedMetricsHeaders.java | 50 +++ .../AbstractAggregatedMetricsParameters.java| 48 +++ .../AggregateTaskManagerMetricsParameters.java | 38 ++ .../metrics/AggregatedJobMetricsHeaders.java| 44 +++ .../metrics/AggregatedJobMetricsParameters.java | 39 ++ .../messages/job/metrics/AggregatedMetric.java | 118 ++ .../metrics/AggregatedMetricsResponseBody.java | 112 ++ .../AggregatedSubtaskMetricsHeaders.java| 47 +++ .../AggregatedSubtaskMetricsParameters.java | 51 +++ .../AggregatedTaskManagerMetricsHeaders.java| 44 +++ .../job/metrics/JobsFilterQueryParameter.java | 48 +++ .../metrics/MetricsAggregationParameter.java| 58 +++ .../metrics/SubtasksFilterQueryParameter.java | 41 ++ .../TaskManagersFilterQueryParameter.java | 42 ++ .../runtime/webmonitor/WebMonitorEndpoint.java | 33 ++ .../AggregatingJobsMetricsHandlerTest.java | 81 .../AggregatingMetricsHandlerTestBase.java | 389 +++ .../AggregatingSubtasksMetricsHandlerTest.java | 93 + ...gregatingTaskManagersMetricsHandlerTest.java | 82 24 files changed, 2288 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java new file mode 100644 index 000..338bb46 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.
[11/11] flink git commit: [FLINK-9045][REST] Make createLocalEnvironmentWithWebUI more user-friendly logging message for web UI address
[FLINK-9045][REST] Make createLocalEnvironmentWithWebUI more user-friendly logging message for web UI address -add back known logging mesages about webUI address -do not set random port in local stream environment This closes #5814. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a241d2af Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a241d2af Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a241d2af Branch: refs/heads/release-1.5 Commit: a241d2af7d640407974dfa460f4693d1f75a5ff2 Parents: 39e9e19 Author: zentol Authored: Wed Apr 4 10:44:59 2018 +0200 Committer: zentol Committed: Mon Apr 16 21:18:33 2018 +0200 -- .../org/apache/flink/api/java/ExecutionEnvironment.java | 6 ++ .../flink/runtime/webmonitor/WebMonitorEndpoint.java | 10 +- .../streaming/api/environment/LocalStreamEnvironment.java | 4 +++- .../api/environment/StreamExecutionEnvironment.java | 6 ++ 4 files changed, 24 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/a241d2af/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 1ce2221..3ea99ea 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -52,6 +52,7 @@ import org.apache.flink.api.java.typeutils.ValueTypeInfo; import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.types.StringValue; import org.apache.flink.util.NumberSequenceIterator; @@ -1125,6 +1126,11 @@ public abstract class ExecutionEnvironment { conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); + if (!conf.contains(RestOptions.REST_PORT)) { + // explicitly set this option so that it's not set to 0 later + conf.setInteger(RestOptions.REST_PORT, RestOptions.REST_PORT.defaultValue()); + } + return createLocalEnvironment(conf, -1); } http://git-wip-us.apache.org/repos/asf/flink/blob/a241d2af/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index fb663ad..0ea7550 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -163,6 +163,8 @@ public class WebMonitorEndpoint extends RestServerEndp private final FatalErrorHandler fatalErrorHandler; + private boolean hasWebUI = false; + public WebMonitorEndpoint( RestServerEndpointConfiguration endpointConfiguration, GatewayRetriever leaderRetriever, @@ -606,7 +608,10 @@ public class WebMonitorEndpoint extends RestServerEndp handlers.add(Tuple2.of(YarnStopJobTerminationHeaders.getInstance(), jobStopTerminationHandler)); optWebContent.ifPresent( - webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent))); + webContent -> { + handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)); + hasWebUI = true; + }); // load the log and stdout file handler for the main cluster component final WebMonitorUtils.LogFileLocation logFileLocation = WebMonitorUtils.LogFileLocation.find(clusterConfiguration); @@ -679,6 +684,9 @@ public class WebMonitorEndpoint extends RestServerEndp @Override public void startInternal() throws Exception { leaderElectionService.start(this); + if (hasWebUI) { + log.info("Web frontend listening at {}.", getRestBaseUrl()); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/a241d2af/flink-streaming-java/src/main/java/org/apache/flink/streaming/api
[02/11] flink git commit: [FLINK-8961][tests] Port JobRetrievalITCase to flip6
[FLINK-8961][tests] Port JobRetrievalITCase to flip6 This closes #5730. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cc77f9f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cc77f9f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cc77f9f Branch: refs/heads/release-1.5 Commit: 2cc77f9f6e999238ae9dd7d24712e5d7a397f4cb Parents: 5423d0e Author: zentol Authored: Tue Mar 20 15:19:47 2018 +0100 Committer: zentol Committed: Mon Apr 16 21:18:32 2018 +0200 -- .../test/example/client/JobRetrievalITCase.java | 121 +++--- .../client/LegacyJobRetrievalITCase.java| 162 +++ 2 files changed, 224 insertions(+), 59 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/2cc77f9f/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java index 57198c0..6b747e0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java @@ -21,30 +21,27 @@ package org.apache.flink.test.example.client; import org.apache.flink.api.common.JobID; import org.apache.flink.client.deployment.StandaloneClusterId; -import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.StandaloneClusterClient; +import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.client.JobRetrievalException; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.minicluster.FlinkMiniCluster; -import org.apache.flink.runtime.testingUtils.TestingCluster; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.testutils.category.New; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; import org.junit.Test; +import org.junit.experimental.categories.Category; +import java.util.Optional; import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicReference; - -import scala.collection.Seq; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @@ -52,23 +49,41 @@ import static org.junit.Assert.fail; /** * Tests retrieval of a job from a running Flink cluster. */ +@Category(New.class) public class JobRetrievalITCase extends TestLogger { private static final Semaphore lock = new Semaphore(1); - private static FlinkMiniCluster cluster; - - @BeforeClass - public static void before() { - Configuration configuration = new Configuration(); - cluster = new TestingCluster(configuration, false); - cluster.start(); + @ClassRule + public static final MiniClusterResource CLUSTER = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + new Configuration(), + 1, + 4 + ), + MiniClusterResource.MiniClusterType.NEW + ); + + private RestClusterClient client; + + @Before + public void setUp() throws Exception { + final Configuration clientConfig = new Configuration(); + clientConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0); + clientConfig.setLong(RestOptions.RETRY_DELAY, 0); + clientConfig.addAll(CLUSTER.getClientConfiguration()); + + client = new RestClusterClient<>( + clientConfig, + StandaloneClusterId.getInstance() + ); } - @AfterClass - public static void after() { - cluster.stop(); - cluster = null; + @After + public void tearDown() { + if (client != null) { + client.sh
[06/11] flink git commit: [FLINK-9156][REST][CLI] Update --jobmanager option logic for REST client
[FLINK-9156][REST][CLI] Update --jobmanager option logic for REST client This closes #5838. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47909f46 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47909f46 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47909f46 Branch: refs/heads/release-1.5 Commit: 47909f466b9c9ee1f4caf94e9f6862a21b628817 Parents: 50504ce Author: zentol Authored: Wed Apr 11 12:48:51 2018 +0200 Committer: zentol Committed: Mon Apr 16 21:18:32 2018 +0200 -- .../apache/flink/client/cli/CliFrontend.java| 3 ++ .../client/program/rest/RestClusterClient.java | 3 +- .../program/rest/RestClusterClientTest.java | 35 3 files changed, 40 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java -- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index ce6556b..65f470b 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -37,6 +37,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; @@ -1141,6 +1142,8 @@ public class CliFrontend { public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) { config.setString(JobManagerOptions.ADDRESS, address.getHostString()); config.setInteger(JobManagerOptions.PORT, address.getPort()); + config.setString(RestOptions.REST_ADDRESS, address.getHostString()); + config.setInteger(RestOptions.REST_PORT, address.getPort()); } public static List> loadCustomCommandLines(Configuration configuration, String configurationDirectory) { http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java -- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index a6f676e..3d50e93 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -719,7 +719,8 @@ public class RestClusterClient extends ClusterClient implements NewCluster .orElse(false); } - private CompletableFuture getWebMonitorBaseUrl() { + @VisibleForTesting + CompletableFuture getWebMonitorBaseUrl() { return FutureUtils.orTimeout( webMonitorLeaderRetriever.getLeaderFuture(), restClusterClientConfiguration.getAwaitLeaderTimeout(), http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java -- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index e7f9bf9..e2daad6 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.cli.DefaultCLI; +import org.apache.flink.client.deployment.StandaloneClusterDescriptor; import org.apache.flink.client.deployment.StandaloneClusterId; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; @@ -100,6 +102,7 @@ import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandl
[05/11] flink git commit: [FLINK-9173][REST] Improve client error message for parsing failures
[FLINK-9173][REST] Improve client error message for parsing failures - print parsing exception for expected type, not error - add toString implemented to JsonResponse Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50504ced Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50504ced Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50504ced Branch: refs/heads/release-1.5 Commit: 50504ced6f162bd9247f8da49889ad2ea0183c0d Parents: c6d45b9 Author: zentol Authored: Mon Apr 16 10:31:52 2018 +0200 Committer: zentol Committed: Mon Apr 16 21:18:32 2018 +0200 -- .../java/org/apache/flink/runtime/rest/RestClient.java | 12 ++-- 1 file changed, 10 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/50504ced/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index 6319634..df97f20 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -217,7 +217,7 @@ public class RestClient { try { P response = objectMapper.readValue(jsonParser, responseType); responseFuture.complete(response); - } catch (IOException ioe) { + } catch (IOException originalException) { // the received response did not matched the expected response type // lets see if it is an ErrorResponse instead @@ -231,7 +231,7 @@ public class RestClient { responseFuture.completeExceptionally( new RestClientException( "Response was neither of the expected type(" + responseType + ") nor an error.", - jpe2, + originalException, rawResponse.getHttpResponseStatus())); } } @@ -328,5 +328,13 @@ public class RestClient { public HttpResponseStatus getHttpResponseStatus() { return httpResponseStatus; } + + @Override + public String toString() { + return "JsonResponse{" + + "json=" + json + + ", httpResponseStatus=" + httpResponseStatus + + '}'; + } } }
[08/11] flink git commit: [FLINK-8370][REST] Port AggregatingMetricsHandler to flip6
http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java new file mode 100644 index 000..4453ee2 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.metrics.dump.MetricDump; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.powermock.api.mockito.PowerMockito.mock; + +/** + * Test base for handlers that extend {@link AbstractAggregatingMetricsHandler}. + */ +public abstract class AggregatingMetricsHandlerTestBase< + H extends AbstractAggregatingMetricsHandler, + P extends AbstractAggregatedMetricsParameters> + extends TestLogger { + + private static final CompletableFuture TEST_REST_ADDRESS; + private static final DispatcherGateway MOCK_DISPATCHER_GATEWAY; + private static final GatewayRetriever LEADER_RETRIEVER; + private static final Time TIMEOUT = Time.milliseconds(50); + private static final Map TEST_HEADERS = Collections.emptyMap(); + private static final Executor EXECUTOR = TestingUtils.defaultExecutor(); + + static { + TEST_REST_ADDRESS = CompletableFuture.completedFuture("localhost:12345"); + + MOCK_DISPATCHER_GATEWAY = mock(DispatcherGateway.class); + + LEADER_RETRIEVER = new GatewayRetriever() { + @Override + public CompletableFuture getFuture() { + return CompletableFuture.completedFuture(MOCK_DISPATCHER_GATEWAY); + } + }; + } + + private H handler; + private MetricStore store; + private Map pathParameters; + + @Before + public void setUp() throws Exception { + MetricFetcher fetcher = new MetricFetcher( + mock(GatewayRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); + store = fetcher.getMetricStore(); + + Collection metricDumps = getMetricD
[03/11] flink git commit: [FLINK-8961][tests] Add MiniClusterResource#getClientConfiguration
[FLINK-8961][tests] Add MiniClusterResource#getClientConfiguration Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1f3ca3f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1f3ca3f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1f3ca3f Branch: refs/heads/master Commit: b1f3ca3f61fa1f0a906804e42844c2c08bd3f5cc Parents: 8eb4604 Author: zentol Authored: Thu Apr 5 11:00:45 2018 +0200 Committer: zentol Committed: Mon Apr 16 21:17:53 2018 +0200 -- .../apache/flink/test/util/MiniClusterResource.java | 15 +++ 1 file changed, 15 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/b1f3ca3f/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java -- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java index 8a05750..531a3c7 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java @@ -25,8 +25,10 @@ import org.apache.flink.client.program.StandaloneClusterClient; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; +import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.RestOptions; import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.minicluster.JobExecutorService; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; @@ -67,6 +69,8 @@ public class MiniClusterResource extends ExternalResource { private ClusterClient clusterClient; + private Configuration restClusterClientConfig; + private int numberSlots = -1; private TestEnvironment executionEnvironment; @@ -117,6 +121,10 @@ public class MiniClusterResource extends ExternalResource { return clusterClient; } + public Configuration getClientConfiguration() { + return restClusterClientConfig; + } + public TestEnvironment getTestEnvironment() { return executionEnvironment; } @@ -194,6 +202,9 @@ public class MiniClusterResource extends ExternalResource { if (enableClusterClient) { clusterClient = new StandaloneClusterClient(configuration, flinkMiniCluster.highAvailabilityServices(), true); } + Configuration restClientConfig = new Configuration(); + restClientConfig.setInteger(JobManagerOptions.PORT, flinkMiniCluster.getLeaderRPCPort()); + this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig); } private void startMiniCluster() throws Exception { @@ -229,6 +240,10 @@ public class MiniClusterResource extends ExternalResource { if (enableClusterClient) { clusterClient = new MiniClusterClient(configuration, miniCluster); } + Configuration restClientConfig = new Configuration(); + restClientConfig.setString(JobManagerOptions.ADDRESS, miniCluster.getRestAddress().getHost()); + restClientConfig.setInteger(RestOptions.REST_PORT, miniCluster.getRestAddress().getPort()); + this.restClusterClientConfig = new UnmodifiableConfiguration(restClientConfig); } /**
[10/11] flink git commit: [FLINK-8370][REST] Port AggregatingMetricsHandler to flip6
[FLINK-8370][REST] Port AggregatingMetricsHandler to flip6 This closes #5805. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0410d80 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0410d80 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0410d80 Branch: refs/heads/master Commit: c0410d801e406e77b1e6e7134224f7946906a49f Parents: 4645d3c Author: zentol Authored: Wed Mar 28 12:52:07 2018 +0200 Committer: zentol Committed: Mon Apr 16 21:17:54 2018 +0200 -- .../AbstractAggregatingMetricsHandler.java | 300 ++ .../metrics/AggregatingJobsMetricsHandler.java | 77 .../AggregatingSubtasksMetricsHandler.java | 119 ++ .../AggregatingTaskManagersMetricsHandler.java | 77 .../handler/job/metrics/DoubleAccumulator.java | 257 .../AbstractAggregatedMetricsHeaders.java | 50 +++ .../AbstractAggregatedMetricsParameters.java| 48 +++ .../AggregateTaskManagerMetricsParameters.java | 38 ++ .../metrics/AggregatedJobMetricsHeaders.java| 44 +++ .../metrics/AggregatedJobMetricsParameters.java | 39 ++ .../messages/job/metrics/AggregatedMetric.java | 118 ++ .../metrics/AggregatedMetricsResponseBody.java | 112 ++ .../AggregatedSubtaskMetricsHeaders.java| 47 +++ .../AggregatedSubtaskMetricsParameters.java | 51 +++ .../AggregatedTaskManagerMetricsHeaders.java| 44 +++ .../job/metrics/JobsFilterQueryParameter.java | 48 +++ .../metrics/MetricsAggregationParameter.java| 58 +++ .../metrics/SubtasksFilterQueryParameter.java | 41 ++ .../TaskManagersFilterQueryParameter.java | 42 ++ .../runtime/webmonitor/WebMonitorEndpoint.java | 33 ++ .../AggregatingJobsMetricsHandlerTest.java | 81 .../AggregatingMetricsHandlerTestBase.java | 389 +++ .../AggregatingSubtasksMetricsHandlerTest.java | 93 + ...gregatingTaskManagersMetricsHandlerTest.java | 82 24 files changed, 2288 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java new file mode 100644 index 000..338bb46 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.rest.handler.AbstractRestHandler; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.RestHandlerException; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter; +import org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.util.Preconditions; + +import org.apache.flink.shaded.netty4.io.netty
[02/11] flink git commit: [FLINK-8961][tests] Port JobRetrievalITCase to flip6
[FLINK-8961][tests] Port JobRetrievalITCase to flip6 This closes #5730. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2266eb01 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2266eb01 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2266eb01 Branch: refs/heads/master Commit: 2266eb010b377450aa1f01ec589fe8758e9a0c6d Parents: b1f3ca3 Author: zentol Authored: Tue Mar 20 15:19:47 2018 +0100 Committer: zentol Committed: Mon Apr 16 21:17:53 2018 +0200 -- .../test/example/client/JobRetrievalITCase.java | 121 +++--- .../client/LegacyJobRetrievalITCase.java| 162 +++ 2 files changed, 224 insertions(+), 59 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/2266eb01/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java -- diff --git a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java index 57198c0..6b747e0 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java @@ -21,30 +21,27 @@ package org.apache.flink.test.example.client; import org.apache.flink.api.common.JobID; import org.apache.flink.client.deployment.StandaloneClusterId; -import org.apache.flink.client.program.ClusterClient; -import org.apache.flink.client.program.StandaloneClusterClient; +import org.apache.flink.client.program.rest.RestClusterClient; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.client.JobRetrievalException; +import org.apache.flink.configuration.RestOptions; +import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.minicluster.FlinkMiniCluster; -import org.apache.flink.runtime.testingUtils.TestingCluster; -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages; +import org.apache.flink.test.util.MiniClusterResource; +import org.apache.flink.testutils.category.New; +import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.TestLogger; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.testkit.JavaTestKit; -import org.junit.AfterClass; -import org.junit.BeforeClass; +import org.junit.After; +import org.junit.Before; +import org.junit.ClassRule; import org.junit.Test; +import org.junit.experimental.categories.Category; +import java.util.Optional; import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicReference; - -import scala.collection.Seq; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.fail; @@ -52,23 +49,41 @@ import static org.junit.Assert.fail; /** * Tests retrieval of a job from a running Flink cluster. */ +@Category(New.class) public class JobRetrievalITCase extends TestLogger { private static final Semaphore lock = new Semaphore(1); - private static FlinkMiniCluster cluster; - - @BeforeClass - public static void before() { - Configuration configuration = new Configuration(); - cluster = new TestingCluster(configuration, false); - cluster.start(); + @ClassRule + public static final MiniClusterResource CLUSTER = new MiniClusterResource( + new MiniClusterResource.MiniClusterResourceConfiguration( + new Configuration(), + 1, + 4 + ), + MiniClusterResource.MiniClusterType.NEW + ); + + private RestClusterClient client; + + @Before + public void setUp() throws Exception { + final Configuration clientConfig = new Configuration(); + clientConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0); + clientConfig.setLong(RestOptions.RETRY_DELAY, 0); + clientConfig.addAll(CLUSTER.getClientConfiguration()); + + client = new RestClusterClient<>( + clientConfig, + StandaloneClusterId.getInstance() + ); } - @AfterClass - public static void after() { - cluster.stop(); - cluster = null; + @After + public void tearDown() { + if (client != null) { + client.shutdow
[06/11] flink git commit: [FLINK-9173][REST] Improve client error message for parsing failures
[FLINK-9173][REST] Improve client error message for parsing failures - print parsing exception for expected type, not error - add toString implemented to JsonResponse Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e95fa5a4 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e95fa5a4 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e95fa5a4 Branch: refs/heads/master Commit: e95fa5a4a03fce74a76ac29395c8a5f69276 Parents: 185b904 Author: zentol Authored: Mon Apr 16 10:31:52 2018 +0200 Committer: zentol Committed: Mon Apr 16 21:17:53 2018 +0200 -- .../java/org/apache/flink/runtime/rest/RestClient.java | 12 ++-- 1 file changed, 10 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/e95fa5a4/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java index 6319634..df97f20 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java @@ -217,7 +217,7 @@ public class RestClient { try { P response = objectMapper.readValue(jsonParser, responseType); responseFuture.complete(response); - } catch (IOException ioe) { + } catch (IOException originalException) { // the received response did not matched the expected response type // lets see if it is an ErrorResponse instead @@ -231,7 +231,7 @@ public class RestClient { responseFuture.completeExceptionally( new RestClientException( "Response was neither of the expected type(" + responseType + ") nor an error.", - jpe2, + originalException, rawResponse.getHttpResponseStatus())); } } @@ -328,5 +328,13 @@ public class RestClient { public HttpResponseStatus getHttpResponseStatus() { return httpResponseStatus; } + + @Override + public String toString() { + return "JsonResponse{" + + "json=" + json + + ", httpResponseStatus=" + httpResponseStatus + + '}'; + } } }
[09/11] flink git commit: [FLINK-8370][REST] Port AggregatingMetricsHandler to flip6
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java new file mode 100644 index 000..4453ee2 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java @@ -0,0 +1,389 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.job.metrics; + +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.metrics.dump.MetricDump; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher; +import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore; +import org.apache.flink.runtime.rest.messages.EmptyRequestBody; +import org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric; +import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; +import org.apache.flink.util.TestLogger; + +import org.junit.Before; +import org.junit.Test; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.stream.Collectors; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.powermock.api.mockito.PowerMockito.mock; + +/** + * Test base for handlers that extend {@link AbstractAggregatingMetricsHandler}. + */ +public abstract class AggregatingMetricsHandlerTestBase< + H extends AbstractAggregatingMetricsHandler, + P extends AbstractAggregatedMetricsParameters> + extends TestLogger { + + private static final CompletableFuture TEST_REST_ADDRESS; + private static final DispatcherGateway MOCK_DISPATCHER_GATEWAY; + private static final GatewayRetriever LEADER_RETRIEVER; + private static final Time TIMEOUT = Time.milliseconds(50); + private static final Map TEST_HEADERS = Collections.emptyMap(); + private static final Executor EXECUTOR = TestingUtils.defaultExecutor(); + + static { + TEST_REST_ADDRESS = CompletableFuture.completedFuture("localhost:12345"); + + MOCK_DISPATCHER_GATEWAY = mock(DispatcherGateway.class); + + LEADER_RETRIEVER = new GatewayRetriever() { + @Override + public CompletableFuture getFuture() { + return CompletableFuture.completedFuture(MOCK_DISPATCHER_GATEWAY); + } + }; + } + + private H handler; + private MetricStore store; + private Map pathParameters; + + @Before + public void setUp() throws Exception { + MetricFetcher fetcher = new MetricFetcher( + mock(GatewayRetriever.class), + mock(MetricQueryServiceRetriever.class), + Executors.directExecutor(), + TestingUtils.TIMEOUT()); + store = fetcher.getMetricStore(); + + Collection metricDumps = getMetricD
[04/11] flink git commit: [FLINK-9156][REST][CLI] Update --jobmanager option logic for REST client
[FLINK-9156][REST][CLI] Update --jobmanager option logic for REST client This closes #5838. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f0fa0b3 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f0fa0b3 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f0fa0b3 Branch: refs/heads/master Commit: 4f0fa0b3f992da4474e2703a54a8445cf1e29856 Parents: e95fa5a Author: zentol Authored: Wed Apr 11 12:48:51 2018 +0200 Committer: zentol Committed: Mon Apr 16 21:17:53 2018 +0200 -- .../apache/flink/client/cli/CliFrontend.java| 3 ++ .../client/program/rest/RestClusterClient.java | 3 +- .../program/rest/RestClusterClientTest.java | 35 3 files changed, 40 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/4f0fa0b3/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java -- diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index ce6556b..65f470b 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -37,6 +37,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.configuration.RestOptions; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.Optimizer; @@ -1141,6 +1142,8 @@ public class CliFrontend { public static void setJobManagerAddressInConfig(Configuration config, InetSocketAddress address) { config.setString(JobManagerOptions.ADDRESS, address.getHostString()); config.setInteger(JobManagerOptions.PORT, address.getPort()); + config.setString(RestOptions.REST_ADDRESS, address.getHostString()); + config.setInteger(RestOptions.REST_PORT, address.getPort()); } public static List> loadCustomCommandLines(Configuration configuration, String configurationDirectory) { http://git-wip-us.apache.org/repos/asf/flink/blob/4f0fa0b3/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java -- diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index a6f676e..3d50e93 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -719,7 +719,8 @@ public class RestClusterClient extends ClusterClient implements NewCluster .orElse(false); } - private CompletableFuture getWebMonitorBaseUrl() { + @VisibleForTesting + CompletableFuture getWebMonitorBaseUrl() { return FutureUtils.orTimeout( webMonitorLeaderRetriever.getLeaderFuture(), restClusterClientConfiguration.getAwaitLeaderTimeout(), http://git-wip-us.apache.org/repos/asf/flink/blob/4f0fa0b3/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java -- diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java index e7f9bf9..e2daad6 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java @@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.client.cli.DefaultCLI; +import org.apache.flink.client.deployment.StandaloneClusterDescriptor; import org.apache.flink.client.deployment.StandaloneClusterId; import org.apache.flink.client.program.ProgramInvocationException; import org.apache.flink.configuration.Configuration; @@ -100,6 +102,7 @@ import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
[11/11] flink git commit: [FLINK-9177][docs] Update Mesos getting started link
[FLINK-9177][docs] Update Mesos getting started link This closes #5850. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f73c8dc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f73c8dc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f73c8dc Branch: refs/heads/master Commit: 4f73c8dcbcb8c03339882cd3259549ea5b6e38cf Parents: c0410d8 Author: Arunan Sugunakumar Authored: Mon Apr 16 07:26:34 2018 + Committer: zentol Committed: Mon Apr 16 21:17:54 2018 +0200 -- docs/ops/deployment/mesos.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/4f73c8dc/docs/ops/deployment/mesos.md -- diff --git a/docs/ops/deployment/mesos.md b/docs/ops/deployment/mesos.md index 56f9d93..74bae39 100644 --- a/docs/ops/deployment/mesos.md +++ b/docs/ops/deployment/mesos.md @@ -101,7 +101,7 @@ You can also run Mesos without DC/OS. ### Installing Mesos -Please follow the [instructions on how to setup Mesos on the official website](http://mesos.apache.org/documentation/latest/getting-started/). +Please follow the [instructions on how to setup Mesos on the official website](http://mesos.apache.org/getting-started/). After installation you have to configure the set of master and agent nodes by creating the files `MESOS_HOME/etc/mesos/masters` and `MESOS_HOME/etc/mesos/slaves`. These files contain in each row a single hostname on which the respective component will be started (assuming SSH access to these nodes).
[01/11] flink git commit: [hotfix][metrics] Allow QueryParameter converters to throw ConversionExceptions
Repository: flink Updated Branches: refs/heads/master 185b904aa -> 27be32e8a [hotfix][metrics] Allow QueryParameter converters to throw ConversionExceptions Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4645d3c0 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4645d3c0 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4645d3c0 Branch: refs/heads/master Commit: 4645d3c06736c7adf360a5380cd2a05b4eb752c9 Parents: 1cd7c42 Author: zentol Authored: Mon Mar 26 14:41:25 2018 +0200 Committer: zentol Committed: Mon Apr 16 21:17:53 2018 +0200 -- .../flink/runtime/rest/messages/MessageQueryParameter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/4645d3c0/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java index 180f011..6799df1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java @@ -35,7 +35,7 @@ public abstract class MessageQueryParameter extends MessageParameter> } @Override - public List convertFromString(String values) { + public List convertFromString(String values) throws ConversionException { String[] splitValues = values.split(","); List list = new ArrayList<>(); for (String value : splitValues) { @@ -50,7 +50,7 @@ public abstract class MessageQueryParameter extends MessageParameter> * @param value string representation of parameter value * @return parameter value */ - public abstract X convertStringToValue(String value); + public abstract X convertStringToValue(String value) throws ConversionException; @Override public String convertToString(List values) {
[07/11] flink git commit: [hotfix][tests] Add MCR constructor accepting configuration and type
[hotfix][tests] Add MCR constructor accepting configuration and type Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8eb4604c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8eb4604c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8eb4604c Branch: refs/heads/master Commit: 8eb4604cd82a89258a6581ca8be4f419784e5096 Parents: 4f0fa0b Author: zentol Authored: Thu Apr 5 11:00:23 2018 +0200 Committer: zentol Committed: Mon Apr 16 21:17:53 2018 +0200 -- .../java/org/apache/flink/test/util/MiniClusterResource.java | 6 ++ 1 file changed, 6 insertions(+) -- http://git-wip-us.apache.org/repos/asf/flink/blob/8eb4604c/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java -- diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java index 9b0ac77..8a05750 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java @@ -77,6 +77,12 @@ public class MiniClusterResource extends ExternalResource { public MiniClusterResource( final MiniClusterResourceConfiguration miniClusterResourceConfiguration, + final MiniClusterType miniClusterType) { + this(miniClusterResourceConfiguration, miniClusterType, false); + } + + public MiniClusterResource( + final MiniClusterResourceConfiguration miniClusterResourceConfiguration, final boolean enableClusterClient) { this( miniClusterResourceConfiguration,
[08/11] flink git commit: [FLINK-9045][REST] Make createLocalEnvironmentWithWebUI more user-friendly logging message for web UI address
[FLINK-9045][REST] Make createLocalEnvironmentWithWebUI more user-friendly logging message for web UI address -add back known logging mesages about webUI address -do not set random port in local stream environment This closes #5814. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/27be32e8 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/27be32e8 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/27be32e8 Branch: refs/heads/master Commit: 27be32e8a44e3afcce9a17e3b95767869f56ab61 Parents: 4f73c8d Author: zentol Authored: Wed Apr 4 10:44:59 2018 +0200 Committer: zentol Committed: Mon Apr 16 21:17:54 2018 +0200 -- .../org/apache/flink/api/java/ExecutionEnvironment.java | 6 ++ .../flink/runtime/webmonitor/WebMonitorEndpoint.java | 10 +- .../streaming/api/environment/LocalStreamEnvironment.java | 4 +++- .../api/environment/StreamExecutionEnvironment.java | 6 ++ 4 files changed, 24 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/27be32e8/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java -- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 1ce2221..3ea99ea 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -52,6 +52,7 @@ import org.apache.flink.api.java.typeutils.ValueTypeInfo; import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.RestOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.types.StringValue; import org.apache.flink.util.NumberSequenceIterator; @@ -1125,6 +1126,11 @@ public abstract class ExecutionEnvironment { conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true); + if (!conf.contains(RestOptions.REST_PORT)) { + // explicitly set this option so that it's not set to 0 later + conf.setInteger(RestOptions.REST_PORT, RestOptions.REST_PORT.defaultValue()); + } + return createLocalEnvironment(conf, -1); } http://git-wip-us.apache.org/repos/asf/flink/blob/27be32e8/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java index fb663ad..0ea7550 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java @@ -163,6 +163,8 @@ public class WebMonitorEndpoint extends RestServerEndp private final FatalErrorHandler fatalErrorHandler; + private boolean hasWebUI = false; + public WebMonitorEndpoint( RestServerEndpointConfiguration endpointConfiguration, GatewayRetriever leaderRetriever, @@ -606,7 +608,10 @@ public class WebMonitorEndpoint extends RestServerEndp handlers.add(Tuple2.of(YarnStopJobTerminationHeaders.getInstance(), jobStopTerminationHandler)); optWebContent.ifPresent( - webContent -> handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent))); + webContent -> { + handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), webContent)); + hasWebUI = true; + }); // load the log and stdout file handler for the main cluster component final WebMonitorUtils.LogFileLocation logFileLocation = WebMonitorUtils.LogFileLocation.find(clusterConfiguration); @@ -679,6 +684,9 @@ public class WebMonitorEndpoint extends RestServerEndp @Override public void startInternal() throws Exception { leaderElectionService.start(this); + if (hasWebUI) { + log.info("Web frontend listening at {}.", getRestBaseUrl()); + } } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/27be32e8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/envi
[05/11] flink git commit: [hotfix][metrics] Make MessageParameter constructor protected
[hotfix][metrics] Make MessageParameter constructor protected Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1cd7c423 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1cd7c423 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1cd7c423 Branch: refs/heads/master Commit: 1cd7c423d87cf680fb84af83ebfcc3246d079d1f Parents: 2266eb0 Author: zentol Authored: Mon Mar 26 14:41:03 2018 +0200 Committer: zentol Committed: Mon Apr 16 21:17:53 2018 +0200 -- .../org/apache/flink/runtime/rest/messages/MessageParameter.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/1cd7c423/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java -- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java index a615e96..b8485e5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java @@ -42,7 +42,7 @@ public abstract class MessageParameter { private final String key; private X value; - MessageParameter(String key, MessageParameterRequisiteness requisiteness) { + protected MessageParameter(String key, MessageParameterRequisiteness requisiteness) { this.key = Preconditions.checkNotNull(key); this.requisiteness = Preconditions.checkNotNull(requisiteness); }
flink git commit: [FLINK-9145] [table] Clean up flink-table dependencies
Repository: flink Updated Branches: refs/heads/release-1.5 f97c0c15e -> c6d45b922 [FLINK-9145] [table] Clean up flink-table dependencies This closes #5853. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6d45b92 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6d45b92 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6d45b92 Branch: refs/heads/release-1.5 Commit: c6d45b9225987537493472f20e933a81b63c9cde Parents: f97c0c1 Author: Timo Walther Authored: Mon Apr 16 13:02:22 2018 +0200 Committer: Timo Walther Committed: Mon Apr 16 18:11:58 2018 +0200 -- flink-libraries/flink-sql-client/pom.xml | 17 +++ flink-libraries/flink-table/pom.xml | 67 ++- 2 files changed, 83 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/c6d45b92/flink-libraries/flink-sql-client/pom.xml -- diff --git a/flink-libraries/flink-sql-client/pom.xml b/flink-libraries/flink-sql-client/pom.xml index dcc2743..7349ba4 100644 --- a/flink-libraries/flink-sql-client/pom.xml +++ b/flink-libraries/flink-sql-client/pom.xml @@ -192,6 +192,23 @@ under the License. + + + + org.apache.maven.plugins + maven-enforcer-plugin + + + dependency-convergence + + enforce + + + false + + + + http://git-wip-us.apache.org/repos/asf/flink/blob/c6d45b92/flink-libraries/flink-table/pom.xml -- diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml index 9ccc70b..73be397 100644 --- a/flink-libraries/flink-table/pom.xml +++ b/flink-libraries/flink-table/pom.xml @@ -32,6 +32,41 @@ under the License. jar + + + + + com.google.guava + guava + 19.0 + + + + commons-beanutils + commons-beanutils + 1.8.3 + + + + org.codehaus.janino + commons-compiler + 3.0.7 + + + + commons-lang + commons-lang + 2.6 + + + + org.codehaus.janino + janino + 3.0.7 + + + + @@ -49,27 +84,38 @@ under the License. provided + commons-configuration commons-configuration + commons-codec commons-codec + + + org.apache.commons + commons-lang3 + + + org.codehaus.janino janino - 3.0.7 + org.apache.calcite calcite-core + 1.16.0 + org.apache.calcite.avatica avatica-metrics @@ -105,6 +151,7 @@ under the License. + org.reflections
flink git commit: [FLINK-9145] [table] Clean up flink-table dependencies
Repository: flink Updated Branches: refs/heads/master 71c3cd278 -> 185b904aa [FLINK-9145] [table] Clean up flink-table dependencies This closes #5853. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/185b904a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/185b904a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/185b904a Branch: refs/heads/master Commit: 185b904aa82dd99a178de0688fa67afba0edf679 Parents: 71c3cd2 Author: Timo Walther Authored: Mon Apr 16 13:02:22 2018 +0200 Committer: Timo Walther Committed: Mon Apr 16 18:09:41 2018 +0200 -- flink-libraries/flink-sql-client/pom.xml | 17 +++ flink-libraries/flink-table/pom.xml | 67 ++- 2 files changed, 83 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/185b904a/flink-libraries/flink-sql-client/pom.xml -- diff --git a/flink-libraries/flink-sql-client/pom.xml b/flink-libraries/flink-sql-client/pom.xml index 6bcfc13..3763bf0 100644 --- a/flink-libraries/flink-sql-client/pom.xml +++ b/flink-libraries/flink-sql-client/pom.xml @@ -192,6 +192,23 @@ under the License. + + + + org.apache.maven.plugins + maven-enforcer-plugin + + + dependency-convergence + + enforce + + + false + + + + http://git-wip-us.apache.org/repos/asf/flink/blob/185b904a/flink-libraries/flink-table/pom.xml -- diff --git a/flink-libraries/flink-table/pom.xml b/flink-libraries/flink-table/pom.xml index d6c146a..acffb2b 100644 --- a/flink-libraries/flink-table/pom.xml +++ b/flink-libraries/flink-table/pom.xml @@ -32,6 +32,41 @@ under the License. jar + + + + + com.google.guava + guava + 19.0 + + + + commons-beanutils + commons-beanutils + 1.8.3 + + + + org.codehaus.janino + commons-compiler + 3.0.7 + + + + commons-lang + commons-lang + 2.6 + + + + org.codehaus.janino + janino + 3.0.7 + + + + @@ -49,27 +84,38 @@ under the License. provided + commons-configuration commons-configuration + commons-codec commons-codec + + + org.apache.commons + commons-lang3 + + + org.codehaus.janino janino - 3.0.7 + org.apache.calcite calcite-core + 1.16.0 + org.apache.calcite.avatica avatica-metrics @@ -105,6 +151,7 @@ under the License. + org.reflections