[flink] 02/05: [FLINK-12325][metrics] Add counter/gauge tests for StatsD

2019-05-08 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8cc170c9471beac271b6f5e22f18d586dc33765
Author: Richard Deurwaarder 
AuthorDate: Thu May 2 12:07:08 2019 +0200

[FLINK-12325][metrics] Add counter/gauge tests for StatsD
---
 .../flink/metrics/statsd/StatsDReporterTest.java   | 23 ++
 1 file changed, 23 insertions(+)

diff --git 
a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
 
b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index a0b853f..d447a4c 100644
--- 
a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ 
b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.Metric;
@@ -163,6 +164,28 @@ public class StatsDReporterTest extends TestLogger {
testMetricAndAssert(new TestMeter(), "metric", expectedLines);
}
 
+   /**
+* Tests that counter are properly reported via the StatsD reporter.
+*/
+   @Test
+   public void testStatsDCountersReporting() throws Exception {
+   Set expectedLines = new HashSet<>(2);
+   expectedLines.add("metric:100|g");
+
+   Counter counter = new SimpleCounter();
+   counter.inc(100);
+
+   testMetricAndAssert(counter, "metric", expectedLines);
+   }
+
+   @Test
+   public void testStatsDGaugesReporting() throws Exception {
+   Set expectedLines = new HashSet<>(2);
+   expectedLines.add("metric:75|g");
+
+   testMetricAndAssert((Gauge) () -> 75, "metric", expectedLines);
+   }
+
private void testMetricAndAssert(Metric metric, String metricName, 
Set expectation) throws Exception {
StatsDReporter reporter = null;
DatagramSocketReceiver receiver = null;



[flink] 05/05: [FLINK-12325][metrics] StatsDReporter properly handles negative values

2019-05-08 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 92b1a66a63aa10fc58c9f2ac4baca859437db40c
Author: Richard Deurwaarder 
AuthorDate: Thu May 2 12:47:36 2019 +0200

[FLINK-12325][metrics] StatsDReporter properly handles negative values
---
 .../flink/metrics/statsd/StatsDReporter.java   | 59 --
 .../flink/metrics/statsd/StatsDReporterTest.java   | 59 ++
 2 files changed, 102 insertions(+), 16 deletions(-)

diff --git 
a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
 
b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
index 527f9c1..29fbeb9 100644
--- 
a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
+++ 
b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java
@@ -127,14 +127,20 @@ public class StatsDReporter extends AbstractReporter 
implements Scheduled {
// 

 
private void reportCounter(final String name, final Counter counter) {
-   send(name, String.valueOf(counter.getCount()));
+   send(name, counter.getCount());
}
 
private void reportGauge(final String name, final Gauge gauge) {
Object value = gauge.getValue();
-   if (value != null) {
-   send(name, value.toString());
+   if (value == null) {
+   return;
}
+
+   if (value instanceof Number) {
+   send(numberIsNegative((Number) value), name, 
value.toString());
+   }
+
+   send(name, value.toString());
}
 
private void reportHistogram(final String name, final Histogram 
histogram) {
@@ -143,25 +149,25 @@ public class StatsDReporter extends AbstractReporter 
implements Scheduled {
HistogramStatistics statistics = 
histogram.getStatistics();
 
if (statistics != null) {
-   send(prefix(name, "count"), 
String.valueOf(histogram.getCount()));
-   send(prefix(name, "max"), 
String.valueOf(statistics.getMax()));
-   send(prefix(name, "min"), 
String.valueOf(statistics.getMin()));
-   send(prefix(name, "mean"), 
String.valueOf(statistics.getMean()));
-   send(prefix(name, "stddev"), 
String.valueOf(statistics.getStdDev()));
-   send(prefix(name, "p50"), 
String.valueOf(statistics.getQuantile(0.5)));
-   send(prefix(name, "p75"), 
String.valueOf(statistics.getQuantile(0.75)));
-   send(prefix(name, "p95"), 
String.valueOf(statistics.getQuantile(0.95)));
-   send(prefix(name, "p98"), 
String.valueOf(statistics.getQuantile(0.98)));
-   send(prefix(name, "p99"), 
String.valueOf(statistics.getQuantile(0.99)));
-   send(prefix(name, "p999"), 
String.valueOf(statistics.getQuantile(0.999)));
+   send(prefix(name, "count"), 
histogram.getCount());
+   send(prefix(name, "max"), statistics.getMax());
+   send(prefix(name, "min"), statistics.getMin());
+   send(prefix(name, "mean"), 
statistics.getMean());
+   send(prefix(name, "stddev"), 
statistics.getStdDev());
+   send(prefix(name, "p50"), 
statistics.getQuantile(0.5));
+   send(prefix(name, "p75"), 
statistics.getQuantile(0.75));
+   send(prefix(name, "p95"), 
statistics.getQuantile(0.95));
+   send(prefix(name, "p98"), 
statistics.getQuantile(0.98));
+   send(prefix(name, "p99"), 
statistics.getQuantile(0.99));
+   send(prefix(name, "p999"), 
statistics.getQuantile(0.999));
}
}
}
 
private void reportMeter(final String name, final Meter meter) {
if (meter != null) {
-   send(prefix(name, "rate"), 
String.valueOf(meter.getRate()));
-   send(prefix(name, "count"), 
String.valueOf(meter.getCount()));
+   send(prefix(name, "rate"), meter.getRate());
+   send(prefix(name, "count"), meter.getCount());
}
}
 
@@ -179,6 +185,23 @@ public class StatsDReporter extends AbstractReporter 
implements Scheduled {
}

[flink] 01/05: [FLINK-12325][metrics] Refactor StatsD tests

2019-05-08 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7b806e7eb75e40729f1565ecc65bb632fd4eaf28
Author: Richard Deurwaarder 
AuthorDate: Thu May 2 12:02:48 2019 +0200

[FLINK-12325][metrics] Refactor StatsD tests
---
 .../flink/metrics/statsd/StatsDReporterTest.java   | 124 ++---
 1 file changed, 35 insertions(+), 89 deletions(-)

diff --git 
a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
 
b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index 68b90ce..a0b853f 100644
--- 
a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ 
b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -25,8 +25,11 @@ import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Histogram;
 import org.apache.flink.metrics.HistogramStatistics;
+import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
 import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -132,73 +135,20 @@ public class StatsDReporterTest extends TestLogger {
 */
@Test
public void testStatsDHistogramReporting() throws Exception {
-   MetricRegistryImpl registry = null;
-   DatagramSocketReceiver receiver = null;
-   Thread receiverThread = null;
-   long timeout = 5000;
-   long joinTimeout = 3;
-
-   String histogramName = "histogram";
-
-   try {
-   receiver = new DatagramSocketReceiver();
-
-   receiverThread = new Thread(receiver);
-
-   receiverThread.start();
-
-   int port = receiver.getPort();
-
-   MetricConfig config = new MetricConfig();
-   
config.setProperty(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 
SECONDS");
-   config.setProperty("host", "localhost");
-   config.setProperty("port", String.valueOf(port));
-
-   registry = new MetricRegistryImpl(
-   
MetricRegistryConfiguration.defaultMetricRegistryConfiguration(),
-   
Collections.singletonList(ReporterSetup.forReporter("test", config, new 
StatsDReporter())));
-
-   TaskManagerMetricGroup metricGroup = new 
TaskManagerMetricGroup(registry, "localhost", "tmId");
-
-   TestingHistogram histogram = new TestingHistogram();
-
-   metricGroup.histogram(histogramName, histogram);
-
-   receiver.waitUntilNumLines(11, timeout);
-
-   Set lines = receiver.getLines();
-
-   String prefix = 
metricGroup.getMetricIdentifier(histogramName);
-
-   Set expectedLines = new HashSet<>();
-
-   expectedLines.add(prefix + ".count:1|g");
-   expectedLines.add(prefix + ".mean:3.0|g");
-   expectedLines.add(prefix + ".min:6|g");
-   expectedLines.add(prefix + ".max:5|g");
-   expectedLines.add(prefix + ".stddev:4.0|g");
-   expectedLines.add(prefix + ".p75:0.75|g");
-   expectedLines.add(prefix + ".p98:0.98|g");
-   expectedLines.add(prefix + ".p99:0.99|g");
-   expectedLines.add(prefix + ".p999:0.999|g");
-   expectedLines.add(prefix + ".p95:0.95|g");
-   expectedLines.add(prefix + ".p50:0.5|g");
-
-   assertEquals(expectedLines, lines);
-
-   } finally {
-   if (registry != null) {
-   registry.shutdown().get();
-   }
-
-   if (receiver != null) {
-   receiver.stop();
-   }
-
-   if (receiverThread != null) {
-   receiverThread.join(joinTimeout);
-   }
-   }
+   Set expectedLines = new HashSet<>(6);
+   expectedLines.add("metric.count:1|g");
+   expectedLines.add("metric.mean:3.0|g");
+   expectedLines.add("metric.min:6|g");
+   expectedLines.add("metric.max:5|g");
+   ex

[flink] branch master updated (c954780 -> 92b1a66)

2019-05-08 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from c954780  [FLINK-12417][table] Unify ReadableCatalog and 
ReadableWritableCatalog interfaces to Catalog interface
 new 7b806e7  [FLINK-12325][metrics] Refactor StatsD tests
 new a8cc170  [FLINK-12325][metrics] Add counter/gauge tests for StatsD
 new 7065b7f  [FLINK-12325][metrics] Extend test metric implementations
 new c636c53  [FLINK-12325][metrics] Migrate StatsD test to test metric 
implementations
 new 92b1a66  [FLINK-12325][metrics] StatsDReporter properly handles 
negative values

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../util/{TestMeter.java => TestCounter.java}  |  32 ++-
 .../apache/flink/metrics/util/TestHistogram.java   |  43 +++-
 .../org/apache/flink/metrics/util/TestMeter.java   |  15 +-
 .../flink/metrics/statsd/StatsDReporter.java   |  59 +++--
 .../flink/metrics/statsd/StatsDReporterTest.java   | 246 +
 5 files changed, 225 insertions(+), 170 deletions(-)
 copy 
flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/{TestMeter.java
 => TestCounter.java} (66%)



[flink] 04/05: [FLINK-12325][metrics] Migrate StatsD test to test metric implementations

2019-05-08 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c636c53421ff00749162cf34c29ca4177deec77d
Author: Richard Deurwaarder 
AuthorDate: Thu May 2 12:47:14 2019 +0200

[FLINK-12325][metrics] Migrate StatsD test to test metric implementations
---
 .../flink/metrics/statsd/StatsDReporterTest.java   | 72 +++---
 1 file changed, 8 insertions(+), 64 deletions(-)

diff --git 
a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
 
b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
index d447a4c..61bf07d 100644
--- 
a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
+++ 
b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java
@@ -24,14 +24,14 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
 import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
-import org.apache.flink.metrics.Histogram;
-import org.apache.flink.metrics.HistogramStatistics;
 import org.apache.flink.metrics.Metric;
 import org.apache.flink.metrics.MetricConfig;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.SimpleCounter;
 import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
 import org.apache.flink.metrics.reporter.MetricReporter;
+import org.apache.flink.metrics.util.TestCounter;
+import org.apache.flink.metrics.util.TestHistogram;
 import org.apache.flink.metrics.util.TestMeter;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.metrics.MetricRegistryConfiguration;
@@ -138,10 +138,10 @@ public class StatsDReporterTest extends TestLogger {
public void testStatsDHistogramReporting() throws Exception {
Set expectedLines = new HashSet<>(6);
expectedLines.add("metric.count:1|g");
-   expectedLines.add("metric.mean:3.0|g");
-   expectedLines.add("metric.min:6|g");
-   expectedLines.add("metric.max:5|g");
-   expectedLines.add("metric.stddev:4.0|g");
+   expectedLines.add("metric.mean:4.0|g");
+   expectedLines.add("metric.min:7|g");
+   expectedLines.add("metric.max:6|g");
+   expectedLines.add("metric.stddev:5.0|g");
expectedLines.add("metric.p75:0.75|g");
expectedLines.add("metric.p98:0.98|g");
expectedLines.add("metric.p99:0.99|g");
@@ -149,7 +149,7 @@ public class StatsDReporterTest extends TestLogger {
expectedLines.add("metric.p95:0.95|g");
expectedLines.add("metric.p50:0.5|g");
 
-   testMetricAndAssert(new TestingHistogram(), "metric", 
expectedLines);
+   testMetricAndAssert(new TestHistogram(), "metric", 
expectedLines);
}
 
/**
@@ -172,10 +172,7 @@ public class StatsDReporterTest extends TestLogger {
Set expectedLines = new HashSet<>(2);
expectedLines.add("metric:100|g");
 
-   Counter counter = new SimpleCounter();
-   counter.inc(100);
-
-   testMetricAndAssert(counter, "metric", expectedLines);
+   testMetricAndAssert(new TestCounter(100), "metric", 
expectedLines);
}
 
@Test
@@ -245,59 +242,6 @@ public class StatsDReporterTest extends TestLogger {
}
}
 
-   private static class TestingHistogram implements Histogram {
-
-   @Override
-   public void update(long value) {
-
-   }
-
-   @Override
-   public long getCount() {
-   return 1;
-   }
-
-   @Override
-   public HistogramStatistics getStatistics() {
-   return new HistogramStatistics() {
-   @Override
-   public double getQuantile(double quantile) {
-   return quantile;
-   }
-
-   @Override
-   public long[] getValues() {
-   return new long[0];
-   }
-
-   @Override
-   public int size() {
-   return 2;
-   }
-
-   @Override
-   public double getMean() {
-   return 3;
-   }
-
-   @Override
-   public double getStdDev() {
-   

[flink] 03/05: [FLINK-12325][metrics] Extend test metric implementations

2019-05-08 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7065b7f3ce44c3d5080f33df82095055a4540411
Author: Richard Deurwaarder 
AuthorDate: Thu May 2 12:39:08 2019 +0200

[FLINK-12325][metrics] Extend test metric implementations
---
 .../util/{TestMeter.java => TestCounter.java}  | 32 
 .../apache/flink/metrics/util/TestHistogram.java   | 43 ++
 .../org/apache/flink/metrics/util/TestMeter.java   | 15 +++-
 3 files changed, 73 insertions(+), 17 deletions(-)

diff --git 
a/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestMeter.java
 
b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestCounter.java
similarity index 66%
copy from 
flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestMeter.java
copy to 
flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestCounter.java
index b1ec3a3..1f47899 100644
--- 
a/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestMeter.java
+++ 
b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestCounter.java
@@ -18,28 +18,44 @@
 
 package org.apache.flink.metrics.util;
 
-import org.apache.flink.metrics.Meter;
+import org.apache.flink.metrics.Counter;
 
 /**
- * A dummy {@link Meter} implementation.
+ * A dummy {@link Counter} implementation.
  */
-public class TestMeter implements Meter {
+public class TestCounter implements Counter {
+   private long countValue;
+
+   public TestCounter() {
+   this.countValue = 0;
+   }
+
+   public TestCounter(long countValue) {
+   this.countValue = countValue;
+   }
+
+   @Override
+   public void inc() {
+   countValue++;
+   }
 
@Override
-   public void markEvent() {
+   public void inc(long n) {
+   countValue += n;
}
 
@Override
-   public void markEvent(long n) {
+   public void dec() {
+   countValue--;
}
 
@Override
-   public double getRate() {
-   return 5;
+   public void dec(long n) {
+   countValue -= n;
}
 
@Override
public long getCount() {
-   return 100L;
+   return countValue;
}
 }
diff --git 
a/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestHistogram.java
 
b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestHistogram.java
index 6df2e92..06dbd39 100644
--- 
a/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestHistogram.java
+++ 
b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestHistogram.java
@@ -25,14 +25,19 @@ import org.apache.flink.metrics.HistogramStatistics;
  * Stateless test histogram for which all methods return a static value.
  */
 public class TestHistogram implements Histogram {
-
+   private long count = 1;
+   private int size = 3;
+   private double mean = 4;
+   private double stdDev = 5;
+   private long max = 6;
+   private long min = 7;
@Override
public void update(long value) {
}
 
@Override
public long getCount() {
-   return 1;
+   return count;
}
 
@Override
@@ -50,28 +55,52 @@ public class TestHistogram implements Histogram {
 
@Override
public int size() {
-   return 3;
+   return size;
}
 
@Override
public double getMean() {
-   return 4;
+   return mean;
}
 
@Override
public double getStdDev() {
-   return 5;
+   return stdDev;
}
 
@Override
public long getMax() {
-   return 6;
+   return max;
}
 
@Override
public long getMin() {
-   return 7;
+   return min;
}
};
}
+
+   public void setSize(int size) {
+   this.size = size;
+   }
+
+   public void setMean(double mean) {
+   this.mean = mean;
+   }
+
+   public void setStdDev(double stdDev) {
+   this.stdDev = stdDev;
+   }
+
+   public void setMax(long max) {
+   this.max = max;
+   }
+
+   public void setMin(long m

[flink-web] branch asf-site updated: Rebuild website

2019-05-08 Thread jincheng
This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new fa63dbb  Rebuild website
fa63dbb is described below

commit fa63dbb0edc0295f5f3f0ba2ac5477b61f8c593a
Author: sunjincheng121 
AuthorDate: Thu May 9 08:26:29 2019 +0800

Rebuild website
---
 content/roadmap.html| 9 +
 content/zh/roadmap.html | 9 +
 2 files changed, 18 insertions(+)

diff --git a/content/roadmap.html b/content/roadmap.html
index adba88f..7e450b4 100644
--- a/content/roadmap.html
+++ b/content/roadmap.html
@@ -338,6 +338,15 @@ the catalogs (https://issues.apache.org/jira/browse/FLINK-10232";>FLINK-
 There is a broad effort to integrate Flink with the Hive Ecosystem, 
including
 metastore and Hive UDF support <a href="https://issues.apache.org/jira/browse/FLINK-10556">FLINK-10556</a>.
 
+There is also a big effort to support Python for Table API <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-38%3A+Python+Table+API">FLIP-38</a>.
+We will divide the work into following stages:
+
+
+  Translate Python Table API queries without UDFs to Java and run them 
completely in Java for the first step.
+  Add support for User-defined functions(Scalar Function/Table 
Function/Aggregate Function) in the second step.
+  Integrating Pandas as the final effort, i.e., functions in Pandas can be 
used in Python Table API directly.
+
+
 Connectors & Formats
 
 Support for additional connectors and formats is a continuous process.
diff --git a/content/zh/roadmap.html b/content/zh/roadmap.html
index 7419bfd..182ac2b 100644
--- a/content/zh/roadmap.html
+++ b/content/zh/roadmap.html
@@ -278,6 +278,15 @@ under the License.
 
 还有一个巨大的工作是将 Flink 与 Hive 生态系统集成。包括 Metastore 和 Hive UDF 支持 <a href="https://issues.apache.org/jira/browse/FLINK-10556">FLINK-10556</a>。
 
+社区在Table API支持Python方面也做出了很多的努力 <a href="https://cwiki.apache.org/confluence/display/FLINK/FLIP-38%3A+Python+Table+API">FLIP-38</a>。
+我们将工作分为以下几个阶段:
+
+
+  首先以将Python Table API 直译为Java Table 
API的方式支持用户编写没有UDFs(标量函数/表值函数/聚合函数)的Python Table API程序。
+  增加对UDFs(标量函数/表值函数/聚合函数)在Python Table API的支持。
+  最后将Pandas与Python Table API进行集成,Pandas中的函数可以在Python Table API中直接使用。
+
+
 Connectors & Formats
 
 支持额外的 connectors 和 formats 是一个持续的过程。



[flink-web] branch asf-site updated: Add python table api section to the roadmap.

2019-05-08 Thread jincheng
This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch asf-site
in repository https://gitbox.apache.org/repos/asf/flink-web.git


The following commit(s) were added to refs/heads/asf-site by this push:
 new b20d169  Add python table api section to the roadmap.
b20d169 is described below

commit b20d1696b6c5fdca23ee9db88408fce278d78f34
Author: sunjincheng121 
AuthorDate: Tue May 7 07:51:25 2019 +0800

Add python table api section to the roadmap.

This closes #204
---
 roadmap.md| 7 +++
 roadmap.zh.md | 7 +++
 2 files changed, 14 insertions(+)

diff --git a/roadmap.md b/roadmap.md
index 2ef95c2..803363a 100644
--- a/roadmap.md
+++ b/roadmap.md
@@ -154,6 +154,13 @@ the catalogs 
([FLINK-10232](https://issues.apache.org/jira/browse/FLINK-10232)).
 There is a broad effort to integrate Flink with the Hive Ecosystem, including
 metastore and Hive UDF support 
[FLINK-10556](https://issues.apache.org/jira/browse/FLINK-10556).
 
+There is also a big effort to support Python for Table API 
[FLIP-38](https://cwiki.apache.org/confluence/display/FLINK/FLIP-38%3A+Python+Table+API).
+We will divide the work into following stages:
+
+- Translate Python Table API queries without UDFs to Java and run them 
completely in Java for the first step.
+- Add support for User-defined functions(Scalar Function/Table 
Function/Aggregate Function) in the second step.
+- Integrating Pandas as the final effort, i.e., functions in Pandas can be 
used in Python Table API directly.
+
 # Connectors & Formats
 
 Support for additional connectors and formats is a continuous process.
diff --git a/roadmap.zh.md b/roadmap.zh.md
index 0e2f9ca..b3a01ec 100644
--- a/roadmap.zh.md
+++ b/roadmap.zh.md
@@ -100,6 +100,13 @@ Flink Web UI 正在移植到更新的框架中并获得其他功能并更好地
 
 还有一个巨大的工作是将 Flink 与 Hive 生态系统集成。包括 Metastore 和 Hive UDF 支持 
[FLINK-10556](https://issues.apache.org/jira/browse/FLINK-10556)。
 
+社区在Table API支持Python方面也做出了很多的努力 
[FLIP-38](https://cwiki.apache.org/confluence/display/FLINK/FLIP-38%3A+Python+Table+API)。
+我们将工作分为以下几个阶段:
+
+- 首先以将Python Table API 直译为Java Table API的方式支持用户编写没有UDFs(标量函数/表值函数/聚合函数)的Python 
Table API程序。
+- 增加对UDFs(标量函数/表值函数/聚合函数)在Python Table API的支持。
+- 最后将Pandas与Python Table API进行集成,Pandas中的函数可以在Python Table API中直接使用。
+
 # Connectors & Formats
 
 支持额外的 connectors 和 formats 是一个持续的过程。



[flink] branch master updated: [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface

2019-05-08 Thread bli
This is an automated email from the ASF dual-hosted git repository.

bli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new c954780  [FLINK-12417][table] Unify ReadableCatalog and 
ReadableWritableCatalog interfaces to Catalog interface
c954780 is described below

commit c954780c9fb2c0cc5b3730caff7e99bacbd534ba
Author: bowen.li 
AuthorDate: Tue May 7 12:15:48 2019 -0700

[FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog 
interfaces to Catalog interface

This PR unifies ReadableCatalog and ReadableWritableCatalog interfaces to 
Catalog interface to simplify the architecture and management of catalogs.

This closes #8365.
---
 .../flink/table/catalog/hive/HiveCatalogBase.java  |   4 +-
 .../table/catalog/GenericInMemoryCatalog.java  |   2 +-
 .../{ReadableWritableCatalog.java => Catalog.java} | 239 -
 .../flink/table/catalog/CatalogTestBase.java   |   2 +-
 4 files changed, 241 insertions(+), 6 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
index b044de3..7c2fdcc 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java
@@ -18,7 +18,7 @@
 
 package org.apache.flink.table.catalog.hive;
 
-import org.apache.flink.table.catalog.ReadableWritableCatalog;
+import org.apache.flink.table.catalog.Catalog;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -46,7 +46,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * Base class for catalogs backed by Hive metastore.
  */
-public abstract class HiveCatalogBase implements ReadableWritableCatalog {
+public abstract class HiveCatalogBase implements Catalog {
private static final Logger LOG = 
LoggerFactory.getLogger(HiveCatalogBase.class);
 
public static final String DEFAULT_DB = "default";
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
index 19655ce..96f9e43 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java
@@ -47,7 +47,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
 /**
  * A generic catalog implementation that holds all meta objects in memory.
  */
-public class GenericInMemoryCatalog implements ReadableWritableCatalog {
+public class GenericInMemoryCatalog implements Catalog {
 
public static final String DEFAULT_DB = "default";
 
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
similarity index 60%
rename from 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
rename to 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
index a398b72..917cf7a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.catalog;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -33,14 +34,79 @@ import 
org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.catalog.stats.CatalogColumnStatistics;
 import org.apache.flink.table.catalog.stats.CatalogTableStatistics;
 
+import java.util.List;
+
 /**
- * An interface responsible for manipulating catalog metadata.
+ * This interface is responsible for reading and writing metadata such as 
database/table/views/UDFs
+ * from a registered catalog. It connects a registered catalog and Flink's 
Table API.
  */
-public interface ReadableWritableCatalog extends ReadableCatalog {
+@PublicEvolving
+public interface Catalog {
+
+   /**
+*

[flink] 02/02: [FLINK-11726][network] Refactor the creations of ResultPartition and SingleInputGate into NetworkEnvironment

2019-05-08 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit aedc4f5dcca95820969af694e6d2a71a1fe698bd
Author: Zhijiang 
AuthorDate: Sun May 5 17:35:53 2019 +0800

[FLINK-11726][network] Refactor the creations of ResultPartition and 
SingleInputGate into NetworkEnvironment

At the moment ResultPartition and SingleInputGate are created in Task. 
Based on new pluggable ShuffleManager, they should be created via 
ShuffleService#createResultPartitionWriter/InputGate.

The NetworkEnvironment would be refactored into NetworkShuffleService in the 
future. So we could migrate the process of creating ResultPartition and 
SingleInputGate into current NetworkEnvironment. The metrics registration of 
network and buffers can also be done along with the creations.
---
 .../runtime/io/network/NetworkEnvironment.java | 113 ++-
 .../network/metrics/InputBufferPoolUsageGauge.java |  51 +
 .../io/network/metrics/InputBuffersGauge.java  |  45 
 .../consumer => metrics}/InputChannelMetrics.java  |  12 +-
 .../consumer => metrics}/InputGateMetrics.java |  20 ++--
 .../metrics/OutputBufferPoolUsageGauge.java|  51 +
 .../io/network/metrics/OutputBuffersGauge.java |  45 
 .../ResultPartitionMetrics.java|  19 ++--
 .../io/network/partition/ResultPartition.java  |   2 +-
 .../partition/consumer/LocalInputChannel.java  |   1 +
 .../partition/consumer/RemoteInputChannel.java |   1 +
 .../partition/consumer/SingleInputGate.java|   3 +-
 .../partition/consumer/UnknownInputChannel.java|   1 +
 .../runtime/metrics/groups/TaskIOMetricGroup.java  | 124 -
 .../runtime/taskexecutor/TaskManagerServices.java  |   8 +-
 .../NetworkEnvironmentConfiguration.java   |  11 ++
 .../org/apache/flink/runtime/taskmanager/Task.java | 100 ++---
 .../io/network/NetworkEnvironmentBuilder.java  |  15 ++-
 .../network/partition/InputChannelTestUtils.java   |   2 +-
 .../partition/consumer/SingleInputGateTest.java|   1 +
 20 files changed, 404 insertions(+), 221 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 0ee8595..b9b35c9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -19,19 +19,34 @@
 package org.apache.flink.runtime.io.network;
 
 import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
+import org.apache.flink.runtime.io.network.metrics.InputBufferPoolUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.InputBuffersGauge;
+import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics;
+import org.apache.flink.runtime.io.network.metrics.InputGateMetrics;
+import org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge;
+import org.apache.flink.runtime.io.network.metrics.OutputBuffersGauge;
+import org.apache.flink.runtime.io.network.metrics.ResultPartitionMetrics;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
+import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
+import org.apache.flink.runtime.taskmanager.TaskActions;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.Preconditions;
 
@@ -56,6 +71,11 @@ public class NetworkEnvironment {
private static final String METRIC_TOTAL_MEMORY_SEGMENT = 
"TotalMemorySegments";
private static final String METRIC_AVAILABLE_MEMORY_SEGMENT = 
"AvailableMemorySegm

[flink] branch master updated (b9704be -> aedc4f5)

2019-05-08 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from b9704be  [FLINK-12231][runtime] Introduce SchedulerNG interface
 new f392616  [hotfix][tests] Avoid mock NetworkEnvironment in tests
 new aedc4f5  [FLINK-11726][network] Refactor the creations of 
ResultPartition and SingleInputGate into NetworkEnvironment

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../runtime/io/network/NetworkEnvironment.java | 113 ++-
 .../network/metrics/InputBufferPoolUsageGauge.java |  33 +++---
 .../io/network/metrics/InputBuffersGauge.java  |  27 ++---
 .../consumer => metrics}/InputChannelMetrics.java  |  12 +-
 .../consumer => metrics}/InputGateMetrics.java |  20 ++--
 .../OutputBufferPoolUsageGauge.java}   |  47 
 .../network/metrics/OutputBuffersGauge.java}   |  31 --
 .../ResultPartitionMetrics.java|  19 ++--
 .../io/network/partition/ResultPartition.java  |   2 +-
 .../partition/consumer/LocalInputChannel.java  |   1 +
 .../partition/consumer/RemoteInputChannel.java |   1 +
 .../partition/consumer/SingleInputGate.java|   3 +-
 .../partition/consumer/UnknownInputChannel.java|   1 +
 .../runtime/metrics/groups/TaskIOMetricGroup.java  | 124 -
 .../runtime/taskexecutor/TaskManagerServices.java  |   8 +-
 .../NetworkEnvironmentConfiguration.java   |  11 ++
 .../org/apache/flink/runtime/taskmanager/Task.java | 100 ++---
 .../io/network/NetworkEnvironmentBuilder.java  |  15 ++-
 .../network/partition/InputChannelTestUtils.java   |   2 +-
 .../partition/consumer/SingleInputGateTest.java|   1 +
 .../runtime/taskexecutor/TaskExecutorTest.java |  15 ++-
 .../runtime/taskmanager/TaskAsyncCallTest.java |  17 ++-
 .../apache/flink/runtime/taskmanager/TaskTest.java | 107 +-
 .../runtime/util/JvmExitOnFatalErrorTest.java  |   3 +-
 .../tasks/InterruptSensitiveRestoreTest.java   |   3 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |   3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java|   4 +-
 .../runtime/tasks/SynchronousCheckpointITCase.java |   6 +-
 .../tasks/TaskCheckpointingBehaviourTest.java  |   3 +-
 29 files changed, 374 insertions(+), 358 deletions(-)
 copy 
flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
 => 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java
 (52%)
 copy 
flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java
 => 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBuffersGauge.java
 (59%)
 rename 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/{partition/consumer
 => metrics}/InputChannelMetrics.java (87%)
 rename 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/{partition/consumer
 => metrics}/InputGateMetrics.java (86%)
 copy 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/{LocalConnectionManager.java
 => metrics/OutputBufferPoolUsageGauge.java} (52%)
 copy 
flink-runtime/src/main/java/org/apache/flink/runtime/{taskmanager/NoOpTaskActions.java
 => io/network/metrics/OutputBuffersGauge.java} (58%)
 rename 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/{partition => 
metrics}/ResultPartitionMetrics.java (85%)



[flink] 01/02: [hotfix][tests] Avoid mock NetworkEnvironment in tests

2019-05-08 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f392616f404a6471d99d64eeab5dd1dd3da1a4a0
Author: Zhijiang 
AuthorDate: Thu Apr 25 17:05:21 2019 +0800

[hotfix][tests] Avoid mock NetworkEnvironment in tests
---
 .../runtime/taskexecutor/TaskExecutorTest.java |  15 +--
 .../runtime/taskmanager/TaskAsyncCallTest.java |  17 +++-
 .../apache/flink/runtime/taskmanager/TaskTest.java | 107 +++--
 .../runtime/util/JvmExitOnFatalErrorTest.java  |   3 +-
 .../tasks/InterruptSensitiveRestoreTest.java   |   3 +-
 .../runtime/tasks/StreamTaskTerminationTest.java   |   3 +-
 .../streaming/runtime/tasks/StreamTaskTest.java|   4 +-
 .../runtime/tasks/SynchronousCheckpointITCase.java |   6 +-
 .../tasks/TaskCheckpointingBehaviourTest.java  |   3 +-
 9 files changed, 87 insertions(+), 74 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
index b47ad53..bf57e2a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java
@@ -197,6 +197,8 @@ public class TaskExecutorTest extends TestLogger {
 
private SettableLeaderRetrievalService jobManagerLeaderRetriever;
 
+   private NetworkEnvironment networkEnvironment;
+
@Before
public void setup() throws IOException {
rpc = new TestingRpcService();
@@ -220,6 +222,8 @@ public class TaskExecutorTest extends TestLogger {
jobManagerLeaderRetriever = new 
SettableLeaderRetrievalService();

haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever);
haServices.setJobMasterLeaderRetriever(jobId, 
jobManagerLeaderRetriever);
+
+   networkEnvironment = new NetworkEnvironmentBuilder().build();
}
 
@After
@@ -239,6 +243,10 @@ public class TaskExecutorTest extends TestLogger {
dummyBlobCacheService = null;
}
 
+   if (networkEnvironment != null) {
+   networkEnvironment.shutdown();
+   }
+
testingFatalErrorHandler.rethrowError();
}
 
@@ -262,7 +270,6 @@ public class TaskExecutorTest extends TestLogger {
MemoryType.HEAP,
false);
 
-   final NetworkEnvironment networkEnvironment = new 
NetworkEnvironmentBuilder().build();
networkEnvironment.start();
 
final KvStateService kvStateService = new KvStateService(new 
KvStateRegistry(), null, null);
@@ -699,8 +706,6 @@ public class TaskExecutorTest extends TestLogger {
when(taskSlotTable.tryMarkSlotActive(eq(jobId), 
eq(allocationId))).thenReturn(true);
when(taskSlotTable.addTask(any(Task.class))).thenReturn(true);
 
-   final NetworkEnvironment networkEnvironment = 
mock(NetworkEnvironment.class);
-
final TaskExecutorLocalStateStoresManager 
localStateStoresManager = createTaskExecutorLocalStateStoresManager();
 
final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
@@ -958,13 +963,11 @@ public class TaskExecutorTest extends TestLogger {
rpc.registerGateway(resourceManagerGateway.getAddress(), 
resourceManagerGateway);
rpc.registerGateway(jobMasterGateway.getAddress(), 
jobMasterGateway);
 
-   final NetworkEnvironment networkMock = 
mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS);
-
final TaskExecutorLocalStateStoresManager 
localStateStoresManager = createTaskExecutorLocalStateStoresManager();
 
final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
.setTaskManagerLocation(taskManagerLocation)
-   .setNetworkEnvironment(networkMock)
+   .setNetworkEnvironment(networkEnvironment)
.setTaskSlotTable(taskSlotTable)
.setJobLeaderService(jobLeaderService)
.setJobManagerTable(jobManagerTable)
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index 9e06da0..1f743e4 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -41,11 +41,11 @@ import 
org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.Fi

[flink] 03/03: [FLINK-12231][runtime] Introduce SchedulerNG interface

2019-05-08 Thread gary
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b9704be0512337b3c74a26f3fb36035e6480d522
Author: Gary Yao 
AuthorDate: Tue Apr 23 11:37:28 2019 +0200

[FLINK-12231][runtime] Introduce SchedulerNG interface

Introduce SchedulerNG interface with a LegacyScheduler implementation, which
hides direct calls to the ExecutionGraph from the JobMaster.

The LegacyScheduler is only needed so that the existing scheduling code 
paths
keep functioning while work on the new scheduling abstractions is in 
progress.

Remove JobMasterTest#testAutomaticRestartingWhenCheckpointing() because it
requires the JobMaster to expose internal state (RestartStrategy) - add unit
tests for JobGraph#isCheckpointingEnabled() to make up for the removed test.
---
 .../dispatcher/DefaultJobManagerRunnerFactory.java |   7 +-
 .../apache/flink/runtime/jobmaster/JobMaster.java  | 506 -
 .../flink/runtime/jobmaster/LegacyScheduler.java   | 626 +
 .../runtime/jobmaster/LegacySchedulerFactory.java  |  82 +++
 .../flink/runtime/jobmaster/SchedulerNG.java   | 120 
 .../runtime/jobmaster/SchedulerNGFactory.java  |  56 ++
 .../factories/DefaultJobMasterServiceFactory.java  |  16 +-
 .../flink/runtime/jobgraph/JobGraphTest.java   |  46 ++
 .../flink/runtime/jobmaster/JobMasterTest.java |  64 +--
 9 files changed, 1069 insertions(+), 454 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
index c6c537f..6962b6e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java
@@ -25,6 +25,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.JobManagerRunner;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
 import org.apache.flink.runtime.jobmaster.JobMasterConfiguration;
+import org.apache.flink.runtime.jobmaster.LegacySchedulerFactory;
+import org.apache.flink.runtime.jobmaster.SchedulerNGFactory;
 import 
org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory;
 import 
org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory;
 import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory;
@@ -56,6 +58,8 @@ public enum DefaultJobManagerRunnerFactory implements 
JobManagerRunnerFactory {
 
final SlotPoolFactory slotPoolFactory = 
DefaultSlotPoolFactory.fromConfiguration(configuration);
final SchedulerFactory schedulerFactory = 
DefaultSchedulerFactory.fromConfiguration(configuration);
+   final SchedulerNGFactory schedulerNGFactory = new 
LegacySchedulerFactory(
+   jobManagerServices.getRestartStrategyFactory());
 
final JobMasterServiceFactory jobMasterFactory = new 
DefaultJobMasterServiceFactory(
jobMasterConfiguration,
@@ -66,7 +70,8 @@ public enum DefaultJobManagerRunnerFactory implements 
JobManagerRunnerFactory {
jobManagerServices,
heartbeatServices,
jobManagerJobMetricGroupFactory,
-   fatalErrorHandler);
+   fatalErrorHandler,
+   schedulerNGFactory);
 
return new JobManagerRunner(
jobGraph,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index cae6922..627cc45 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -18,37 +18,22 @@
 
 package org.apache.flink.runtime.jobmaster;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.functions.AggregateFunction;
-import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.CheckpointingOptions;
-import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.queryablestate.KvStateID;
-import org.apache.flink.runtime.JobException;
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot;
 import org.apache.flink.runtime.blob.BlobWriter;
-import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
-import org

[flink] branch master updated (f8a6ea5 -> b9704be)

2019-05-08 Thread gary
This is an automated email from the ASF dual-hosted git repository.

gary pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from f8a6ea5  [hotfix][tests] Rename variables in 
EagerSchedulingStrategyTest
 new 1090967  [hotfix][runtime] Add null check to RestartStrategyResolving
 new 4177ae6  [hotfix][runtime] Remove unused field ARCHIVE_NAME from 
JobMaster
 new b9704be  [FLINK-12231][runtime] Introduce SchedulerNG interface

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../dispatcher/DefaultJobManagerRunnerFactory.java |   7 +-
 .../restart/RestartStrategyResolving.java  |   4 +
 .../apache/flink/runtime/jobmaster/JobMaster.java  | 507 -
 .../flink/runtime/jobmaster/LegacyScheduler.java   | 626 +
 .../runtime/jobmaster/LegacySchedulerFactory.java  |  82 +++
 .../flink/runtime/jobmaster/SchedulerNG.java   | 120 
 .../runtime/jobmaster/SchedulerNGFactory.java  |  56 ++
 .../factories/DefaultJobMasterServiceFactory.java  |  16 +-
 .../flink/runtime/jobgraph/JobGraphTest.java   |  46 ++
 .../flink/runtime/jobmaster/JobMasterTest.java |  64 +--
 10 files changed, 1073 insertions(+), 455 deletions(-)
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacySchedulerFactory.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SchedulerNG.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SchedulerNGFactory.java



[flink] 02/03: [hotfix][runtime] Remove unused field ARCHIVE_NAME from JobMaster

2019-05-08 Thread gary
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 4177ae6a8aeba72025a4793714ed86c20f963d22
Author: Gary Yao 
AuthorDate: Mon Apr 29 12:12:39 2019 +0200

[hotfix][runtime] Remove unused field ARCHIVE_NAME from JobMaster
---
 .../src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java  | 1 -
 1 file changed, 1 deletion(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 6c83c7a..cae6922 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -141,7 +141,6 @@ public class JobMaster extends 
FencedRpcEndpoint implements JobMast
 
/** Default names for Flink's distributed components. */
public static final String JOB_MANAGER_NAME = "jobmanager";
-   public static final String ARCHIVE_NAME = "archive";
 
// 

 



[flink] 01/03: [hotfix][runtime] Add null check to RestartStrategyResolving

2019-05-08 Thread gary
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 1090967f8af22b1801986ee39e1392738e8d3eea
Author: Gary Yao 
AuthorDate: Mon Apr 29 10:10:12 2019 +0200

[hotfix][runtime] Add null check to RestartStrategyResolving
---
 .../runtime/executiongraph/restart/RestartStrategyResolving.java  | 4 
 1 file changed, 4 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
index ad7aa93..7d3733b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.executiongraph.restart;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Utility method for resolving {@link RestartStrategy}.
  */
@@ -46,6 +48,8 @@ public final class RestartStrategyResolving {
RestartStrategyFactory serverStrategyFactory,
boolean isCheckpointingEnabled) {
 
+   checkNotNull(serverStrategyFactory);
+
final RestartStrategy clientSideRestartStrategy =

RestartStrategyFactory.createRestartStrategy(clientConfiguration);
 



[flink] 01/02: [FLINK-12228][runtime] Implement Eager Scheduling Strategy

2019-05-08 Thread gary
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f6fc678297a29e16b73ff8d5159214d5afec0ef2
Author: shuai-xu 
AuthorDate: Fri Apr 26 13:54:06 2019 +0800

[FLINK-12228][runtime] Implement Eager Scheduling Strategy

This closes #8296.
---
 .../scheduler/ExecutionVertexDeploymentOption.java |  14 ++-
 .../strategy/EagerSchedulingStrategy.java  | 112 +++
 .../scheduler/strategy/SchedulingStrategy.java |   4 +-
 .../strategy/EagerSchedulingStrategyTest.java  | 121 +
 .../strategy/TestingSchedulerOperations.java   |  44 
 .../strategy/TestingSchedulingExecutionVertex.java |  57 ++
 .../strategy/TestingSchedulingTopology.java|  65 +++
 7 files changed, 413 insertions(+), 4 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
index 829f6ba..9831a9e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
@@ -20,6 +20,8 @@ package org.apache.flink.runtime.scheduler;
 
 import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
 
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
 /**
  * Component that stores the task need to be scheduled and the option for 
deployment.
  */
@@ -30,7 +32,15 @@ public class ExecutionVertexDeploymentOption {
private final DeploymentOption deploymentOption;
 
public ExecutionVertexDeploymentOption(ExecutionVertexID 
executionVertexId, DeploymentOption deploymentOption) {
-   this.executionVertexId = executionVertexId;
-   this.deploymentOption = deploymentOption;
+   this.executionVertexId = checkNotNull(executionVertexId);
+   this.deploymentOption = checkNotNull(deploymentOption);
+   }
+
+   public ExecutionVertexID getExecutionVertexId() {
+   return executionVertexId;
+   }
+
+   public DeploymentOption getDeploymentOption() {
+   return deploymentOption;
}
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
new file mode 100644
index 0000000..954fff5
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.scheduler.DeploymentOption;
+import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption;
+import org.apache.flink.runtime.scheduler.SchedulerOperations;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.StreamSupport;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * {@link SchedulingStrategy} instance for streaming job which will schedule 
all tasks at the same time.
+ */
+public class EagerSchedulingStrategy implements SchedulingStrategy {
+
+   private final SchedulerOperations schedulerOperations;
+
+   private final SchedulingTopology schedulingTopology;
+
+   private final DeploymentOption deploymentOption = new 
DeploymentOption(false);
+
+   public EagerSchedulingStrategy(
+   SchedulerOperations schedulerOperations,
+   SchedulingTopology schedulingTopology) {
+   this.schedulerOperations = checkNotNull(schedulerOperations)

[flink] 02/02: [hotfix][tests] Rename variables in EagerSchedulingStrategyTest

2019-05-08 Thread gary
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit f8a6ea584b9841598f604761ea012136db86c97b
Author: Gary Yao 
AuthorDate: Wed May 8 13:09:15 2019 +0200

[hotfix][tests] Rename variables in EagerSchedulingStrategyTest
---
 .../strategy/EagerSchedulingStrategyTest.java  | 18 +-
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java
index 881159d..1808bd8 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java
@@ -73,10 +73,10 @@ public class EagerSchedulingStrategyTest extends TestLogger 
{
assertThat(testingSchedulerOperations.getScheduledVertices(), 
hasSize(1));
 
Collection scheduledVertices = 
testingSchedulerOperations.getScheduledVertices().get(0);
-   assertThat(scheduledVertices, hasSize(5));
-   Collection vertices = 
getExecutionVertexIdsFromDeployOptions(scheduledVertices);
+   Collection scheduledVertexIDs = 
getExecutionVertexIdsFromDeployOptions(scheduledVertices);
+   assertThat(scheduledVertexIDs, hasSize(5));
for (SchedulingExecutionVertex schedulingExecutionVertex : 
testingSchedulingTopology.getVertices()) {
-   assertThat(vertices, 
hasItem(schedulingExecutionVertex.getId()));
+   assertThat(scheduledVertexIDs, 
hasItem(schedulingExecutionVertex.getId()));
}
}
 
@@ -92,24 +92,24 @@ public class EagerSchedulingStrategyTest extends TestLogger 
{
testingSchedulingTopology.addSchedulingExecutionVertex(new 
TestingSchedulingExecutionVertex(jobVertexID, 3));
testingSchedulingTopology.addSchedulingExecutionVertex(new 
TestingSchedulingExecutionVertex(jobVertexID, 4));
 
-   Set toBeRestartedVertices1 = new 
HashSet<>(Arrays.asList(
+   Set verticesToRestart1 = new 
HashSet<>(Arrays.asList(
new ExecutionVertexID(jobVertexID, 0),
new ExecutionVertexID(jobVertexID, 4)));
-   schedulingStrategy.restartTasks(toBeRestartedVertices1);
+   schedulingStrategy.restartTasks(verticesToRestart1);
 
-   Set toBeRestartedVertices2 = new 
HashSet<>(Arrays.asList(
+   Set verticesToRestart2 = new 
HashSet<>(Arrays.asList(
new ExecutionVertexID(jobVertexID, 1),
new ExecutionVertexID(jobVertexID, 2),
new ExecutionVertexID(jobVertexID, 3)));
-   schedulingStrategy.restartTasks(toBeRestartedVertices2);
+   schedulingStrategy.restartTasks(verticesToRestart2);
 
assertThat(testingSchedulerOperations.getScheduledVertices(), 
hasSize(2));
 
Collection scheduledVertices1 
= testingSchedulerOperations.getScheduledVertices().get(0);
-   
assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices1), 
containsInAnyOrder(toBeRestartedVertices1.toArray()));
+   
assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices1), 
containsInAnyOrder(verticesToRestart1.toArray()));
 
Collection scheduledVertices2 
= testingSchedulerOperations.getScheduledVertices().get(1);
-   
assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices2), 
containsInAnyOrder(toBeRestartedVertices2.toArray()));
+   
assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices2), 
containsInAnyOrder(verticesToRestart2.toArray()));
}
 
private static Collection 
getExecutionVertexIdsFromDeployOptions(



[flink] branch master updated (9a4c3dc -> f8a6ea5)

2019-05-08 Thread gary
This is an automated email from the ASF dual-hosted git repository.

gary pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 9a4c3dc  [FLINK-12227][runtime] Introduce SchedulingStrategy interface
 new f6fc678  [FLINK-12228][runtime] Implement Eager Scheduling Strategy
 new f8a6ea5  [hotfix][tests] Rename variables in 
EagerSchedulingStrategyTest

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../scheduler/ExecutionVertexDeploymentOption.java |  14 ++-
 .../strategy/EagerSchedulingStrategy.java  | 112 +++
 .../scheduler/strategy/SchedulingStrategy.java |   4 +-
 .../strategy/EagerSchedulingStrategyTest.java  | 121 +
 .../strategy/TestingSchedulerOperations.java}  |  25 +++--
 .../TestingSchedulingExecutionVertex.java} |  31 --
 .../strategy/TestingSchedulingTopology.java|  65 +++
 7 files changed, 351 insertions(+), 21 deletions(-)
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java
 copy 
flink-runtime/src/{main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyFactory.java
 => 
test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperations.java}
 (55%)
 copy 
flink-runtime/src/{main/java/org/apache/flink/runtime/rest/messages/EmptyMessageParameters.java
 => 
test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java}
 (53%)
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java



[flink-shaded] annotated tag release-7.0-rc1 updated (7d7c687 -> 585d6b8)

2019-05-08 Thread jincheng
This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a change to annotated tag release-7.0-rc1
in repository https://gitbox.apache.org/repos/asf/flink-shaded.git.


*** WARNING: tag release-7.0-rc1 was modified! ***

from 7d7c687  (commit)
  to 585d6b8  (tag)
 tagging 7d7c68799a0488007bed63033c1e8110210a6cfb (commit)
 replaces release-6.0
  by sunjincheng121
  on Wed May 8 14:54:46 2019 +0800

- Log -
release-7.0-rc1
-----BEGIN PGP SIGNATURE-----

iQIzBAABCAAdFiEEj+oe6dAEjAzMcLdXMhGwcDt56g4FAlzSfTYACgkQMhGwcDt5
6g7QFQ/+NaV06RQmhhZG0n7JCB12Yo8vG0ovWovcd3ADGxYEEpRpbesCFI/YaKgV
frfRA+5JCbRVKoHdXXeV/WAKDzta6Of4w3sw3oBAVDRPZB3o1sHGVvOM73LPMtAz
ZAUi3GD0JwgPc/fFHL/K4EjIv8eTVDN4YH5YDdupyYIMuolywvl2eOw1YF7gqoQL
ZMb538xVF4ob3neizYrcYaEH3OTPUsLwb4hObJUbIT2zqcZWsGTlLjEzp2Bp9WHz
od+nxM4P8LsdXIc03oGmncTkxRgb9Mlixw3FdKrU05WFh2Q+qVReJgKLQm2uxkXQ
vyWwxFoFU0exqvoAkdeRGwF3jSqk4dP1Y46HSNoxh/JaN7kJbvbj2VDLh3Jj+iWq
AQwxUoOf3T22hVPvBn3pbGEk5SjM++B7u/suxpXsZ8Eh3+ooNd5NlLd90GuxmfV6
6SiLjkYo5bEZCzZjBAxHU3wWvtg5kR+WI+FfVtxHW3maakJusQZf47DdEauh5X+v
kuRoShYic+v0jy9d1dFa2Jn2S70Q42JWapKjZQT9i/qFlOddhPeHCHd5YtksKHfe
f/7wmWsE0FBS5tAhBuWXaGeQKyRg99PsRY40bYCawjwNK5q8U2fu8xethJtxpKDa
cFXq1nV+d+q24TFT/9Hhr1QwADDRIwUxpr+b4dofq37GoiZ9Lx8=
=U9wq
-----END PGP SIGNATURE-----
---


No new revisions were added by this update.

Summary of changes:



svn commit: r33943 - in /dev/flink/flink-shaded-7.0-rc1: ./ flink-shaded-7.0-src.tgz flink-shaded-7.0-src.tgz.asc flink-shaded-7.0-src.tgz.sha512

2019-05-08 Thread jincheng
Author: jincheng
Date: Wed May  8 10:39:29 2019
New Revision: 33943

Log:
flink-shaded-7.0 release

Added:
dev/flink/flink-shaded-7.0-rc1/
dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz   (with props)
dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz.asc
dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz.sha512

Added: dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz
==
Binary file - no diff available.

Propchange: dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz
--
svn:mime-type = application/octet-stream

Added: dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz.asc
==
--- dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz.asc (added)
+++ dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz.asc Wed May  8 
10:39:29 2019
@@ -0,0 +1,16 @@
+-----BEGIN PGP SIGNATURE-----
+
+iQIzBAABCAAdFiEEj+oe6dAEjAzMcLdXMhGwcDt56g4FAlzSfVcACgkQMhGwcDt5
+6g7lTw//URjyaiM029ktPaagBy8I0Hp4Ltfdo8aenkohdLLP7txj+g78+t3/M8f0
+xXD/FdRqbUGr8ok4siEvsuiX8rWv0HUhRkNZ5dvG6esPlVWXuqBP+xjdYi/8XfTS
+OfhNYM248RVY+2Hu1q8BC+VTV7vwi6ZBZY72Lb/OkMt64Hg5KD/uVmvqvnfeRA27
+PvzyzkzP6pwFk8Rg8+3b0RENp6CRk41vrbR303B+Dwld2OcEB9IrxTQzrzDtWx9X
+361/tJWBBqXNLjaH5R7i4X9Ipu7i7GlkigY7PdxlJYbpPZnjXC0C82AZs6C5U//C
+ro1+n9uN/wj0sO0hQMhTlYkWiTBiy7HnjSpC9sIpJpX0U+SWs60z2Dxp/K0cDZSe
+BS5jcoY4gpcZrsTE7igOWqwXInZRi2NpisUjqU7qVnOMr5OnNfEhJ3mGTxlA34G5
++SiZeDDCQZhKvqhR8SlemkMW3lMIxfOvuZOVrJhXV0RTVz6hpJxyCm7LZRwAaQK9
+9uvmCnLEE92RZzBc4eI8vHEY23VOQv5HzGYJ4z804iqTPy8s/V1R++n3A1HPLTf+
+2rbnrEwzB4srTrWOjFxslIn9cVq+ZLVSvqrhEPfoTK2VwNmdK7xbX7gg5fhNJ9yR
+quvPKgJyzNCdBwU43neirMlS0uUnvgdGTuXOvX9ul5pgJCrb6DY=
+=txgS
+-----END PGP SIGNATURE-----

Added: dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz.sha512
==
--- dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz.sha512 (added)
+++ dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz.sha512 Wed May  8 10:39:29 2019
@@ -0,0 +1 @@
+a8c14810a3192fb016bf444273a67945091f5c1cea1bc02488070ce935062094b18477afd99b1b7108892b94be07541b000defb9e202a498da4b1e55d82e64e3  flink-shaded-7.0-src.tgz




[flink-shaded] branch master updated: [FLINK-12397][build] Drop flink-shaded-asm-5

2019-05-08 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-shaded.git


The following commit(s) were added to refs/heads/master by this push:
 new ed42ad1  [FLINK-12397][build] Drop flink-shaded-asm-5
ed42ad1 is described below

commit ed42ad1e9b4e9d9588acc5de61c7703f765a298e
Author: Chesnay Schepler 
AuthorDate: Wed May 8 11:11:31 2019 +0200

[FLINK-12397][build] Drop flink-shaded-asm-5
---
 flink-shaded-asm-5/pom.xml | 83 --
 .../src/main/resources/META-INF/NOTICE | 10 ---
 .../main/resources/META-INF/licenses/LICENSE.asm   | 31 
 pom.xml|  1 -
 4 files changed, 125 deletions(-)

diff --git a/flink-shaded-asm-5/pom.xml b/flink-shaded-asm-5/pom.xml
deleted file mode 100644
index 1e72bff..000
--- a/flink-shaded-asm-5/pom.xml
+++ /dev/null
@@ -1,83 +0,0 @@
-
-
-http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/maven-v4_0_0.xsd";>
-
-4.0.0
-
-
-org.apache.flink
-flink-shaded
-7.0
-..
-
-
-flink-shaded-asm
-flink-shaded-asm-5
-${asm.version}-7.0
-
-jar
-
-
-5.0.4
-
-
-
-
-org.ow2.asm
-asm-all
-${asm.version}
-
-
-
-
-
-
-org.apache.maven.plugins
-maven-shade-plugin
-
-
-shade-flink
-package
-
-shade
-
-
-
true
-
${project.basedir}/target/dependency-reduced-pom.xml
-
-
-org.ow2.asm:*
-
-
-
-
-org.objectweb
-
org.apache.flink.shaded.asm5.org.objectweb
-
-
-
-
-
-
-
-
-
-
diff --git a/flink-shaded-asm-5/src/main/resources/META-INF/NOTICE b/flink-shaded-asm-5/src/main/resources/META-INF/NOTICE
deleted file mode 100644
index a3065f2..000
--- a/flink-shaded-asm-5/src/main/resources/META-INF/NOTICE
+++ /dev/null
@@ -1,10 +0,0 @@
-flink-shaded-asm
-Copyright 2014-2018 The Apache Software Foundation
-
-This product includes software developed at
-The Apache Software Foundation (http://www.apache.org/).
-
-This project bundles the following dependencies under the BSD license.
-See bundled license files for details.
-
-- org.ow2.asm:asm-all:5.0.4
\ No newline at end of file
diff --git a/flink-shaded-asm-5/src/main/resources/META-INF/licenses/LICENSE.asm b/flink-shaded-asm-5/src/main/resources/META-INF/licenses/LICENSE.asm
deleted file mode 100644
index 62ffbcc..000
--- a/flink-shaded-asm-5/src/main/resources/META-INF/licenses/LICENSE.asm
+++ /dev/null
@@ -1,31 +0,0 @@
-ASM: a very small and fast Java bytecode manipulation framework
-
-Copyright (c) 2000-2011 INRIA, France Telecom
-All rights reserved.
-
-Redistribution and use in source and binary forms, with or without
-modification, are permitted provided that the following conditions
-are met:
-
-1. Redistributions of source code must retain the above copyright
-   notice, this list of conditions and the following disclaimer.
-
-2. Redistributions in binary form must reproduce the above copyright
-   notice, this list of conditions and the following disclaimer in the
-   documentation and/or other materials provided with the distribution.
-
-3. Neither the name of the copyright holders nor the names of its
-   contributors may be used to endorse or promote products derived from
-   this software without specific prior written permission.
-
-THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF

[flink] 01/02: [FLINK-12227][runtime] Add interfaces for SchedulingTopology

2019-05-08 Thread gary
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8c862e5fba73a335995c1667bb036ab80edf579e
Author: Till Rohrmann 
AuthorDate: Thu Apr 18 17:57:11 2019 +0200

[FLINK-12227][runtime] Add interfaces for SchedulingTopology

The SchedulingTopology contains the information which is provided to the
SchedulingStrategy in order to decide which execution vertices should be
scheduled next.
---
 .../scheduler/strategy/ExecutionVertexID.java  | 76 +
 .../strategy/SchedulingExecutionVertex.java| 58 +
 .../strategy/SchedulingResultPartition.java| 99 ++
 .../scheduler/strategy/SchedulingTopology.java | 53 
 4 files changed, 286 insertions(+)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java
new file mode 100644
index 000..6e44e11
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.strategy;
+
+import org.apache.flink.runtime.executiongraph.ExecutionVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+
+import static org.apache.flink.util.Preconditions.checkArgument;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Id identifying {@link ExecutionVertex}.
+ */
+public class ExecutionVertexID {
+   private final JobVertexID jobVertexId;
+
+   private final int subtaskIndex;
+
+   public ExecutionVertexID(JobVertexID jobVertexId, int subtaskIndex) {
+   checkArgument(subtaskIndex >= 0, "subtaskIndex must be greater than or equal to 0");
+
+   this.jobVertexId = checkNotNull(jobVertexId);
+   this.subtaskIndex = subtaskIndex;
+   }
+
+   public JobVertexID getJobVertexId() {
+   return jobVertexId;
+   }
+
+   public int getSubtaskIndex() {
+   return subtaskIndex;
+   }
+
+   @Override
+   public boolean equals(Object o) {
+   if (this == o) {
+   return true;
+   }
+
+   if (o == null || getClass() != o.getClass()) {
+   return false;
+   }
+
+   ExecutionVertexID that = (ExecutionVertexID) o;
+
+   return subtaskIndex == that.subtaskIndex && jobVertexId.equals(that.jobVertexId);
+   }
+
+   @Override
+   public int hashCode() {
+   int result = jobVertexId.hashCode();
+   result = 31 * result + subtaskIndex;
+   return result;
+   }
+
+   @Override
+   public String toString() {
+   return jobVertexId + "_" + subtaskIndex;
+   }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
new file mode 100644
index 000..b9b2271
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing p

[flink] 02/02: [FLINK-12227][runtime] Introduce SchedulingStrategy interface

2019-05-08 Thread gary
This is an automated email from the ASF dual-hosted git repository.

gary pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9a4c3dc54b462c44987b36bb6af6f7a570d829a7
Author: shuai-xu 
AuthorDate: Mon Apr 22 11:32:39 2019 +0800

[FLINK-12227][runtime] Introduce SchedulingStrategy interface

This closes #8233.
---
 .../flink/runtime/scheduler/DeploymentOption.java  | 35 
 .../scheduler/ExecutionVertexDeploymentOption.java | 36 +
 .../runtime/scheduler/SchedulerOperations.java | 36 +
 .../scheduler/strategy/SchedulingStrategy.java | 62 ++
 .../strategy/SchedulingStrategyFactory.java| 33 
 5 files changed, 202 insertions(+)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java
new file mode 100644
index 000..9fb9ace
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+/**
+ * Deployment option which indicates whether the task should send scheduleOrUpdateConsumer message to master.
+ */
+public class DeploymentOption {
+
+   private final boolean sendScheduleOrUpdateConsumerMessage;
+
+   public DeploymentOption(boolean sendScheduleOrUpdateConsumerMessage) {
+   this.sendScheduleOrUpdateConsumerMessage = sendScheduleOrUpdateConsumerMessage;
+   }
+
+   public boolean sendScheduleOrUpdateConsumerMessage() {
+   return sendScheduleOrUpdateConsumerMessage;
+   }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
new file mode 100644
index 000..829f6ba
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler;
+
+import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
+
+/**
+ * Component that stores the task need to be scheduled and the option for deployment.
+ */
+public class ExecutionVertexDeploymentOption {
+
+   private final ExecutionVertexID executionVertexId;
+
+   private final DeploymentOption deploymentOption;
+
+   public ExecutionVertexDeploymentOption(ExecutionVertexID executionVertexId, DeploymentOption deploymentOption) {
+   this.executionVertexId = executionVertexId;
+   this.deploymentOption = deploymentOption;
+   }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerOperations.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerOperations.java
new file mode 100644
index 000..50d3f87
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerOperations.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright own

[flink] branch master updated (12d698b -> 9a4c3dc)

2019-05-08 Thread gary
This is an automated email from the ASF dual-hosted git repository.

gary pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 12d698b  [FLINK-12365][table] Add stats related catalog APIs
 new 8c862e5  [FLINK-12227][runtime] Add interfaces for SchedulingTopology
 new 9a4c3dc  [FLINK-12227][runtime] Introduce SchedulingStrategy interface

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../flink/runtime/scheduler/DeploymentOption.java  | 28 ++
 .../ExecutionVertexDeploymentOption.java}  | 19 ++---
 .../SchedulerOperations.java}  | 26 +++---
 .../scheduler/strategy/ExecutionVertexID.java  | 49 +--
 .../strategy/SchedulingExecutionVertex.java}   | 40 ++---
 .../strategy/SchedulingResultPartition.java| 99 ++
 .../scheduler/strategy/SchedulingStrategy.java | 62 ++
 .../strategy/SchedulingStrategyFactory.java}   | 17 ++--
 .../scheduler/strategy/SchedulingTopology.java | 53 
 9 files changed, 306 insertions(+), 87 deletions(-)
 copy 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java
 => 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java
 (62%)
 copy flink-runtime/src/main/java/org/apache/flink/runtime/{JobException.java 
=> scheduler/ExecutionVertexDeploymentOption.java} (63%)
 copy 
flink-runtime/src/main/java/org/apache/flink/runtime/{rest/messages/JobMessageParameters.java
 => scheduler/SchedulerOperations.java} (61%)
 copy 
flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartitionSpec.java
 => 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java
 (51%)
 copy 
flink-runtime/src/main/java/org/apache/flink/runtime/{jobgraph/DistributionPattern.java
 => scheduler/strategy/SchedulingExecutionVertex.java} (53%)
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java
 copy 
flink-runtime/src/main/java/org/apache/flink/runtime/{minicluster/RpcServiceSharing.java
 => scheduler/strategy/SchedulingStrategyFactory.java} (68%)
 create mode 100644 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java