flink git commit: [FLINK-9183] [docs] Add warning about idle partitions to Kafka connector docs.

2018-04-16 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/release-1.4 e76b10d07 -> a9b497749


[FLINK-9183] [docs] Add warning about idle partitions to Kafka connector docs.

This closes #5858.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a9b49774
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a9b49774
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a9b49774

Branch: refs/heads/release-1.4
Commit: a9b497749710c708d076fc45688fff7b72416af1
Parents: e76b10d
Author: juhoautio 
Authored: Mon Apr 16 17:40:23 2018 +0300
Committer: Fabian Hueske 
Committed: Mon Apr 16 21:44:12 2018 +0200

--
 docs/dev/connectors/kafka.md | 5 +
 1 file changed, 5 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/a9b49774/docs/dev/connectors/kafka.md
--
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 8e38146..75c5e23 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -444,6 +444,11 @@ the `Watermark getCurrentWatermark()` (for periodic) or the
 `Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)` 
(for punctuated) is called to determine
 if a new watermark should be emitted and with which timestamp.
 
+**Note**: If a watermark assigner depends on records read from Kafka to 
advance its watermarks (which is commonly the case), all topics and partitions 
need to have a continuous stream of records. Otherwise, the watermarks of the 
whole application cannot advance and all time-based operations, such as time 
windows or functions with timers, cannot make progress. A single idle Kafka 
partition causes this behavior. 
+A Flink improvement is planned to prevent this from happening 
+(see [FLINK-5479: Per-partition watermarks in FlinkKafkaConsumer should 
consider idle partitions](
+https://issues.apache.org/jira/browse/FLINK-5479)).
+In the meanwhile, a possible workaround is to send *heartbeat messages* to all 
consumed partitions that advance the watermarks of idle partitions.
 
 ## Kafka Producer
 



[2/2] flink git commit: [FLINK-9183] [docs] Add warning about idle partitions to Kafka connector docs.

2018-04-16 Thread fhueske
[FLINK-9183] [docs] Add warning about idle partitions to Kafka connector docs.

This closes #5858.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fc58f987
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fc58f987
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fc58f987

Branch: refs/heads/release-1.5
Commit: fc58f987144a307e7208de0bdfe439922bed4b55
Parents: 4c3d018
Author: juhoautio 
Authored: Mon Apr 16 17:40:23 2018 +0300
Committer: Fabian Hueske 
Committed: Mon Apr 16 21:42:21 2018 +0200

--
 docs/dev/connectors/kafka.md | 5 +
 1 file changed, 5 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/fc58f987/docs/dev/connectors/kafka.md
--
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 27fca7a..47a6651 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -451,6 +451,11 @@ the `Watermark getCurrentWatermark()` (for periodic) or the
 `Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)` 
(for punctuated) is called to determine
 if a new watermark should be emitted and with which timestamp.
 
+**Note**: If a watermark assigner depends on records read from Kafka to 
advance its watermarks (which is commonly the case), all topics and partitions 
need to have a continuous stream of records. Otherwise, the watermarks of the 
whole application cannot advance and all time-based operations, such as time 
windows or functions with timers, cannot make progress. A single idle Kafka 
partition causes this behavior. 
+A Flink improvement is planned to prevent this from happening 
+(see [FLINK-5479: Per-partition watermarks in FlinkKafkaConsumer should 
consider idle partitions](
+https://issues.apache.org/jira/browse/FLINK-5479)).
+In the meanwhile, a possible workaround is to send *heartbeat messages* to all 
consumed partitions that advance the watermarks of idle partitions.
 
 ## Kafka Producer
 



[1/2] flink git commit: [FLINK-9089] [orc] Upgrade Orc dependency from 1.4.1 to 1.4.3.

2018-04-16 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/release-1.5 a241d2af7 -> fc58f9871


[FLINK-9089] [orc] Upgrade Orc dependency from 1.4.1 to 1.4.3.

This closes #5826.
This closes #1990. // closing old PR


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4c3d018b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4c3d018b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4c3d018b

Branch: refs/heads/release-1.5
Commit: 4c3d018bd75ace6361c02a1ad0b254350220542f
Parents: a241d2a
Author: yanghua 
Authored: Sun Apr 8 10:12:51 2018 +0800
Committer: Fabian Hueske 
Committed: Mon Apr 16 21:42:13 2018 +0200

--
 flink-connectors/flink-orc/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/4c3d018b/flink-connectors/flink-orc/pom.xml
--
diff --git a/flink-connectors/flink-orc/pom.xml 
b/flink-connectors/flink-orc/pom.xml
index 3ee5e49..2fe229c 100644
--- a/flink-connectors/flink-orc/pom.xml
+++ b/flink-connectors/flink-orc/pom.xml
@@ -57,7 +57,7 @@ under the License.

org.apache.orc
orc-core
-   1.4.1
+   1.4.3






[1/2] flink git commit: [FLINK-9183] [docs] Add warning about idle partitions to Kafka connector docs.

2018-04-16 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/master 27be32e8a -> b005ea353


[FLINK-9183] [docs] Add warning about idle partitions to Kafka connector docs.

This closes #5858.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b005ea35
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b005ea35
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b005ea35

Branch: refs/heads/master
Commit: b005ea35374f619298cebab649e0ba477aeaf860
Parents: afad30a
Author: juhoautio 
Authored: Mon Apr 16 17:40:23 2018 +0300
Committer: Fabian Hueske 
Committed: Mon Apr 16 21:40:57 2018 +0200

--
 docs/dev/connectors/kafka.md | 5 +
 1 file changed, 5 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b005ea35/docs/dev/connectors/kafka.md
--
diff --git a/docs/dev/connectors/kafka.md b/docs/dev/connectors/kafka.md
index 27fca7a..47a6651 100644
--- a/docs/dev/connectors/kafka.md
+++ b/docs/dev/connectors/kafka.md
@@ -451,6 +451,11 @@ the `Watermark getCurrentWatermark()` (for periodic) or the
 `Watermark checkAndGetNextWatermark(T lastElement, long extractedTimestamp)` 
(for punctuated) is called to determine
 if a new watermark should be emitted and with which timestamp.
 
+**Note**: If a watermark assigner depends on records read from Kafka to 
advance its watermarks (which is commonly the case), all topics and partitions 
need to have a continuous stream of records. Otherwise, the watermarks of the 
whole application cannot advance and all time-based operations, such as time 
windows or functions with timers, cannot make progress. A single idle Kafka 
partition causes this behavior. 
+A Flink improvement is planned to prevent this from happening 
+(see [FLINK-5479: Per-partition watermarks in FlinkKafkaConsumer should 
consider idle partitions](
+https://issues.apache.org/jira/browse/FLINK-5479)).
+In the meanwhile, a possible workaround is to send *heartbeat messages* to all 
consumed partitions that advance the watermarks of idle partitions.
 
 ## Kafka Producer
 



[2/2] flink git commit: [FLINK-9089] [orc] Upgrade Orc dependency from 1.4.1 to 1.4.3.

2018-04-16 Thread fhueske
[FLINK-9089] [orc] Upgrade Orc dependency from 1.4.1 to 1.4.3.

This closes #5826.
This closes #1990. // closing old PR


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/afad30a5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/afad30a5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/afad30a5

Branch: refs/heads/master
Commit: afad30a54fa92b90bbaa97d2559713fdc218a53a
Parents: 27be32e
Author: yanghua 
Authored: Sun Apr 8 10:12:51 2018 +0800
Committer: Fabian Hueske 
Committed: Mon Apr 16 21:40:57 2018 +0200

--
 flink-connectors/flink-orc/pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/afad30a5/flink-connectors/flink-orc/pom.xml
--
diff --git a/flink-connectors/flink-orc/pom.xml 
b/flink-connectors/flink-orc/pom.xml
index 689cb08..86076bc 100644
--- a/flink-connectors/flink-orc/pom.xml
+++ b/flink-connectors/flink-orc/pom.xml
@@ -57,7 +57,7 @@ under the License.

org.apache.orc
orc-core
-   1.4.1
+   1.4.3






[03/11] flink git commit: [FLINK-8961][tests] Add MiniClusterResource#getClientConfiguration

2018-04-16 Thread chesnay
[FLINK-8961][tests] Add MiniClusterResource#getClientConfiguration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5423d0e2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5423d0e2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5423d0e2

Branch: refs/heads/release-1.5
Commit: 5423d0e2ebed8413b1200424689fab9cae5bfef5
Parents: a0a24b2
Author: zentol 
Authored: Thu Apr 5 11:00:45 2018 +0200
Committer: zentol 
Committed: Mon Apr 16 21:18:32 2018 +0200

--
 .../apache/flink/test/util/MiniClusterResource.java  | 15 +++
 1 file changed, 15 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/5423d0e2/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
--
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 8a05750..531a3c7 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -25,8 +25,10 @@ import 
org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -67,6 +69,8 @@ public class MiniClusterResource extends ExternalResource {
 
private ClusterClient clusterClient;
 
+   private Configuration restClusterClientConfig;
+
private int numberSlots = -1;
 
private TestEnvironment executionEnvironment;
@@ -117,6 +121,10 @@ public class MiniClusterResource extends ExternalResource {
return clusterClient;
}
 
+   public Configuration getClientConfiguration() {
+   return restClusterClientConfig;
+   }
+
public TestEnvironment getTestEnvironment() {
return executionEnvironment;
}
@@ -194,6 +202,9 @@ public class MiniClusterResource extends ExternalResource {
if (enableClusterClient) {
clusterClient = new 
StandaloneClusterClient(configuration, 
flinkMiniCluster.highAvailabilityServices(), true);
}
+   Configuration restClientConfig = new Configuration();
+   restClientConfig.setInteger(JobManagerOptions.PORT, 
flinkMiniCluster.getLeaderRPCPort());
+   this.restClusterClientConfig = new 
UnmodifiableConfiguration(restClientConfig);
}
 
private void startMiniCluster() throws Exception {
@@ -229,6 +240,10 @@ public class MiniClusterResource extends ExternalResource {
if (enableClusterClient) {
clusterClient = new MiniClusterClient(configuration, 
miniCluster);
}
+   Configuration restClientConfig = new Configuration();
+   restClientConfig.setString(JobManagerOptions.ADDRESS, 
miniCluster.getRestAddress().getHost());
+   restClientConfig.setInteger(RestOptions.REST_PORT, 
miniCluster.getRestAddress().getPort());
+   this.restClusterClientConfig = new 
UnmodifiableConfiguration(restClientConfig);
}
 
/**



[10/11] flink git commit: [hotfix][metrics] Allow QueryParameter converters to throw ConversionExceptions

2018-04-16 Thread chesnay
[hotfix][metrics] Allow QueryParameter converters to throw ConversionExceptions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f579f745
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f579f745
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f579f745

Branch: refs/heads/release-1.5
Commit: f579f745dc3868c1807a814695624d98b53f6d50
Parents: a562e5d
Author: zentol 
Authored: Mon Mar 26 14:41:25 2018 +0200
Committer: zentol 
Committed: Mon Apr 16 21:18:33 2018 +0200

--
 .../flink/runtime/rest/messages/MessageQueryParameter.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f579f745/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
index 180f011..6799df1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
@@ -35,7 +35,7 @@ public abstract class MessageQueryParameter extends 
MessageParameter>
}
 
@Override
-   public List convertFromString(String values) {
+   public List convertFromString(String values) throws 
ConversionException {
String[] splitValues = values.split(",");
List list = new ArrayList<>();
for (String value : splitValues) {
@@ -50,7 +50,7 @@ public abstract class MessageQueryParameter extends 
MessageParameter>
 * @param value string representation of parameter value
 * @return parameter value
 */
-   public abstract X convertStringToValue(String value);
+   public abstract X convertStringToValue(String value) throws 
ConversionException;
 
@Override
public String convertToString(List values) {



[04/11] flink git commit: [hotfix][tests] Add MCR constructor accepting configuration and type

2018-04-16 Thread chesnay
[hotfix][tests] Add MCR constructor accepting configuration and type


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a0a24b27
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a0a24b27
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a0a24b27

Branch: refs/heads/release-1.5
Commit: a0a24b277882382d3c0712ec8fea7c5166a86f9a
Parents: 47909f4
Author: zentol 
Authored: Thu Apr 5 11:00:23 2018 +0200
Committer: zentol 
Committed: Mon Apr 16 21:18:32 2018 +0200

--
 .../java/org/apache/flink/test/util/MiniClusterResource.java   | 6 ++
 1 file changed, 6 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/a0a24b27/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
--
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 9b0ac77..8a05750 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -77,6 +77,12 @@ public class MiniClusterResource extends ExternalResource {
 
public MiniClusterResource(
final MiniClusterResourceConfiguration 
miniClusterResourceConfiguration,
+   final MiniClusterType miniClusterType) {
+   this(miniClusterResourceConfiguration, miniClusterType, false);
+   }
+
+   public MiniClusterResource(
+   final MiniClusterResourceConfiguration 
miniClusterResourceConfiguration,
final boolean enableClusterClient) {
this(
miniClusterResourceConfiguration,



[01/11] flink git commit: [hotfix][metrics] Make MessageParameter constructor protected

2018-04-16 Thread chesnay
Repository: flink
Updated Branches:
  refs/heads/release-1.5 c6d45b922 -> a241d2af7


[hotfix][metrics] Make MessageParameter constructor protected


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a562e5d4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a562e5d4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a562e5d4

Branch: refs/heads/release-1.5
Commit: a562e5d40bd6f7d02d4d39752ad140551c25c854
Parents: 2cc77f9
Author: zentol 
Authored: Mon Mar 26 14:41:03 2018 +0200
Committer: zentol 
Committed: Mon Apr 16 21:18:32 2018 +0200

--
 .../org/apache/flink/runtime/rest/messages/MessageParameter.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/a562e5d4/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
index a615e96..b8485e5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
@@ -42,7 +42,7 @@ public abstract class MessageParameter {
private final String key;
private X value;
 
-   MessageParameter(String key, MessageParameterRequisiteness 
requisiteness) {
+   protected MessageParameter(String key, MessageParameterRequisiteness 
requisiteness) {
this.key = Preconditions.checkNotNull(key);
this.requisiteness = Preconditions.checkNotNull(requisiteness);
}



[07/11] flink git commit: [FLINK-9177][docs] Update Mesos getting started link

2018-04-16 Thread chesnay
[FLINK-9177][docs] Update Mesos getting started link

This closes #5850.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/39e9e19c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/39e9e19c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/39e9e19c

Branch: refs/heads/release-1.5
Commit: 39e9e19c5d663d5e69a845af8b00d7de20380101
Parents: 23d4543
Author: Arunan Sugunakumar 
Authored: Mon Apr 16 07:26:34 2018 +
Committer: zentol 
Committed: Mon Apr 16 21:18:33 2018 +0200

--
 docs/ops/deployment/mesos.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/39e9e19c/docs/ops/deployment/mesos.md
--
diff --git a/docs/ops/deployment/mesos.md b/docs/ops/deployment/mesos.md
index 56f9d93..74bae39 100644
--- a/docs/ops/deployment/mesos.md
+++ b/docs/ops/deployment/mesos.md
@@ -101,7 +101,7 @@ You can also run Mesos without DC/OS.
 
 ### Installing Mesos
 
-Please follow the [instructions on how to setup Mesos on the official 
website](http://mesos.apache.org/documentation/latest/getting-started/).
+Please follow the [instructions on how to setup Mesos on the official 
website](http://mesos.apache.org/getting-started/).
 
 After installation you have to configure the set of master and agent nodes by 
creating the files `MESOS_HOME/etc/mesos/masters` and 
`MESOS_HOME/etc/mesos/slaves`.
 These files contain in each row a single hostname on which the respective 
component will be started (assuming SSH access to these nodes).



[09/11] flink git commit: [FLINK-8370][REST] Port AggregatingMetricsHandler to flip6

2018-04-16 Thread chesnay
[FLINK-8370][REST] Port AggregatingMetricsHandler to flip6

This closes #5805.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/23d45436
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/23d45436
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/23d45436

Branch: refs/heads/release-1.5
Commit: 23d454364d208d3ce8a55422edaaca365a1c9c79
Parents: f579f74
Author: zentol 
Authored: Wed Mar 28 12:52:07 2018 +0200
Committer: zentol 
Committed: Mon Apr 16 21:18:33 2018 +0200

--
 .../AbstractAggregatingMetricsHandler.java  | 300 ++
 .../metrics/AggregatingJobsMetricsHandler.java  |  77 
 .../AggregatingSubtasksMetricsHandler.java  | 119 ++
 .../AggregatingTaskManagersMetricsHandler.java  |  77 
 .../handler/job/metrics/DoubleAccumulator.java  | 257 
 .../AbstractAggregatedMetricsHeaders.java   |  50 +++
 .../AbstractAggregatedMetricsParameters.java|  48 +++
 .../AggregateTaskManagerMetricsParameters.java  |  38 ++
 .../metrics/AggregatedJobMetricsHeaders.java|  44 +++
 .../metrics/AggregatedJobMetricsParameters.java |  39 ++
 .../messages/job/metrics/AggregatedMetric.java  | 118 ++
 .../metrics/AggregatedMetricsResponseBody.java  | 112 ++
 .../AggregatedSubtaskMetricsHeaders.java|  47 +++
 .../AggregatedSubtaskMetricsParameters.java |  51 +++
 .../AggregatedTaskManagerMetricsHeaders.java|  44 +++
 .../job/metrics/JobsFilterQueryParameter.java   |  48 +++
 .../metrics/MetricsAggregationParameter.java|  58 +++
 .../metrics/SubtasksFilterQueryParameter.java   |  41 ++
 .../TaskManagersFilterQueryParameter.java   |  42 ++
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  33 ++
 .../AggregatingJobsMetricsHandlerTest.java  |  81 
 .../AggregatingMetricsHandlerTestBase.java  | 389 +++
 .../AggregatingSubtasksMetricsHandlerTest.java  |  93 +
 ...gregatingTaskManagersMetricsHandlerTest.java |  82 
 24 files changed, 2288 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
new file mode 100644
index 000..338bb46
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.

[11/11] flink git commit: [FLINK-9045][REST] Make createLocalEnvironmentWithWebUI more user-friendly logging message for web UI address

2018-04-16 Thread chesnay
[FLINK-9045][REST] Make createLocalEnvironmentWithWebUI more user-friendly 
logging message for web UI address

-add back known logging mesages about webUI address
-do not set random port in local stream environment

This closes #5814.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a241d2af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a241d2af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a241d2af

Branch: refs/heads/release-1.5
Commit: a241d2af7d640407974dfa460f4693d1f75a5ff2
Parents: 39e9e19
Author: zentol 
Authored: Wed Apr 4 10:44:59 2018 +0200
Committer: zentol 
Committed: Mon Apr 16 21:18:33 2018 +0200

--
 .../org/apache/flink/api/java/ExecutionEnvironment.java   |  6 ++
 .../flink/runtime/webmonitor/WebMonitorEndpoint.java  | 10 +-
 .../streaming/api/environment/LocalStreamEnvironment.java |  4 +++-
 .../api/environment/StreamExecutionEnvironment.java   |  6 ++
 4 files changed, 24 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/a241d2af/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
--
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 1ce2221..3ea99ea 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -52,6 +52,7 @@ import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
@@ -1125,6 +1126,11 @@ public abstract class ExecutionEnvironment {
 
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
+   if (!conf.contains(RestOptions.REST_PORT)) {
+   // explicitly set this option so that it's not set to 0 
later
+   conf.setInteger(RestOptions.REST_PORT, 
RestOptions.REST_PORT.defaultValue());
+   }
+
return createLocalEnvironment(conf, -1);
}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a241d2af/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index fb663ad..0ea7550 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -163,6 +163,8 @@ public class WebMonitorEndpoint 
extends RestServerEndp
 
private final FatalErrorHandler fatalErrorHandler;
 
+   private boolean hasWebUI = false;
+
public WebMonitorEndpoint(
RestServerEndpointConfiguration endpointConfiguration,
GatewayRetriever leaderRetriever,
@@ -606,7 +608,10 @@ public class WebMonitorEndpoint 
extends RestServerEndp

handlers.add(Tuple2.of(YarnStopJobTerminationHeaders.getInstance(), 
jobStopTerminationHandler));
 
optWebContent.ifPresent(
-   webContent -> 
handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), 
webContent)));
+   webContent -> {
+   
handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), 
webContent));
+   hasWebUI = true;
+   });
 
// load the log and stdout file handler for the main cluster 
component
final WebMonitorUtils.LogFileLocation logFileLocation = 
WebMonitorUtils.LogFileLocation.find(clusterConfiguration);
@@ -679,6 +684,9 @@ public class WebMonitorEndpoint 
extends RestServerEndp
@Override
public void startInternal() throws Exception {
leaderElectionService.start(this);
+   if (hasWebUI) {
+   log.info("Web frontend listening at {}.", 
getRestBaseUrl());
+   }
}
 
@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a241d2af/flink-streaming-java/src/main/java/org/apache/flink/streaming/api

[02/11] flink git commit: [FLINK-8961][tests] Port JobRetrievalITCase to flip6

2018-04-16 Thread chesnay
[FLINK-8961][tests] Port JobRetrievalITCase to flip6

This closes #5730.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2cc77f9f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2cc77f9f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2cc77f9f

Branch: refs/heads/release-1.5
Commit: 2cc77f9f6e999238ae9dd7d24712e5d7a397f4cb
Parents: 5423d0e
Author: zentol 
Authored: Tue Mar 20 15:19:47 2018 +0100
Committer: zentol 
Committed: Mon Apr 16 21:18:32 2018 +0200

--
 .../test/example/client/JobRetrievalITCase.java | 121 +++---
 .../client/LegacyJobRetrievalITCase.java| 162 +++
 2 files changed, 224 insertions(+), 59 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2cc77f9f/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index 57198c0..6b747e0 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -21,30 +21,27 @@ package org.apache.flink.test.example.client;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobRetrievalException;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.testutils.category.New;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
+import java.util.Optional;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.collection.Seq;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -52,23 +49,41 @@ import static org.junit.Assert.fail;
 /**
  * Tests retrieval of a job from a running Flink cluster.
  */
+@Category(New.class)
 public class JobRetrievalITCase extends TestLogger {
 
private static final Semaphore lock = new Semaphore(1);
 
-   private static FlinkMiniCluster cluster;
-
-   @BeforeClass
-   public static void before() {
-   Configuration configuration = new Configuration();
-   cluster = new TestingCluster(configuration, false);
-   cluster.start();
+   @ClassRule
+   public static final MiniClusterResource CLUSTER = new 
MiniClusterResource(
+   new MiniClusterResource.MiniClusterResourceConfiguration(
+   new Configuration(),
+   1,
+   4
+   ),
+   MiniClusterResource.MiniClusterType.NEW
+   );
+
+   private RestClusterClient client;
+
+   @Before
+   public void setUp() throws Exception {
+   final Configuration clientConfig = new Configuration();
+   clientConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0);
+   clientConfig.setLong(RestOptions.RETRY_DELAY, 0);
+   clientConfig.addAll(CLUSTER.getClientConfiguration());
+
+   client = new RestClusterClient<>(
+   clientConfig,
+   StandaloneClusterId.getInstance()
+   );
}
 
-   @AfterClass
-   public static void after() {
-   cluster.stop();
-   cluster = null;
+   @After
+   public void tearDown() {
+   if (client != null) {
+   client.sh

[06/11] flink git commit: [FLINK-9156][REST][CLI] Update --jobmanager option logic for REST client

2018-04-16 Thread chesnay
[FLINK-9156][REST][CLI] Update --jobmanager option logic for REST client

This closes #5838.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/47909f46
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/47909f46
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/47909f46

Branch: refs/heads/release-1.5
Commit: 47909f466b9c9ee1f4caf94e9f6862a21b628817
Parents: 50504ce
Author: zentol 
Authored: Wed Apr 11 12:48:51 2018 +0200
Committer: zentol 
Committed: Mon Apr 16 21:18:32 2018 +0200

--
 .../apache/flink/client/cli/CliFrontend.java|  3 ++
 .../client/program/rest/RestClusterClient.java  |  3 +-
 .../program/rest/RestClusterClientTest.java | 35 
 3 files changed, 40 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
--
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index ce6556b..65f470b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -37,6 +37,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -1141,6 +1142,8 @@ public class CliFrontend {
public static void setJobManagerAddressInConfig(Configuration config, 
InetSocketAddress address) {
config.setString(JobManagerOptions.ADDRESS, 
address.getHostString());
config.setInteger(JobManagerOptions.PORT, address.getPort());
+   config.setString(RestOptions.REST_ADDRESS, 
address.getHostString());
+   config.setInteger(RestOptions.REST_PORT, address.getPort());
}
 
public static List> 
loadCustomCommandLines(Configuration configuration, String 
configurationDirectory) {

http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
--
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index a6f676e..3d50e93 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -719,7 +719,8 @@ public class RestClusterClient extends ClusterClient 
implements NewCluster
.orElse(false);
}
 
-   private CompletableFuture getWebMonitorBaseUrl() {
+   @VisibleForTesting
+   CompletableFuture getWebMonitorBaseUrl() {
return FutureUtils.orTimeout(
webMonitorLeaderRetriever.getLeaderFuture(),

restClusterClientConfiguration.getAwaitLeaderTimeout(),

http://git-wip-us.apache.org/repos/asf/flink/blob/47909f46/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
--
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index e7f9bf9..e2daad6 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
 import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
@@ -100,6 +102,7 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandl

[05/11] flink git commit: [FLINK-9173][REST] Improve client error message for parsing failures

2018-04-16 Thread chesnay
[FLINK-9173][REST] Improve client error message for parsing failures

- print parsing exception for expected type, not error
- add toString implemented to JsonResponse


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/50504ced
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/50504ced
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/50504ced

Branch: refs/heads/release-1.5
Commit: 50504ced6f162bd9247f8da49889ad2ea0183c0d
Parents: c6d45b9
Author: zentol 
Authored: Mon Apr 16 10:31:52 2018 +0200
Committer: zentol 
Committed: Mon Apr 16 21:18:32 2018 +0200

--
 .../java/org/apache/flink/runtime/rest/RestClient.java  | 12 ++--
 1 file changed, 10 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/50504ced/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 6319634..df97f20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -217,7 +217,7 @@ public class RestClient {
try {
P response = objectMapper.readValue(jsonParser, 
responseType);
responseFuture.complete(response);
-   } catch (IOException ioe) {
+   } catch (IOException originalException) {
// the received response did not matched the expected 
response type
 
// lets see if it is an ErrorResponse instead
@@ -231,7 +231,7 @@ public class RestClient {
responseFuture.completeExceptionally(
new RestClientException(
"Response was neither of the 
expected type(" + responseType + ") nor an error.",
-   jpe2,
+   originalException,

rawResponse.getHttpResponseStatus()));
}
}
@@ -328,5 +328,13 @@ public class RestClient {
public HttpResponseStatus getHttpResponseStatus() {
return httpResponseStatus;
}
+
+   @Override
+   public String toString() {
+   return "JsonResponse{" +
+   "json=" + json +
+   ", httpResponseStatus=" + httpResponseStatus +
+   '}';
+   }
}
 }



[08/11] flink git commit: [FLINK-8370][REST] Port AggregatingMetricsHandler to flip6

2018-04-16 Thread chesnay
http://git-wip-us.apache.org/repos/asf/flink/blob/23d45436/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
new file mode 100644
index 000..4453ee2
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Test base for handlers that extend {@link 
AbstractAggregatingMetricsHandler}.
+ */
+public abstract class AggregatingMetricsHandlerTestBase<
+   H extends AbstractAggregatingMetricsHandler,
+   P extends AbstractAggregatedMetricsParameters>
+   extends TestLogger {
+
+   private static final CompletableFuture TEST_REST_ADDRESS;
+   private static final DispatcherGateway MOCK_DISPATCHER_GATEWAY;
+   private static final GatewayRetriever 
LEADER_RETRIEVER;
+   private static final Time TIMEOUT = Time.milliseconds(50);
+   private static final Map TEST_HEADERS = 
Collections.emptyMap();
+   private static final Executor EXECUTOR = TestingUtils.defaultExecutor();
+
+   static {
+   TEST_REST_ADDRESS = 
CompletableFuture.completedFuture("localhost:12345");
+
+   MOCK_DISPATCHER_GATEWAY = mock(DispatcherGateway.class);
+
+   LEADER_RETRIEVER = new GatewayRetriever() {
+   @Override
+   public CompletableFuture getFuture() 
{
+   return 
CompletableFuture.completedFuture(MOCK_DISPATCHER_GATEWAY);
+   }
+   };
+   }
+
+   private H handler;
+   private MetricStore store;
+   private Map pathParameters;
+
+   @Before
+   public void setUp() throws Exception {
+   MetricFetcher fetcher = new 
MetricFetcher(
+   mock(GatewayRetriever.class),
+   mock(MetricQueryServiceRetriever.class),
+   Executors.directExecutor(),
+   TestingUtils.TIMEOUT());
+   store = fetcher.getMetricStore();
+
+   Collection metricDumps = getMetricD

[03/11] flink git commit: [FLINK-8961][tests] Add MiniClusterResource#getClientConfiguration

2018-04-16 Thread chesnay
[FLINK-8961][tests] Add MiniClusterResource#getClientConfiguration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b1f3ca3f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b1f3ca3f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b1f3ca3f

Branch: refs/heads/master
Commit: b1f3ca3f61fa1f0a906804e42844c2c08bd3f5cc
Parents: 8eb4604
Author: zentol 
Authored: Thu Apr 5 11:00:45 2018 +0200
Committer: zentol 
Committed: Mon Apr 16 21:17:53 2018 +0200

--
 .../apache/flink/test/util/MiniClusterResource.java  | 15 +++
 1 file changed, 15 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/b1f3ca3f/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
--
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 8a05750..531a3c7 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -25,8 +25,10 @@ import 
org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
+import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.minicluster.JobExecutorService;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
@@ -67,6 +69,8 @@ public class MiniClusterResource extends ExternalResource {
 
private ClusterClient clusterClient;
 
+   private Configuration restClusterClientConfig;
+
private int numberSlots = -1;
 
private TestEnvironment executionEnvironment;
@@ -117,6 +121,10 @@ public class MiniClusterResource extends ExternalResource {
return clusterClient;
}
 
+   public Configuration getClientConfiguration() {
+   return restClusterClientConfig;
+   }
+
public TestEnvironment getTestEnvironment() {
return executionEnvironment;
}
@@ -194,6 +202,9 @@ public class MiniClusterResource extends ExternalResource {
if (enableClusterClient) {
clusterClient = new 
StandaloneClusterClient(configuration, 
flinkMiniCluster.highAvailabilityServices(), true);
}
+   Configuration restClientConfig = new Configuration();
+   restClientConfig.setInteger(JobManagerOptions.PORT, 
flinkMiniCluster.getLeaderRPCPort());
+   this.restClusterClientConfig = new 
UnmodifiableConfiguration(restClientConfig);
}
 
private void startMiniCluster() throws Exception {
@@ -229,6 +240,10 @@ public class MiniClusterResource extends ExternalResource {
if (enableClusterClient) {
clusterClient = new MiniClusterClient(configuration, 
miniCluster);
}
+   Configuration restClientConfig = new Configuration();
+   restClientConfig.setString(JobManagerOptions.ADDRESS, 
miniCluster.getRestAddress().getHost());
+   restClientConfig.setInteger(RestOptions.REST_PORT, 
miniCluster.getRestAddress().getPort());
+   this.restClusterClientConfig = new 
UnmodifiableConfiguration(restClientConfig);
}
 
/**



[10/11] flink git commit: [FLINK-8370][REST] Port AggregatingMetricsHandler to flip6

2018-04-16 Thread chesnay
[FLINK-8370][REST] Port AggregatingMetricsHandler to flip6

This closes #5805.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c0410d80
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c0410d80
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c0410d80

Branch: refs/heads/master
Commit: c0410d801e406e77b1e6e7134224f7946906a49f
Parents: 4645d3c
Author: zentol 
Authored: Wed Mar 28 12:52:07 2018 +0200
Committer: zentol 
Committed: Mon Apr 16 21:17:54 2018 +0200

--
 .../AbstractAggregatingMetricsHandler.java  | 300 ++
 .../metrics/AggregatingJobsMetricsHandler.java  |  77 
 .../AggregatingSubtasksMetricsHandler.java  | 119 ++
 .../AggregatingTaskManagersMetricsHandler.java  |  77 
 .../handler/job/metrics/DoubleAccumulator.java  | 257 
 .../AbstractAggregatedMetricsHeaders.java   |  50 +++
 .../AbstractAggregatedMetricsParameters.java|  48 +++
 .../AggregateTaskManagerMetricsParameters.java  |  38 ++
 .../metrics/AggregatedJobMetricsHeaders.java|  44 +++
 .../metrics/AggregatedJobMetricsParameters.java |  39 ++
 .../messages/job/metrics/AggregatedMetric.java  | 118 ++
 .../metrics/AggregatedMetricsResponseBody.java  | 112 ++
 .../AggregatedSubtaskMetricsHeaders.java|  47 +++
 .../AggregatedSubtaskMetricsParameters.java |  51 +++
 .../AggregatedTaskManagerMetricsHeaders.java|  44 +++
 .../job/metrics/JobsFilterQueryParameter.java   |  48 +++
 .../metrics/MetricsAggregationParameter.java|  58 +++
 .../metrics/SubtasksFilterQueryParameter.java   |  41 ++
 .../TaskManagersFilterQueryParameter.java   |  42 ++
 .../runtime/webmonitor/WebMonitorEndpoint.java  |  33 ++
 .../AggregatingJobsMetricsHandlerTest.java  |  81 
 .../AggregatingMetricsHandlerTestBase.java  | 389 +++
 .../AggregatingSubtasksMetricsHandlerTest.java  |  93 +
 ...gregatingTaskManagersMetricsHandlerTest.java |  82 
 24 files changed, 2288 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
new file mode 100644
index 000..338bb46
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/job/metrics/AbstractAggregatingMetricsHandler.java
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsHeaders;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsAggregationParameter;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.MetricsFilterParameter;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.netty4.io.netty

[02/11] flink git commit: [FLINK-8961][tests] Port JobRetrievalITCase to flip6

2018-04-16 Thread chesnay
[FLINK-8961][tests] Port JobRetrievalITCase to flip6

This closes #5730.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2266eb01
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2266eb01
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2266eb01

Branch: refs/heads/master
Commit: 2266eb010b377450aa1f01ec589fe8758e9a0c6d
Parents: b1f3ca3
Author: zentol 
Authored: Tue Mar 20 15:19:47 2018 +0100
Committer: zentol 
Committed: Mon Apr 16 21:17:53 2018 +0200

--
 .../test/example/client/JobRetrievalITCase.java | 121 +++---
 .../client/LegacyJobRetrievalITCase.java| 162 +++
 2 files changed, 224 insertions(+), 59 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2266eb01/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
--
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
index 57198c0..6b747e0 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/example/client/JobRetrievalITCase.java
@@ -21,30 +21,27 @@ package org.apache.flink.test.example.client;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.deployment.StandaloneClusterId;
-import org.apache.flink.client.program.ClusterClient;
-import org.apache.flink.client.program.StandaloneClusterClient;
+import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.runtime.client.JobRetrievalException;
+import org.apache.flink.configuration.RestOptions;
+import org.apache.flink.core.testutils.CheckedThread;
 import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
-import org.apache.flink.runtime.minicluster.FlinkMiniCluster;
-import org.apache.flink.runtime.testingUtils.TestingCluster;
-import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages;
+import org.apache.flink.test.util.MiniClusterResource;
+import org.apache.flink.testutils.category.New;
+import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.TestLogger;
 
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.testkit.JavaTestKit;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.ClassRule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 
+import java.util.Optional;
 import java.util.concurrent.Semaphore;
-import java.util.concurrent.atomic.AtomicReference;
-
-import scala.collection.Seq;
 
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
@@ -52,23 +49,41 @@ import static org.junit.Assert.fail;
 /**
  * Tests retrieval of a job from a running Flink cluster.
  */
+@Category(New.class)
 public class JobRetrievalITCase extends TestLogger {
 
private static final Semaphore lock = new Semaphore(1);
 
-   private static FlinkMiniCluster cluster;
-
-   @BeforeClass
-   public static void before() {
-   Configuration configuration = new Configuration();
-   cluster = new TestingCluster(configuration, false);
-   cluster.start();
+   @ClassRule
+   public static final MiniClusterResource CLUSTER = new 
MiniClusterResource(
+   new MiniClusterResource.MiniClusterResourceConfiguration(
+   new Configuration(),
+   1,
+   4
+   ),
+   MiniClusterResource.MiniClusterType.NEW
+   );
+
+   private RestClusterClient client;
+
+   @Before
+   public void setUp() throws Exception {
+   final Configuration clientConfig = new Configuration();
+   clientConfig.setInteger(RestOptions.RETRY_MAX_ATTEMPTS, 0);
+   clientConfig.setLong(RestOptions.RETRY_DELAY, 0);
+   clientConfig.addAll(CLUSTER.getClientConfiguration());
+
+   client = new RestClusterClient<>(
+   clientConfig,
+   StandaloneClusterId.getInstance()
+   );
}
 
-   @AfterClass
-   public static void after() {
-   cluster.stop();
-   cluster = null;
+   @After
+   public void tearDown() {
+   if (client != null) {
+   client.shutdow

[06/11] flink git commit: [FLINK-9173][REST] Improve client error message for parsing failures

2018-04-16 Thread chesnay
[FLINK-9173][REST] Improve client error message for parsing failures

- print parsing exception for expected type, not error
- add toString implemented to JsonResponse


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e95fa5a4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e95fa5a4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e95fa5a4

Branch: refs/heads/master
Commit: e95fa5a4a03fce74a76ac29395c8a5f69276
Parents: 185b904
Author: zentol 
Authored: Mon Apr 16 10:31:52 2018 +0200
Committer: zentol 
Committed: Mon Apr 16 21:17:53 2018 +0200

--
 .../java/org/apache/flink/runtime/rest/RestClient.java  | 12 ++--
 1 file changed, 10 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/e95fa5a4/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
index 6319634..df97f20 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/RestClient.java
@@ -217,7 +217,7 @@ public class RestClient {
try {
P response = objectMapper.readValue(jsonParser, 
responseType);
responseFuture.complete(response);
-   } catch (IOException ioe) {
+   } catch (IOException originalException) {
// the received response did not matched the expected 
response type
 
// lets see if it is an ErrorResponse instead
@@ -231,7 +231,7 @@ public class RestClient {
responseFuture.completeExceptionally(
new RestClientException(
"Response was neither of the 
expected type(" + responseType + ") nor an error.",
-   jpe2,
+   originalException,

rawResponse.getHttpResponseStatus()));
}
}
@@ -328,5 +328,13 @@ public class RestClient {
public HttpResponseStatus getHttpResponseStatus() {
return httpResponseStatus;
}
+
+   @Override
+   public String toString() {
+   return "JsonResponse{" +
+   "json=" + json +
+   ", httpResponseStatus=" + httpResponseStatus +
+   '}';
+   }
}
 }



[09/11] flink git commit: [FLINK-8370][REST] Port AggregatingMetricsHandler to flip6

2018-04-16 Thread chesnay
http://git-wip-us.apache.org/repos/asf/flink/blob/c0410d80/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
new file mode 100644
index 000..4453ee2
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/metrics/AggregatingMetricsHandlerTestBase.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.rest.handler.job.metrics;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.metrics.dump.MetricDump;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
+import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricStore;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AbstractAggregatedMetricsParameters;
+import org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetric;
+import 
org.apache.flink.runtime.rest.messages.job.metrics.AggregatedMetricsResponseBody;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.webmonitor.RestfulGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import 
org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.stream.Collectors;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.powermock.api.mockito.PowerMockito.mock;
+
+/**
+ * Test base for handlers that extend {@link 
AbstractAggregatingMetricsHandler}.
+ */
+public abstract class AggregatingMetricsHandlerTestBase<
+   H extends AbstractAggregatingMetricsHandler,
+   P extends AbstractAggregatedMetricsParameters>
+   extends TestLogger {
+
+   private static final CompletableFuture TEST_REST_ADDRESS;
+   private static final DispatcherGateway MOCK_DISPATCHER_GATEWAY;
+   private static final GatewayRetriever 
LEADER_RETRIEVER;
+   private static final Time TIMEOUT = Time.milliseconds(50);
+   private static final Map TEST_HEADERS = 
Collections.emptyMap();
+   private static final Executor EXECUTOR = TestingUtils.defaultExecutor();
+
+   static {
+   TEST_REST_ADDRESS = 
CompletableFuture.completedFuture("localhost:12345");
+
+   MOCK_DISPATCHER_GATEWAY = mock(DispatcherGateway.class);
+
+   LEADER_RETRIEVER = new GatewayRetriever() {
+   @Override
+   public CompletableFuture getFuture() 
{
+   return 
CompletableFuture.completedFuture(MOCK_DISPATCHER_GATEWAY);
+   }
+   };
+   }
+
+   private H handler;
+   private MetricStore store;
+   private Map pathParameters;
+
+   @Before
+   public void setUp() throws Exception {
+   MetricFetcher fetcher = new 
MetricFetcher(
+   mock(GatewayRetriever.class),
+   mock(MetricQueryServiceRetriever.class),
+   Executors.directExecutor(),
+   TestingUtils.TIMEOUT());
+   store = fetcher.getMetricStore();
+
+   Collection metricDumps = getMetricD

[04/11] flink git commit: [FLINK-9156][REST][CLI] Update --jobmanager option logic for REST client

2018-04-16 Thread chesnay
[FLINK-9156][REST][CLI] Update --jobmanager option logic for REST client

This closes #5838.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f0fa0b3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f0fa0b3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f0fa0b3

Branch: refs/heads/master
Commit: 4f0fa0b3f992da4474e2703a54a8445cf1e29856
Parents: e95fa5a
Author: zentol 
Authored: Wed Apr 11 12:48:51 2018 +0200
Committer: zentol 
Committed: Mon Apr 16 21:17:53 2018 +0200

--
 .../apache/flink/client/cli/CliFrontend.java|  3 ++
 .../client/program/rest/RestClusterClient.java  |  3 +-
 .../program/rest/RestClusterClientTest.java | 35 
 3 files changed, 40 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/4f0fa0b3/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
--
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java 
b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index ce6556b..65f470b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -37,6 +37,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.JobManagerOptions;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.Optimizer;
@@ -1141,6 +1142,8 @@ public class CliFrontend {
public static void setJobManagerAddressInConfig(Configuration config, 
InetSocketAddress address) {
config.setString(JobManagerOptions.ADDRESS, 
address.getHostString());
config.setInteger(JobManagerOptions.PORT, address.getPort());
+   config.setString(RestOptions.REST_ADDRESS, 
address.getHostString());
+   config.setInteger(RestOptions.REST_PORT, address.getPort());
}
 
public static List> 
loadCustomCommandLines(Configuration configuration, String 
configurationDirectory) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4f0fa0b3/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
--
diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index a6f676e..3d50e93 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -719,7 +719,8 @@ public class RestClusterClient extends ClusterClient 
implements NewCluster
.orElse(false);
}
 
-   private CompletableFuture getWebMonitorBaseUrl() {
+   @VisibleForTesting
+   CompletableFuture getWebMonitorBaseUrl() {
return FutureUtils.orTimeout(
webMonitorLeaderRetriever.getLeaderFuture(),

restClusterClientConfiguration.getAwaitLeaderTimeout(),

http://git-wip-us.apache.org/repos/asf/flink/blob/4f0fa0b3/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
--
diff --git 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
index e7f9bf9..e2daad6 100644
--- 
a/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
+++ 
b/flink-clients/src/test/java/org/apache/flink/client/program/rest/RestClusterClientTest.java
@@ -23,6 +23,8 @@ import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobSubmissionResult;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.client.cli.DefaultCLI;
+import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
 import org.apache.flink.client.deployment.StandaloneClusterId;
 import org.apache.flink.client.program.ProgramInvocationException;
 import org.apache.flink.configuration.Configuration;
@@ -100,6 +102,7 @@ import org.apache.flink.util.TestLogger;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandler;
 

[11/11] flink git commit: [FLINK-9177][docs] Update Mesos getting started link

2018-04-16 Thread chesnay
[FLINK-9177][docs] Update Mesos getting started link

This closes #5850.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4f73c8dc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4f73c8dc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4f73c8dc

Branch: refs/heads/master
Commit: 4f73c8dcbcb8c03339882cd3259549ea5b6e38cf
Parents: c0410d8
Author: Arunan Sugunakumar 
Authored: Mon Apr 16 07:26:34 2018 +
Committer: zentol 
Committed: Mon Apr 16 21:17:54 2018 +0200

--
 docs/ops/deployment/mesos.md | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/4f73c8dc/docs/ops/deployment/mesos.md
--
diff --git a/docs/ops/deployment/mesos.md b/docs/ops/deployment/mesos.md
index 56f9d93..74bae39 100644
--- a/docs/ops/deployment/mesos.md
+++ b/docs/ops/deployment/mesos.md
@@ -101,7 +101,7 @@ You can also run Mesos without DC/OS.
 
 ### Installing Mesos
 
-Please follow the [instructions on how to setup Mesos on the official 
website](http://mesos.apache.org/documentation/latest/getting-started/).
+Please follow the [instructions on how to setup Mesos on the official 
website](http://mesos.apache.org/getting-started/).
 
 After installation you have to configure the set of master and agent nodes by 
creating the files `MESOS_HOME/etc/mesos/masters` and 
`MESOS_HOME/etc/mesos/slaves`.
 These files contain in each row a single hostname on which the respective 
component will be started (assuming SSH access to these nodes).



[01/11] flink git commit: [hotfix][metrics] Allow QueryParameter converters to throw ConversionExceptions

2018-04-16 Thread chesnay
Repository: flink
Updated Branches:
  refs/heads/master 185b904aa -> 27be32e8a


[hotfix][metrics] Allow QueryParameter converters to throw ConversionExceptions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4645d3c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4645d3c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4645d3c0

Branch: refs/heads/master
Commit: 4645d3c06736c7adf360a5380cd2a05b4eb752c9
Parents: 1cd7c42
Author: zentol 
Authored: Mon Mar 26 14:41:25 2018 +0200
Committer: zentol 
Committed: Mon Apr 16 21:17:53 2018 +0200

--
 .../flink/runtime/rest/messages/MessageQueryParameter.java   | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/4645d3c0/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
index 180f011..6799df1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageQueryParameter.java
@@ -35,7 +35,7 @@ public abstract class MessageQueryParameter extends 
MessageParameter>
}
 
@Override
-   public List convertFromString(String values) {
+   public List convertFromString(String values) throws 
ConversionException {
String[] splitValues = values.split(",");
List list = new ArrayList<>();
for (String value : splitValues) {
@@ -50,7 +50,7 @@ public abstract class MessageQueryParameter extends 
MessageParameter>
 * @param value string representation of parameter value
 * @return parameter value
 */
-   public abstract X convertStringToValue(String value);
+   public abstract X convertStringToValue(String value) throws 
ConversionException;
 
@Override
public String convertToString(List values) {



[07/11] flink git commit: [hotfix][tests] Add MCR constructor accepting configuration and type

2018-04-16 Thread chesnay
[hotfix][tests] Add MCR constructor accepting configuration and type


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8eb4604c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8eb4604c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8eb4604c

Branch: refs/heads/master
Commit: 8eb4604cd82a89258a6581ca8be4f419784e5096
Parents: 4f0fa0b
Author: zentol 
Authored: Thu Apr 5 11:00:23 2018 +0200
Committer: zentol 
Committed: Mon Apr 16 21:17:53 2018 +0200

--
 .../java/org/apache/flink/test/util/MiniClusterResource.java   | 6 ++
 1 file changed, 6 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8eb4604c/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
--
diff --git 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
index 9b0ac77..8a05750 100644
--- 
a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
+++ 
b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/MiniClusterResource.java
@@ -77,6 +77,12 @@ public class MiniClusterResource extends ExternalResource {
 
public MiniClusterResource(
final MiniClusterResourceConfiguration 
miniClusterResourceConfiguration,
+   final MiniClusterType miniClusterType) {
+   this(miniClusterResourceConfiguration, miniClusterType, false);
+   }
+
+   public MiniClusterResource(
+   final MiniClusterResourceConfiguration 
miniClusterResourceConfiguration,
final boolean enableClusterClient) {
this(
miniClusterResourceConfiguration,



[08/11] flink git commit: [FLINK-9045][REST] Make createLocalEnvironmentWithWebUI more user-friendly logging message for web UI address

2018-04-16 Thread chesnay
[FLINK-9045][REST] Make createLocalEnvironmentWithWebUI more user-friendly 
logging message for web UI address

-add back known logging mesages about webUI address
-do not set random port in local stream environment

This closes #5814.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/27be32e8
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/27be32e8
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/27be32e8

Branch: refs/heads/master
Commit: 27be32e8a44e3afcce9a17e3b95767869f56ab61
Parents: 4f73c8d
Author: zentol 
Authored: Wed Apr 4 10:44:59 2018 +0200
Committer: zentol 
Committed: Mon Apr 16 21:17:54 2018 +0200

--
 .../org/apache/flink/api/java/ExecutionEnvironment.java   |  6 ++
 .../flink/runtime/webmonitor/WebMonitorEndpoint.java  | 10 +-
 .../streaming/api/environment/LocalStreamEnvironment.java |  4 +++-
 .../api/environment/StreamExecutionEnvironment.java   |  6 ++
 4 files changed, 24 insertions(+), 2 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/27be32e8/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
--
diff --git 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
index 1ce2221..3ea99ea 100644
--- 
a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
+++ 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java
@@ -52,6 +52,7 @@ import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.StringValue;
 import org.apache.flink.util.NumberSequenceIterator;
@@ -1125,6 +1126,11 @@ public abstract class ExecutionEnvironment {
 
conf.setBoolean(ConfigConstants.LOCAL_START_WEBSERVER, true);
 
+   if (!conf.contains(RestOptions.REST_PORT)) {
+   // explicitly set this option so that it's not set to 0 
later
+   conf.setInteger(RestOptions.REST_PORT, 
RestOptions.REST_PORT.defaultValue());
+   }
+
return createLocalEnvironment(conf, -1);
}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/27be32e8/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
index fb663ad..0ea7550 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
@@ -163,6 +163,8 @@ public class WebMonitorEndpoint 
extends RestServerEndp
 
private final FatalErrorHandler fatalErrorHandler;
 
+   private boolean hasWebUI = false;
+
public WebMonitorEndpoint(
RestServerEndpointConfiguration endpointConfiguration,
GatewayRetriever leaderRetriever,
@@ -606,7 +608,10 @@ public class WebMonitorEndpoint 
extends RestServerEndp

handlers.add(Tuple2.of(YarnStopJobTerminationHeaders.getInstance(), 
jobStopTerminationHandler));
 
optWebContent.ifPresent(
-   webContent -> 
handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), 
webContent)));
+   webContent -> {
+   
handlers.add(Tuple2.of(WebContentHandlerSpecification.getInstance(), 
webContent));
+   hasWebUI = true;
+   });
 
// load the log and stdout file handler for the main cluster 
component
final WebMonitorUtils.LogFileLocation logFileLocation = 
WebMonitorUtils.LogFileLocation.find(clusterConfiguration);
@@ -679,6 +684,9 @@ public class WebMonitorEndpoint 
extends RestServerEndp
@Override
public void startInternal() throws Exception {
leaderElectionService.start(this);
+   if (hasWebUI) {
+   log.info("Web frontend listening at {}.", 
getRestBaseUrl());
+   }
}
 
@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/27be32e8/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/envi

[05/11] flink git commit: [hotfix][metrics] Make MessageParameter constructor protected

2018-04-16 Thread chesnay
[hotfix][metrics] Make MessageParameter constructor protected


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1cd7c423
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1cd7c423
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1cd7c423

Branch: refs/heads/master
Commit: 1cd7c423d87cf680fb84af83ebfcc3246d079d1f
Parents: 2266eb0
Author: zentol 
Authored: Mon Mar 26 14:41:03 2018 +0200
Committer: zentol 
Committed: Mon Apr 16 21:17:53 2018 +0200

--
 .../org/apache/flink/runtime/rest/messages/MessageParameter.java   | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/1cd7c423/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
index a615e96..b8485e5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/MessageParameter.java
@@ -42,7 +42,7 @@ public abstract class MessageParameter {
private final String key;
private X value;
 
-   MessageParameter(String key, MessageParameterRequisiteness 
requisiteness) {
+   protected MessageParameter(String key, MessageParameterRequisiteness 
requisiteness) {
this.key = Preconditions.checkNotNull(key);
this.requisiteness = Preconditions.checkNotNull(requisiteness);
}



flink git commit: [FLINK-9145] [table] Clean up flink-table dependencies

2018-04-16 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/release-1.5 f97c0c15e -> c6d45b922


[FLINK-9145] [table] Clean up flink-table dependencies

This closes #5853.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c6d45b92
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c6d45b92
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c6d45b92

Branch: refs/heads/release-1.5
Commit: c6d45b9225987537493472f20e933a81b63c9cde
Parents: f97c0c1
Author: Timo Walther 
Authored: Mon Apr 16 13:02:22 2018 +0200
Committer: Timo Walther 
Committed: Mon Apr 16 18:11:58 2018 +0200

--
 flink-libraries/flink-sql-client/pom.xml | 17 +++
 flink-libraries/flink-table/pom.xml  | 67 ++-
 2 files changed, 83 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/c6d45b92/flink-libraries/flink-sql-client/pom.xml
--
diff --git a/flink-libraries/flink-sql-client/pom.xml 
b/flink-libraries/flink-sql-client/pom.xml
index dcc2743..7349ba4 100644
--- a/flink-libraries/flink-sql-client/pom.xml
+++ b/flink-libraries/flink-sql-client/pom.xml
@@ -192,6 +192,23 @@ under the License.



+
+   
+   
+   org.apache.maven.plugins
+   maven-enforcer-plugin
+   
+   
+   dependency-convergence
+   
+   enforce
+   
+   
+   false
+   
+   
+   
+   


 

http://git-wip-us.apache.org/repos/asf/flink/blob/c6d45b92/flink-libraries/flink-table/pom.xml
--
diff --git a/flink-libraries/flink-table/pom.xml 
b/flink-libraries/flink-table/pom.xml
index 9ccc70b..73be397 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -32,6 +32,41 @@ under the License.
 
jar
 
+   
+   
+   
+   
+   com.google.guava
+   guava
+   19.0
+   
+   
+   
+   commons-beanutils
+   commons-beanutils
+   1.8.3
+   
+   
+   
+   org.codehaus.janino
+   commons-compiler
+   3.0.7
+   
+   
+   
+   commons-lang
+   commons-lang
+   2.6
+   
+   
+   
+   org.codehaus.janino
+   janino
+   3.0.7
+   
+   
+   
+

 

@@ -49,27 +84,38 @@ under the License.
provided

 
+   

commons-configuration
commons-configuration

 
+   

commons-codec
commons-codec

 
+   
+   
+   org.apache.commons
+   commons-lang3
+   
+
+   

org.codehaus.janino
janino
-   3.0.7

 
+   

org.apache.calcite
calcite-core
+   
1.16.0

+   


org.apache.calcite.avatica
avatica-metrics
@@ -105,6 +151,7 @@ under the License.


 
+   

org.reflections
 

flink git commit: [FLINK-9145] [table] Clean up flink-table dependencies

2018-04-16 Thread twalthr
Repository: flink
Updated Branches:
  refs/heads/master 71c3cd278 -> 185b904aa


[FLINK-9145] [table] Clean up flink-table dependencies

This closes #5853.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/185b904a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/185b904a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/185b904a

Branch: refs/heads/master
Commit: 185b904aa82dd99a178de0688fa67afba0edf679
Parents: 71c3cd2
Author: Timo Walther 
Authored: Mon Apr 16 13:02:22 2018 +0200
Committer: Timo Walther 
Committed: Mon Apr 16 18:09:41 2018 +0200

--
 flink-libraries/flink-sql-client/pom.xml | 17 +++
 flink-libraries/flink-table/pom.xml  | 67 ++-
 2 files changed, 83 insertions(+), 1 deletion(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/185b904a/flink-libraries/flink-sql-client/pom.xml
--
diff --git a/flink-libraries/flink-sql-client/pom.xml 
b/flink-libraries/flink-sql-client/pom.xml
index 6bcfc13..3763bf0 100644
--- a/flink-libraries/flink-sql-client/pom.xml
+++ b/flink-libraries/flink-sql-client/pom.xml
@@ -192,6 +192,23 @@ under the License.



+
+   
+   
+   org.apache.maven.plugins
+   maven-enforcer-plugin
+   
+   
+   dependency-convergence
+   
+   enforce
+   
+   
+   false
+   
+   
+   
+   


 

http://git-wip-us.apache.org/repos/asf/flink/blob/185b904a/flink-libraries/flink-table/pom.xml
--
diff --git a/flink-libraries/flink-table/pom.xml 
b/flink-libraries/flink-table/pom.xml
index d6c146a..acffb2b 100644
--- a/flink-libraries/flink-table/pom.xml
+++ b/flink-libraries/flink-table/pom.xml
@@ -32,6 +32,41 @@ under the License.
 
jar
 
+   
+   
+   
+   
+   com.google.guava
+   guava
+   19.0
+   
+   
+   
+   commons-beanutils
+   commons-beanutils
+   1.8.3
+   
+   
+   
+   org.codehaus.janino
+   commons-compiler
+   3.0.7
+   
+   
+   
+   commons-lang
+   commons-lang
+   2.6
+   
+   
+   
+   org.codehaus.janino
+   janino
+   3.0.7
+   
+   
+   
+

 

@@ -49,27 +84,38 @@ under the License.
provided

 
+   

commons-configuration
commons-configuration

 
+   

commons-codec
commons-codec

 
+   
+   
+   org.apache.commons
+   commons-lang3
+   
+
+   

org.codehaus.janino
janino
-   3.0.7

 
+   

org.apache.calcite
calcite-core
+   
1.16.0

+   


org.apache.calcite.avatica
avatica-metrics
@@ -105,6 +151,7 @@ under the License.


 
+   

org.reflections