[flink] 02/05: [FLINK-12325][metrics] Add counter/gauge tests for StatsD
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit a8cc170c9471beac271b6f5e22f18d586dc33765 Author: Richard Deurwaarder AuthorDate: Thu May 2 12:07:08 2019 +0200 [FLINK-12325][metrics] Add counter/gauge tests for StatsD --- .../flink/metrics/statsd/StatsDReporterTest.java | 23 ++ 1 file changed, 23 insertions(+) diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java index a0b853f..d447a4c 100644 --- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java +++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.HistogramStatistics; import org.apache.flink.metrics.Metric; @@ -163,6 +164,28 @@ public class StatsDReporterTest extends TestLogger { testMetricAndAssert(new TestMeter(), "metric", expectedLines); } + /** +* Tests that counter are properly reported via the StatsD reporter. 
+*/ + @Test + public void testStatsDCountersReporting() throws Exception { + Set expectedLines = new HashSet<>(2); + expectedLines.add("metric:100|g"); + + Counter counter = new SimpleCounter(); + counter.inc(100); + + testMetricAndAssert(counter, "metric", expectedLines); + } + + @Test + public void testStatsDGaugesReporting() throws Exception { + Set expectedLines = new HashSet<>(2); + expectedLines.add("metric:75|g"); + + testMetricAndAssert((Gauge) () -> 75, "metric", expectedLines); + } + private void testMetricAndAssert(Metric metric, String metricName, Set expectation) throws Exception { StatsDReporter reporter = null; DatagramSocketReceiver receiver = null;
[flink] 05/05: [FLINK-12325][metrics] StatsDReporter properly handles negative values
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 92b1a66a63aa10fc58c9f2ac4baca859437db40c Author: Richard Deurwaarder AuthorDate: Thu May 2 12:47:36 2019 +0200 [FLINK-12325][metrics] StatsDReporter properly handles negative values --- .../flink/metrics/statsd/StatsDReporter.java | 59 -- .../flink/metrics/statsd/StatsDReporterTest.java | 59 ++ 2 files changed, 102 insertions(+), 16 deletions(-) diff --git a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java index 527f9c1..29fbeb9 100644 --- a/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java +++ b/flink-metrics/flink-metrics-statsd/src/main/java/org/apache/flink/metrics/statsd/StatsDReporter.java @@ -127,14 +127,20 @@ public class StatsDReporter extends AbstractReporter implements Scheduled { // private void reportCounter(final String name, final Counter counter) { - send(name, String.valueOf(counter.getCount())); + send(name, counter.getCount()); } private void reportGauge(final String name, final Gauge gauge) { Object value = gauge.getValue(); - if (value != null) { - send(name, value.toString()); + if (value == null) { + return; } + + if (value instanceof Number) { + send(numberIsNegative((Number) value), name, value.toString()); + } + + send(name, value.toString()); } private void reportHistogram(final String name, final Histogram histogram) { @@ -143,25 +149,25 @@ public class StatsDReporter extends AbstractReporter implements Scheduled { HistogramStatistics statistics = histogram.getStatistics(); if (statistics != null) { - send(prefix(name, "count"), String.valueOf(histogram.getCount())); - send(prefix(name, "max"), String.valueOf(statistics.getMax())); - send(prefix(name, "min"), 
String.valueOf(statistics.getMin())); - send(prefix(name, "mean"), String.valueOf(statistics.getMean())); - send(prefix(name, "stddev"), String.valueOf(statistics.getStdDev())); - send(prefix(name, "p50"), String.valueOf(statistics.getQuantile(0.5))); - send(prefix(name, "p75"), String.valueOf(statistics.getQuantile(0.75))); - send(prefix(name, "p95"), String.valueOf(statistics.getQuantile(0.95))); - send(prefix(name, "p98"), String.valueOf(statistics.getQuantile(0.98))); - send(prefix(name, "p99"), String.valueOf(statistics.getQuantile(0.99))); - send(prefix(name, "p999"), String.valueOf(statistics.getQuantile(0.999))); + send(prefix(name, "count"), histogram.getCount()); + send(prefix(name, "max"), statistics.getMax()); + send(prefix(name, "min"), statistics.getMin()); + send(prefix(name, "mean"), statistics.getMean()); + send(prefix(name, "stddev"), statistics.getStdDev()); + send(prefix(name, "p50"), statistics.getQuantile(0.5)); + send(prefix(name, "p75"), statistics.getQuantile(0.75)); + send(prefix(name, "p95"), statistics.getQuantile(0.95)); + send(prefix(name, "p98"), statistics.getQuantile(0.98)); + send(prefix(name, "p99"), statistics.getQuantile(0.99)); + send(prefix(name, "p999"), statistics.getQuantile(0.999)); } } } private void reportMeter(final String name, final Meter meter) { if (meter != null) { - send(prefix(name, "rate"), String.valueOf(meter.getRate())); - send(prefix(name, "count"), String.valueOf(meter.getCount())); + send(prefix(name, "rate"), meter.getRate()); + send(prefix(name, "count"), meter.getCount()); } } @@ -179,6 +185,23 @@ public class StatsDReporter extends AbstractReporter implements Scheduled { }
[flink] 01/05: [FLINK-12325][metrics] Refactor StatsD tests
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 7b806e7eb75e40729f1565ecc65bb632fd4eaf28 Author: Richard Deurwaarder AuthorDate: Thu May 2 12:02:48 2019 +0200 [FLINK-12325][metrics] Refactor StatsD tests --- .../flink/metrics/statsd/StatsDReporterTest.java | 124 ++--- 1 file changed, 35 insertions(+), 89 deletions(-) diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java index 68b90ce..a0b853f 100644 --- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java +++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java @@ -25,8 +25,11 @@ import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Histogram; import org.apache.flink.metrics.HistogramStatistics; +import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.metrics.reporter.MetricReporter; import org.apache.flink.metrics.util.TestMeter; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -132,73 +135,20 @@ public class StatsDReporterTest extends TestLogger { */ @Test public void testStatsDHistogramReporting() throws Exception { - MetricRegistryImpl registry = null; - DatagramSocketReceiver receiver = null; - Thread receiverThread = null; - long timeout = 5000; - long joinTimeout = 3; - - String histogramName = "histogram"; - - try { - receiver = new DatagramSocketReceiver(); - - receiverThread = new Thread(receiver); - - 
receiverThread.start(); - - int port = receiver.getPort(); - - MetricConfig config = new MetricConfig(); - config.setProperty(ConfigConstants.METRICS_REPORTER_INTERVAL_SUFFIX, "1 SECONDS"); - config.setProperty("host", "localhost"); - config.setProperty("port", String.valueOf(port)); - - registry = new MetricRegistryImpl( - MetricRegistryConfiguration.defaultMetricRegistryConfiguration(), - Collections.singletonList(ReporterSetup.forReporter("test", config, new StatsDReporter(; - - TaskManagerMetricGroup metricGroup = new TaskManagerMetricGroup(registry, "localhost", "tmId"); - - TestingHistogram histogram = new TestingHistogram(); - - metricGroup.histogram(histogramName, histogram); - - receiver.waitUntilNumLines(11, timeout); - - Set lines = receiver.getLines(); - - String prefix = metricGroup.getMetricIdentifier(histogramName); - - Set expectedLines = new HashSet<>(); - - expectedLines.add(prefix + ".count:1|g"); - expectedLines.add(prefix + ".mean:3.0|g"); - expectedLines.add(prefix + ".min:6|g"); - expectedLines.add(prefix + ".max:5|g"); - expectedLines.add(prefix + ".stddev:4.0|g"); - expectedLines.add(prefix + ".p75:0.75|g"); - expectedLines.add(prefix + ".p98:0.98|g"); - expectedLines.add(prefix + ".p99:0.99|g"); - expectedLines.add(prefix + ".p999:0.999|g"); - expectedLines.add(prefix + ".p95:0.95|g"); - expectedLines.add(prefix + ".p50:0.5|g"); - - assertEquals(expectedLines, lines); - - } finally { - if (registry != null) { - registry.shutdown().get(); - } - - if (receiver != null) { - receiver.stop(); - } - - if (receiverThread != null) { - receiverThread.join(joinTimeout); - } - } + Set expectedLines = new HashSet<>(6); + expectedLines.add("metric.count:1|g"); + expectedLines.add("metric.mean:3.0|g"); + expectedLines.add("metric.min:6|g"); + expectedLines.add("metric.max:5|g"); + ex
[flink] branch master updated (c954780 -> 92b1a66)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from c954780 [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface new 7b806e7 [FLINK-12325][metrics] Refactor StatsD tests new a8cc170 [FLINK-12325][metrics] Add counter/gauge tests for StatsD new 7065b7f [FLINK-12325][metrics] Extend test metric implementations new c636c53 [FLINK-12325][metrics] Migrate StatsD test to test metric implementations new 92b1a66 [FLINK-12325][metrics] StatsDReporter properly handles negative values The 5 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../util/{TestMeter.java => TestCounter.java} | 32 ++- .../apache/flink/metrics/util/TestHistogram.java | 43 +++- .../org/apache/flink/metrics/util/TestMeter.java | 15 +- .../flink/metrics/statsd/StatsDReporter.java | 59 +++-- .../flink/metrics/statsd/StatsDReporterTest.java | 246 + 5 files changed, 225 insertions(+), 170 deletions(-) copy flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/{TestMeter.java => TestCounter.java} (66%)
[flink] 04/05: [FLINK-12325][metrics] Migrate StatsD test to test metric implementations
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit c636c53421ff00749162cf34c29ca4177deec77d Author: Richard Deurwaarder AuthorDate: Thu May 2 12:47:14 2019 +0200 [FLINK-12325][metrics] Migrate StatsD test to test metric implementations --- .../flink/metrics/statsd/StatsDReporterTest.java | 72 +++--- 1 file changed, 8 insertions(+), 64 deletions(-) diff --git a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java index d447a4c..61bf07d 100644 --- a/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java +++ b/flink-metrics/flink-metrics-statsd/src/test/java/org/apache/flink/metrics/statsd/StatsDReporterTest.java @@ -24,14 +24,14 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; -import org.apache.flink.metrics.Histogram; -import org.apache.flink.metrics.HistogramStatistics; import org.apache.flink.metrics.Metric; import org.apache.flink.metrics.MetricConfig; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.SimpleCounter; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; import org.apache.flink.metrics.reporter.MetricReporter; +import org.apache.flink.metrics.util.TestCounter; +import org.apache.flink.metrics.util.TestHistogram; import org.apache.flink.metrics.util.TestMeter; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; @@ -138,10 +138,10 @@ public class StatsDReporterTest extends TestLogger { public void testStatsDHistogramReporting() throws Exception { Set expectedLines = new 
HashSet<>(6); expectedLines.add("metric.count:1|g"); - expectedLines.add("metric.mean:3.0|g"); - expectedLines.add("metric.min:6|g"); - expectedLines.add("metric.max:5|g"); - expectedLines.add("metric.stddev:4.0|g"); + expectedLines.add("metric.mean:4.0|g"); + expectedLines.add("metric.min:7|g"); + expectedLines.add("metric.max:6|g"); + expectedLines.add("metric.stddev:5.0|g"); expectedLines.add("metric.p75:0.75|g"); expectedLines.add("metric.p98:0.98|g"); expectedLines.add("metric.p99:0.99|g"); @@ -149,7 +149,7 @@ public class StatsDReporterTest extends TestLogger { expectedLines.add("metric.p95:0.95|g"); expectedLines.add("metric.p50:0.5|g"); - testMetricAndAssert(new TestingHistogram(), "metric", expectedLines); + testMetricAndAssert(new TestHistogram(), "metric", expectedLines); } /** @@ -172,10 +172,7 @@ public class StatsDReporterTest extends TestLogger { Set expectedLines = new HashSet<>(2); expectedLines.add("metric:100|g"); - Counter counter = new SimpleCounter(); - counter.inc(100); - - testMetricAndAssert(counter, "metric", expectedLines); + testMetricAndAssert(new TestCounter(100), "metric", expectedLines); } @Test @@ -245,59 +242,6 @@ public class StatsDReporterTest extends TestLogger { } } - private static class TestingHistogram implements Histogram { - - @Override - public void update(long value) { - - } - - @Override - public long getCount() { - return 1; - } - - @Override - public HistogramStatistics getStatistics() { - return new HistogramStatistics() { - @Override - public double getQuantile(double quantile) { - return quantile; - } - - @Override - public long[] getValues() { - return new long[0]; - } - - @Override - public int size() { - return 2; - } - - @Override - public double getMean() { - return 3; - } - - @Override - public double getStdDev() { -
[flink] 03/05: [FLINK-12325][metrics] Extend test metric implementations
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 7065b7f3ce44c3d5080f33df82095055a4540411 Author: Richard Deurwaarder AuthorDate: Thu May 2 12:39:08 2019 +0200 [FLINK-12325][metrics] Extend test metric implementations --- .../util/{TestMeter.java => TestCounter.java} | 32 .../apache/flink/metrics/util/TestHistogram.java | 43 ++ .../org/apache/flink/metrics/util/TestMeter.java | 15 +++- 3 files changed, 73 insertions(+), 17 deletions(-) diff --git a/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestMeter.java b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestCounter.java similarity index 66% copy from flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestMeter.java copy to flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestCounter.java index b1ec3a3..1f47899 100644 --- a/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestMeter.java +++ b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestCounter.java @@ -18,28 +18,44 @@ package org.apache.flink.metrics.util; -import org.apache.flink.metrics.Meter; +import org.apache.flink.metrics.Counter; /** - * A dummy {@link Meter} implementation. + * A dummy {@link Counter} implementation. 
*/ -public class TestMeter implements Meter { +public class TestCounter implements Counter { + private long countValue; + + public TestCounter() { + this.countValue = 0; + } + + public TestCounter(long countValue) { + this.countValue = countValue; + } + + @Override + public void inc() { + countValue++; + } @Override - public void markEvent() { + public void inc(long n) { + countValue += n; } @Override - public void markEvent(long n) { + public void dec() { + countValue--; } @Override - public double getRate() { - return 5; + public void dec(long n) { + countValue -= n; } @Override public long getCount() { - return 100L; + return countValue; } } diff --git a/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestHistogram.java b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestHistogram.java index 6df2e92..06dbd39 100644 --- a/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestHistogram.java +++ b/flink-metrics/flink-metrics-core/src/test/java/org/apache/flink/metrics/util/TestHistogram.java @@ -25,14 +25,19 @@ import org.apache.flink.metrics.HistogramStatistics; * Stateless test histogram for which all methods return a static value. 
*/ public class TestHistogram implements Histogram { - + private long count = 1; + private int size = 3; + private double mean = 4; + private double stdDev = 5; + private long max = 6; + private long min = 7; @Override public void update(long value) { } @Override public long getCount() { - return 1; + return count; } @Override @@ -50,28 +55,52 @@ public class TestHistogram implements Histogram { @Override public int size() { - return 3; + return size; } @Override public double getMean() { - return 4; + return mean; } @Override public double getStdDev() { - return 5; + return stdDev; } @Override public long getMax() { - return 6; + return max; } @Override public long getMin() { - return 7; + return min; } }; } + + public void setSize(int size) { + this.size = size; + } + + public void setMean(double mean) { + this.mean = mean; + } + + public void setStdDev(double stdDev) { + this.stdDev = stdDev; + } + + public void setMax(long max) { + this.max = max; + } + + public void setMin(long m
[flink-web] branch asf-site updated: Rebuild website
This is an automated email from the ASF dual-hosted git repository. jincheng pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git The following commit(s) were added to refs/heads/asf-site by this push: new fa63dbb Rebuild website fa63dbb is described below commit fa63dbb0edc0295f5f3f0ba2ac5477b61f8c593a Author: sunjincheng121 AuthorDate: Thu May 9 08:26:29 2019 +0800 Rebuild website --- content/roadmap.html| 9 + content/zh/roadmap.html | 9 + 2 files changed, 18 insertions(+) diff --git a/content/roadmap.html b/content/roadmap.html index adba88f..7e450b4 100644 --- a/content/roadmap.html +++ b/content/roadmap.html @@ -338,6 +338,15 @@ the catalogs (https://issues.apache.org/jira/browse/FLINK-10232";>FLINK- There is a broad effort to integrate Flink with the Hive Ecosystem, including metastore and Hive UDF support https://issues.apache.org/jira/browse/FLINK-10556";>FLINK-10556. +There is also a big effort to support Python for Table API https://cwiki.apache.org/confluence/display/FLINK/FLIP-38%3A+Python+Table+API";>FLIP-38. +We will divide the work into following stages: + + + Translate Python Table API queries without UDFs to Java and run them completely in Java for the first step. + Add support for User-defined functions(Scalar Function/Table Function/Aggregate Function) in the second step. + Integrating Pandas as the final effort, i.e., functions in Pandas can be used in Python Table API directly. + + Connectors & Formats Support for additional connectors and formats is a continuous process. diff --git a/content/zh/roadmap.html b/content/zh/roadmap.html index 7419bfd..182ac2b 100644 --- a/content/zh/roadmap.html +++ b/content/zh/roadmap.html @@ -278,6 +278,15 @@ under the License. 
还有一个巨大的工作是将 Flink 与 Hive 生态系统集成。包括 Metastore 和 Hive UDF 支持 https://issues.apache.org/jira/browse/FLINK-10556";>FLINK-10556。 +社区在Table API支持Python方面也做出了很多的努力 https://cwiki.apache.org/confluence/display/FLINK/FLIP-38%3A+Python+Table+API";>FLIP-38。 +我们将工作分为以下几个阶段: + + + 首先以将Python Table API 直译为Java Table API的方式支持用户编写没有UDFs(标量函数/表值函数/聚合函数)的Python Table API程序。 + 增加对UDFs(标量函数/表值函数/聚合函数)在Python Table API的支持。 + 最后将Pandas与Python Table API进行集成,Pandas中的函数可以在Python Table API中直接使用。 + + Connectors & Formats 支持额外的 connectors 和 formats 是一个持续的过程。
[flink-web] branch asf-site updated: Add python table api section to the roadmap.
This is an automated email from the ASF dual-hosted git repository. jincheng pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git The following commit(s) were added to refs/heads/asf-site by this push: new b20d169 Add python table api section to the roadmap. b20d169 is described below commit b20d1696b6c5fdca23ee9db88408fce278d78f34 Author: sunjincheng121 AuthorDate: Tue May 7 07:51:25 2019 +0800 Add python table api section to the roadmap. This closes #204 --- roadmap.md| 7 +++ roadmap.zh.md | 7 +++ 2 files changed, 14 insertions(+) diff --git a/roadmap.md b/roadmap.md index 2ef95c2..803363a 100644 --- a/roadmap.md +++ b/roadmap.md @@ -154,6 +154,13 @@ the catalogs ([FLINK-10232](https://issues.apache.org/jira/browse/FLINK-10232)). There is a broad effort to integrate Flink with the Hive Ecosystem, including metastore and Hive UDF support [FLINK-10556](https://issues.apache.org/jira/browse/FLINK-10556). +There is also a big effort to support Python for Table API [FLIP-38](https://cwiki.apache.org/confluence/display/FLINK/FLIP-38%3A+Python+Table+API). +We will divide the work into following stages: + +- Translate Python Table API queries without UDFs to Java and run them completely in Java for the first step. +- Add support for User-defined functions(Scalar Function/Table Function/Aggregate Function) in the second step. +- Integrating Pandas as the final effort, i.e., functions in Pandas can be used in Python Table API directly. + # Connectors & Formats Support for additional connectors and formats is a continuous process. 
diff --git a/roadmap.zh.md b/roadmap.zh.md index 0e2f9ca..b3a01ec 100644 --- a/roadmap.zh.md +++ b/roadmap.zh.md @@ -100,6 +100,13 @@ Flink Web UI 正在移植到更新的框架中并获得其他功能并更好地 还有一个巨大的工作是将 Flink 与 Hive 生态系统集成。包括 Metastore 和 Hive UDF 支持 [FLINK-10556](https://issues.apache.org/jira/browse/FLINK-10556)。 +社区在Table API支持Python方面也做出了很多的努力 [FLIP-38](https://cwiki.apache.org/confluence/display/FLINK/FLIP-38%3A+Python+Table+API)。 +我们将工作分为以下几个阶段: + +- 首先以将Python Table API 直译为Java Table API的方式支持用户编写没有UDFs(标量函数/表值函数/聚合函数)的Python Table API程序。 +- 增加对UDFs(标量函数/表值函数/聚合函数)在Python Table API的支持。 +- 最后将Pandas与Python Table API进行集成,Pandas中的函数可以在Python Table API中直接使用。 + # Connectors & Formats 支持额外的 connectors 和 formats 是一个持续的过程。
[flink] branch master updated: [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface
This is an automated email from the ASF dual-hosted git repository. bli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new c954780 [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface c954780 is described below commit c954780c9fb2c0cc5b3730caff7e99bacbd534ba Author: bowen.li AuthorDate: Tue May 7 12:15:48 2019 -0700 [FLINK-12417][table] Unify ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface This PR unifies ReadableCatalog and ReadableWritableCatalog interfaces to Catalog interface to simplify the architecture and management of catalogs. This closes #8365. --- .../flink/table/catalog/hive/HiveCatalogBase.java | 4 +- .../table/catalog/GenericInMemoryCatalog.java | 2 +- .../{ReadableWritableCatalog.java => Catalog.java} | 239 - .../flink/table/catalog/CatalogTestBase.java | 2 +- 4 files changed, 241 insertions(+), 6 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java index b044de3..7c2fdcc 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/catalog/hive/HiveCatalogBase.java @@ -18,7 +18,7 @@ package org.apache.flink.table.catalog.hive; -import org.apache.flink.table.catalog.ReadableWritableCatalog; +import org.apache.flink.table.catalog.Catalog; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; @@ -46,7 +46,7 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull; /** * Base class for catalogs backed by Hive metastore. */ -public abstract class HiveCatalogBase implements ReadableWritableCatalog { +public abstract class HiveCatalogBase implements Catalog { private static final Logger LOG = LoggerFactory.getLogger(HiveCatalogBase.class); public static final String DEFAULT_DB = "default"; diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java index 19655ce..96f9e43 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/GenericInMemoryCatalog.java @@ -47,7 +47,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; /** * A generic catalog implementation that holds all meta objects in memory. 
*/ -public class GenericInMemoryCatalog implements ReadableWritableCatalog { +public class GenericInMemoryCatalog implements Catalog { public static final String DEFAULT_DB = "default"; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java similarity index 60% rename from flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java rename to flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java index a398b72..917cf7a 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/ReadableWritableCatalog.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/Catalog.java @@ -18,6 +18,7 @@ package org.apache.flink.table.catalog; +import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException; import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException; @@ -33,14 +34,79 @@ import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException; import org.apache.flink.table.catalog.stats.CatalogColumnStatistics; import org.apache.flink.table.catalog.stats.CatalogTableStatistics; +import java.util.List; + /** - * An interface responsible for manipulating catalog metadata. + * This interface is responsible for reading and writing metadata such as database/table/views/UDFs + * from a registered catalog. It connects a registered catalog and Flink's Table API. */ -public interface ReadableWritableCatalog extends ReadableCatalog { +@PublicEvolving +public interface Catalog { + + /** +*
[flink] 02/02: [FLINK-11726][network] Refactor the creations of ResultPartition and SingleInputGate into NetworkEnvironment
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit aedc4f5dcca95820969af694e6d2a71a1fe698bd Author: Zhijiang AuthorDate: Sun May 5 17:35:53 2019 +0800 [FLINK-11726][network] Refactor the creations of ResultPartition and SingleInputGate into NetworkEnvironment At the moment ResultPartition and SingleInputGate are created in Task. Based on the new pluggable ShuffleManager, they should be created via ShuffleService#createResultPartitionWriter/InputGate. The NetworkEnvironment would be refactored into NetworkShuffleService in the future. So we could migrate the process of creating ResultPartition and SingleInputGate into the current NetworkEnvironment. The metrics registration of network and buffers can also be done along with the creations. --- .../runtime/io/network/NetworkEnvironment.java | 113 ++- .../network/metrics/InputBufferPoolUsageGauge.java | 51 + .../io/network/metrics/InputBuffersGauge.java | 45 .../consumer => metrics}/InputChannelMetrics.java | 12 +- .../consumer => metrics}/InputGateMetrics.java | 20 ++-- .../metrics/OutputBufferPoolUsageGauge.java| 51 + .../io/network/metrics/OutputBuffersGauge.java | 45 .../ResultPartitionMetrics.java| 19 ++-- .../io/network/partition/ResultPartition.java | 2 +- .../partition/consumer/LocalInputChannel.java | 1 + .../partition/consumer/RemoteInputChannel.java | 1 + .../partition/consumer/SingleInputGate.java| 3 +- .../partition/consumer/UnknownInputChannel.java| 1 + .../runtime/metrics/groups/TaskIOMetricGroup.java | 124 - .../runtime/taskexecutor/TaskManagerServices.java | 8 +- .../NetworkEnvironmentConfiguration.java | 11 ++ .../org/apache/flink/runtime/taskmanager/Task.java | 100 ++--- .../io/network/NetworkEnvironmentBuilder.java | 15 ++- .../network/partition/InputChannelTestUtils.java | 2 +- .../partition/consumer/SingleInputGateTest.java| 1 + 20 files changed, 404 insertions(+), 221 deletions(-) diff 
--git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java index 0ee8595..b9b35c9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java @@ -19,19 +19,34 @@ package org.apache.flink.runtime.io.network; import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.JobID; +import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.buffer.BufferPool; import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool; +import org.apache.flink.runtime.io.network.metrics.InputBufferPoolUsageGauge; +import org.apache.flink.runtime.io.network.metrics.InputBuffersGauge; +import org.apache.flink.runtime.io.network.metrics.InputChannelMetrics; +import org.apache.flink.runtime.io.network.metrics.InputGateMetrics; +import org.apache.flink.runtime.io.network.metrics.OutputBufferPoolUsageGauge; +import org.apache.flink.runtime.io.network.metrics.OutputBuffersGauge; +import org.apache.flink.runtime.io.network.metrics.ResultPartitionMetrics; import org.apache.flink.runtime.io.network.netty.NettyConfig; import org.apache.flink.runtime.io.network.netty.NettyConnectionManager; import org.apache.flink.runtime.io.network.partition.ResultPartition; +import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; 
import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.taskexecutor.TaskExecutor; import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration; import org.apache.flink.runtime.taskmanager.Task; +import org.apache.flink.runtime.taskmanager.TaskActions; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.Preconditions; @@ -56,6 +71,11 @@ public class NetworkEnvironment { private static final String METRIC_TOTAL_MEMORY_SEGMENT = "TotalMemorySegments"; private static final String METRIC_AVAILABLE_MEMORY_SEGMENT = "AvailableMemorySegm
[flink] branch master updated (b9704be -> aedc4f5)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from b9704be [FLINK-12231][runtime] Introduce SchedulerNG interface new f392616 [hotfix][tests] Avoid mock NetworkEnvironment in tests new aedc4f5 [FLINK-11726][network] Refactor the creations of ResultPartition and SingleInputGate into NetworkEnvironment The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../runtime/io/network/NetworkEnvironment.java | 113 ++- .../network/metrics/InputBufferPoolUsageGauge.java | 33 +++--- .../io/network/metrics/InputBuffersGauge.java | 27 ++--- .../consumer => metrics}/InputChannelMetrics.java | 12 +- .../consumer => metrics}/InputGateMetrics.java | 20 ++-- .../OutputBufferPoolUsageGauge.java} | 47 .../network/metrics/OutputBuffersGauge.java} | 31 -- .../ResultPartitionMetrics.java| 19 ++-- .../io/network/partition/ResultPartition.java | 2 +- .../partition/consumer/LocalInputChannel.java | 1 + .../partition/consumer/RemoteInputChannel.java | 1 + .../partition/consumer/SingleInputGate.java| 3 +- .../partition/consumer/UnknownInputChannel.java| 1 + .../runtime/metrics/groups/TaskIOMetricGroup.java | 124 - .../runtime/taskexecutor/TaskManagerServices.java | 8 +- .../NetworkEnvironmentConfiguration.java | 11 ++ .../org/apache/flink/runtime/taskmanager/Task.java | 100 ++--- .../io/network/NetworkEnvironmentBuilder.java | 15 ++- .../network/partition/InputChannelTestUtils.java | 2 +- .../partition/consumer/SingleInputGateTest.java| 1 + .../runtime/taskexecutor/TaskExecutorTest.java | 15 ++- .../runtime/taskmanager/TaskAsyncCallTest.java | 17 ++- .../apache/flink/runtime/taskmanager/TaskTest.java | 107 +- .../runtime/util/JvmExitOnFatalErrorTest.java | 3 +- 
.../tasks/InterruptSensitiveRestoreTest.java | 3 +- .../runtime/tasks/StreamTaskTerminationTest.java | 3 +- .../streaming/runtime/tasks/StreamTaskTest.java| 4 +- .../runtime/tasks/SynchronousCheckpointITCase.java | 6 +- .../tasks/TaskCheckpointingBehaviourTest.java | 3 +- 29 files changed, 374 insertions(+), 358 deletions(-) copy flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java => flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBufferPoolUsageGauge.java (52%) copy flink-metrics/flink-metrics-dropwizard/src/main/java/org/apache/flink/dropwizard/metrics/FlinkGaugeWrapper.java => flink-runtime/src/main/java/org/apache/flink/runtime/io/network/metrics/InputBuffersGauge.java (59%) rename flink-runtime/src/main/java/org/apache/flink/runtime/io/network/{partition/consumer => metrics}/InputChannelMetrics.java (87%) rename flink-runtime/src/main/java/org/apache/flink/runtime/io/network/{partition/consumer => metrics}/InputGateMetrics.java (86%) copy flink-runtime/src/main/java/org/apache/flink/runtime/io/network/{LocalConnectionManager.java => metrics/OutputBufferPoolUsageGauge.java} (52%) copy flink-runtime/src/main/java/org/apache/flink/runtime/{taskmanager/NoOpTaskActions.java => io/network/metrics/OutputBuffersGauge.java} (58%) rename flink-runtime/src/main/java/org/apache/flink/runtime/io/network/{partition => metrics}/ResultPartitionMetrics.java (85%)
[flink] 01/02: [hotfix][tests] Avoid mock NetworkEnvironment in tests
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit f392616f404a6471d99d64eeab5dd1dd3da1a4a0 Author: Zhijiang AuthorDate: Thu Apr 25 17:05:21 2019 +0800 [hotfix][tests] Avoid mock NetworkEnvironment in tests --- .../runtime/taskexecutor/TaskExecutorTest.java | 15 +-- .../runtime/taskmanager/TaskAsyncCallTest.java | 17 +++- .../apache/flink/runtime/taskmanager/TaskTest.java | 107 +++-- .../runtime/util/JvmExitOnFatalErrorTest.java | 3 +- .../tasks/InterruptSensitiveRestoreTest.java | 3 +- .../runtime/tasks/StreamTaskTerminationTest.java | 3 +- .../streaming/runtime/tasks/StreamTaskTest.java| 4 +- .../runtime/tasks/SynchronousCheckpointITCase.java | 6 +- .../tasks/TaskCheckpointingBehaviourTest.java | 3 +- 9 files changed, 87 insertions(+), 74 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index b47ad53..bf57e2a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -197,6 +197,8 @@ public class TaskExecutorTest extends TestLogger { private SettableLeaderRetrievalService jobManagerLeaderRetriever; + private NetworkEnvironment networkEnvironment; + @Before public void setup() throws IOException { rpc = new TestingRpcService(); @@ -220,6 +222,8 @@ public class TaskExecutorTest extends TestLogger { jobManagerLeaderRetriever = new SettableLeaderRetrievalService(); haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetriever); haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetriever); + + networkEnvironment = new NetworkEnvironmentBuilder().build(); } @After @@ -239,6 +243,10 @@ public class TaskExecutorTest extends TestLogger 
{ dummyBlobCacheService = null; } + if (networkEnvironment != null) { + networkEnvironment.shutdown(); + } + testingFatalErrorHandler.rethrowError(); } @@ -262,7 +270,6 @@ public class TaskExecutorTest extends TestLogger { MemoryType.HEAP, false); - final NetworkEnvironment networkEnvironment = new NetworkEnvironmentBuilder().build(); networkEnvironment.start(); final KvStateService kvStateService = new KvStateService(new KvStateRegistry(), null, null); @@ -699,8 +706,6 @@ public class TaskExecutorTest extends TestLogger { when(taskSlotTable.tryMarkSlotActive(eq(jobId), eq(allocationId))).thenReturn(true); when(taskSlotTable.addTask(any(Task.class))).thenReturn(true); - final NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class); - final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager(); final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() @@ -958,13 +963,11 @@ public class TaskExecutorTest extends TestLogger { rpc.registerGateway(resourceManagerGateway.getAddress(), resourceManagerGateway); rpc.registerGateway(jobMasterGateway.getAddress(), jobMasterGateway); - final NetworkEnvironment networkMock = mock(NetworkEnvironment.class, Mockito.RETURNS_MOCKS); - final TaskExecutorLocalStateStoresManager localStateStoresManager = createTaskExecutorLocalStateStoresManager(); final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskManagerLocation(taskManagerLocation) - .setNetworkEnvironment(networkMock) + .setNetworkEnvironment(networkEnvironment) .setTaskSlotTable(taskSlotTable) .setJobLeaderService(jobLeaderService) .setJobManagerTable(jobManagerTable) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 9e06da0..1f743e4 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -41,11 +41,11 @@ import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.filecache.Fi
[flink] 03/03: [FLINK-12231][runtime] Introduce SchedulerNG interface
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit b9704be0512337b3c74a26f3fb36035e6480d522 Author: Gary Yao AuthorDate: Tue Apr 23 11:37:28 2019 +0200 [FLINK-12231][runtime] Introduce SchedulerNG interface Introduce SchedulerNG interface with a LegacyScheduler implementation, which hides direct calls to the ExecutionGraph from the JobMaster. The LegacyScheduler is only needed so that the existing scheduling code paths keep functioning while work on the new scheduling abstractions is in progress. Remove JobMasterTest#testAutomaticRestartingWhenCheckpointing() because it requires the JobMaster to expose internal state (RestartStrategy) - add unit tests for JobGraph#isCheckpointingEnabled() to make up for the removed test. --- .../dispatcher/DefaultJobManagerRunnerFactory.java | 7 +- .../apache/flink/runtime/jobmaster/JobMaster.java | 506 - .../flink/runtime/jobmaster/LegacyScheduler.java | 626 + .../runtime/jobmaster/LegacySchedulerFactory.java | 82 +++ .../flink/runtime/jobmaster/SchedulerNG.java | 120 .../runtime/jobmaster/SchedulerNGFactory.java | 56 ++ .../factories/DefaultJobMasterServiceFactory.java | 16 +- .../flink/runtime/jobgraph/JobGraphTest.java | 46 ++ .../flink/runtime/jobmaster/JobMasterTest.java | 64 +-- 9 files changed, 1069 insertions(+), 454 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java index c6c537f..6962b6e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DefaultJobManagerRunnerFactory.java @@ -25,6 +25,8 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import 
org.apache.flink.runtime.jobmaster.JobManagerRunner; import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; import org.apache.flink.runtime.jobmaster.JobMasterConfiguration; +import org.apache.flink.runtime.jobmaster.LegacySchedulerFactory; +import org.apache.flink.runtime.jobmaster.SchedulerNGFactory; import org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory; import org.apache.flink.runtime.jobmaster.factories.JobManagerJobMetricGroupFactory; import org.apache.flink.runtime.jobmaster.factories.JobMasterServiceFactory; @@ -56,6 +58,8 @@ public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory { final SlotPoolFactory slotPoolFactory = DefaultSlotPoolFactory.fromConfiguration(configuration); final SchedulerFactory schedulerFactory = DefaultSchedulerFactory.fromConfiguration(configuration); + final SchedulerNGFactory schedulerNGFactory = new LegacySchedulerFactory( + jobManagerServices.getRestartStrategyFactory()); final JobMasterServiceFactory jobMasterFactory = new DefaultJobMasterServiceFactory( jobMasterConfiguration, @@ -66,7 +70,8 @@ public enum DefaultJobManagerRunnerFactory implements JobManagerRunnerFactory { jobManagerServices, heartbeatServices, jobManagerJobMetricGroupFactory, - fatalErrorHandler); + fatalErrorHandler, + schedulerNGFactory); return new JobManagerRunner( jobGraph, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index cae6922..627cc45 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -18,37 +18,22 @@ package org.apache.flink.runtime.jobmaster; -import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.functions.AggregateFunction; -import 
org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.CheckpointingOptions; -import org.apache.flink.core.io.InputSplit; import org.apache.flink.queryablestate.KvStateID; -import org.apache.flink.runtime.JobException; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.blob.BlobWriter; -import org.apache.flink.runtime.checkpoint.CheckpointCoordinator; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; -import org
[flink] branch master updated (f8a6ea5 -> b9704be)
This is an automated email from the ASF dual-hosted git repository. gary pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from f8a6ea5 [hotfix][tests] Rename variables in EagerSchedulingStrategyTest new 1090967 [hotfix][runtime] Add null check to RestartStrategyResolving new 4177ae6 [hotfix][runtime] Remove unused field ARCHIVE_NAME from JobMaster new b9704be [FLINK-12231][runtime] Introduce SchedulerNG interface The 3 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../dispatcher/DefaultJobManagerRunnerFactory.java | 7 +- .../restart/RestartStrategyResolving.java | 4 + .../apache/flink/runtime/jobmaster/JobMaster.java | 507 - .../flink/runtime/jobmaster/LegacyScheduler.java | 626 + .../runtime/jobmaster/LegacySchedulerFactory.java | 82 +++ .../flink/runtime/jobmaster/SchedulerNG.java | 120 .../runtime/jobmaster/SchedulerNGFactory.java | 56 ++ .../factories/DefaultJobMasterServiceFactory.java | 16 +- .../flink/runtime/jobgraph/JobGraphTest.java | 46 ++ .../flink/runtime/jobmaster/JobMasterTest.java | 64 +-- 10 files changed, 1073 insertions(+), 455 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacyScheduler.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/LegacySchedulerFactory.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SchedulerNG.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SchedulerNGFactory.java
[flink] 02/03: [hotfix][runtime] Remove unused field ARCHIVE_NAME from JobMaster
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 4177ae6a8aeba72025a4793714ed86c20f963d22 Author: Gary Yao AuthorDate: Mon Apr 29 12:12:39 2019 +0200 [hotfix][runtime] Remove unused field ARCHIVE_NAME from JobMaster --- .../src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java | 1 - 1 file changed, 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 6c83c7a..cae6922 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -141,7 +141,6 @@ public class JobMaster extends FencedRpcEndpoint implements JobMast /** Default names for Flink's distributed components. */ public static final String JOB_MANAGER_NAME = "jobmanager"; - public static final String ARCHIVE_NAME = "archive"; //
[flink] 01/03: [hotfix][runtime] Add null check to RestartStrategyResolving
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 1090967f8af22b1801986ee39e1392738e8d3eea Author: Gary Yao AuthorDate: Mon Apr 29 10:10:12 2019 +0200 [hotfix][runtime] Add null check to RestartStrategyResolving --- .../runtime/executiongraph/restart/RestartStrategyResolving.java | 4 1 file changed, 4 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java index ad7aa93..7d3733b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/restart/RestartStrategyResolving.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.executiongraph.restart; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Utility method for resolving {@link RestartStrategy}. */ @@ -46,6 +48,8 @@ public final class RestartStrategyResolving { RestartStrategyFactory serverStrategyFactory, boolean isCheckpointingEnabled) { + checkNotNull(serverStrategyFactory); + final RestartStrategy clientSideRestartStrategy = RestartStrategyFactory.createRestartStrategy(clientConfiguration);
[flink] 01/02: [FLINK-12228][runtime] Implement Eager Scheduling Strategy
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit f6fc678297a29e16b73ff8d5159214d5afec0ef2 Author: shuai-xu AuthorDate: Fri Apr 26 13:54:06 2019 +0800 [FLINK-12228][runtime] Implement Eager Scheduling Strategy This closes #8296. --- .../scheduler/ExecutionVertexDeploymentOption.java | 14 ++- .../strategy/EagerSchedulingStrategy.java | 112 +++ .../scheduler/strategy/SchedulingStrategy.java | 4 +- .../strategy/EagerSchedulingStrategyTest.java | 121 + .../strategy/TestingSchedulerOperations.java | 44 .../strategy/TestingSchedulingExecutionVertex.java | 57 ++ .../strategy/TestingSchedulingTopology.java| 65 +++ 7 files changed, 413 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java index 829f6ba..9831a9e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java @@ -20,6 +20,8 @@ package org.apache.flink.runtime.scheduler; import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; +import static org.apache.flink.util.Preconditions.checkNotNull; + /** * Component that stores the task need to be scheduled and the option for deployment. 
*/ @@ -30,7 +32,15 @@ public class ExecutionVertexDeploymentOption { private final DeploymentOption deploymentOption; public ExecutionVertexDeploymentOption(ExecutionVertexID executionVertexId, DeploymentOption deploymentOption) { - this.executionVertexId = executionVertexId; - this.deploymentOption = deploymentOption; + this.executionVertexId = checkNotNull(executionVertexId); + this.deploymentOption = checkNotNull(deploymentOption); + } + + public ExecutionVertexID getExecutionVertexId() { + return executionVertexId; + } + + public DeploymentOption getDeploymentOption() { + return deploymentOption; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java new file mode 100644 index 000..954fff5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java @@ -0,0 +1,112 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.scheduler.DeploymentOption; +import org.apache.flink.runtime.scheduler.ExecutionVertexDeploymentOption; +import org.apache.flink.runtime.scheduler.SchedulerOperations; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.StreamSupport; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * {@link SchedulingStrategy} instance for streaming job which will schedule all tasks at the same time. + */ +public class EagerSchedulingStrategy implements SchedulingStrategy { + + private final SchedulerOperations schedulerOperations; + + private final SchedulingTopology schedulingTopology; + + private final DeploymentOption deploymentOption = new DeploymentOption(false); + + public EagerSchedulingStrategy( + SchedulerOperations schedulerOperations, + SchedulingTopology schedulingTopology) { + this.schedulerOperations = checkNotNull(schedulerOperations)
[flink] 02/02: [hotfix][tests] Rename variables in EagerSchedulingStrategyTest
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit f8a6ea584b9841598f604761ea012136db86c97b Author: Gary Yao AuthorDate: Wed May 8 13:09:15 2019 +0200 [hotfix][tests] Rename variables in EagerSchedulingStrategyTest --- .../strategy/EagerSchedulingStrategyTest.java | 18 +- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java index 881159d..1808bd8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java @@ -73,10 +73,10 @@ public class EagerSchedulingStrategyTest extends TestLogger { assertThat(testingSchedulerOperations.getScheduledVertices(), hasSize(1)); Collection scheduledVertices = testingSchedulerOperations.getScheduledVertices().get(0); - assertThat(scheduledVertices, hasSize(5)); - Collection vertices = getExecutionVertexIdsFromDeployOptions(scheduledVertices); + Collection scheduledVertexIDs = getExecutionVertexIdsFromDeployOptions(scheduledVertices); + assertThat(scheduledVertexIDs, hasSize(5)); for (SchedulingExecutionVertex schedulingExecutionVertex : testingSchedulingTopology.getVertices()) { - assertThat(vertices, hasItem(schedulingExecutionVertex.getId())); + assertThat(scheduledVertexIDs, hasItem(schedulingExecutionVertex.getId())); } } @@ -92,24 +92,24 @@ public class EagerSchedulingStrategyTest extends TestLogger { testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 3)); testingSchedulingTopology.addSchedulingExecutionVertex(new TestingSchedulingExecutionVertex(jobVertexID, 4)); - Set 
toBeRestartedVertices1 = new HashSet<>(Arrays.asList( + Set verticesToRestart1 = new HashSet<>(Arrays.asList( new ExecutionVertexID(jobVertexID, 0), new ExecutionVertexID(jobVertexID, 4))); - schedulingStrategy.restartTasks(toBeRestartedVertices1); + schedulingStrategy.restartTasks(verticesToRestart1); - Set toBeRestartedVertices2 = new HashSet<>(Arrays.asList( + Set verticesToRestart2 = new HashSet<>(Arrays.asList( new ExecutionVertexID(jobVertexID, 1), new ExecutionVertexID(jobVertexID, 2), new ExecutionVertexID(jobVertexID, 3))); - schedulingStrategy.restartTasks(toBeRestartedVertices2); + schedulingStrategy.restartTasks(verticesToRestart2); assertThat(testingSchedulerOperations.getScheduledVertices(), hasSize(2)); Collection scheduledVertices1 = testingSchedulerOperations.getScheduledVertices().get(0); - assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices1), containsInAnyOrder(toBeRestartedVertices1.toArray())); + assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices1), containsInAnyOrder(verticesToRestart1.toArray())); Collection scheduledVertices2 = testingSchedulerOperations.getScheduledVertices().get(1); - assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices2), containsInAnyOrder(toBeRestartedVertices2.toArray())); + assertThat(getExecutionVertexIdsFromDeployOptions(scheduledVertices2), containsInAnyOrder(verticesToRestart2.toArray())); } private static Collection getExecutionVertexIdsFromDeployOptions(
[flink] branch master updated (9a4c3dc -> f8a6ea5)
This is an automated email from the ASF dual-hosted git repository. gary pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 9a4c3dc [FLINK-12227][runtime] Introduce SchedulingStrategy interface new f6fc678 [FLINK-12228][runtime] Implement Eager Scheduling Strategy new f8a6ea5 [hotfix][tests] Rename variables in EagerSchedulingStrategyTest The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../scheduler/ExecutionVertexDeploymentOption.java | 14 ++- .../strategy/EagerSchedulingStrategy.java | 112 +++ .../scheduler/strategy/SchedulingStrategy.java | 4 +- .../strategy/EagerSchedulingStrategyTest.java | 121 + .../strategy/TestingSchedulerOperations.java} | 25 +++-- .../TestingSchedulingExecutionVertex.java} | 31 -- .../strategy/TestingSchedulingTopology.java| 65 +++ 7 files changed, 351 insertions(+), 21 deletions(-) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategy.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/EagerSchedulingStrategyTest.java copy flink-runtime/src/{main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategyFactory.java => test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulerOperations.java} (55%) copy flink-runtime/src/{main/java/org/apache/flink/runtime/rest/messages/EmptyMessageParameters.java => test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingExecutionVertex.java} (53%) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/strategy/TestingSchedulingTopology.java
[flink-shaded] annotated tag release-7.0-rc1 updated (7d7c687 -> 585d6b8)
This is an automated email from the ASF dual-hosted git repository. jincheng pushed a change to annotated tag release-7.0-rc1 in repository https://gitbox.apache.org/repos/asf/flink-shaded.git. *** WARNING: tag release-7.0-rc1 was modified! *** from 7d7c687 (commit) to 585d6b8 (tag) tagging 7d7c68799a0488007bed63033c1e8110210a6cfb (commit) replaces release-6.0 by sunjincheng121 on Wed May 8 14:54:46 2019 +0800 - Log - release-7.0-rc1 -BEGIN PGP SIGNATURE- iQIzBAABCAAdFiEEj+oe6dAEjAzMcLdXMhGwcDt56g4FAlzSfTYACgkQMhGwcDt5 6g7QFQ/+NaV06RQmhhZG0n7JCB12Yo8vG0ovWovcd3ADGxYEEpRpbesCFI/YaKgV frfRA+5JCbRVKoHdXXeV/WAKDzta6Of4w3sw3oBAVDRPZB3o1sHGVvOM73LPMtAz ZAUi3GD0JwgPc/fFHL/K4EjIv8eTVDN4YH5YDdupyYIMuolywvl2eOw1YF7gqoQL ZMb538xVF4ob3neizYrcYaEH3OTPUsLwb4hObJUbIT2zqcZWsGTlLjEzp2Bp9WHz od+nxM4P8LsdXIc03oGmncTkxRgb9Mlixw3FdKrU05WFh2Q+qVReJgKLQm2uxkXQ vyWwxFoFU0exqvoAkdeRGwF3jSqk4dP1Y46HSNoxh/JaN7kJbvbj2VDLh3Jj+iWq AQwxUoOf3T22hVPvBn3pbGEk5SjM++B7u/suxpXsZ8Eh3+ooNd5NlLd90GuxmfV6 6SiLjkYo5bEZCzZjBAxHU3wWvtg5kR+WI+FfVtxHW3maakJusQZf47DdEauh5X+v kuRoShYic+v0jy9d1dFa2Jn2S70Q42JWapKjZQT9i/qFlOddhPeHCHd5YtksKHfe f/7wmWsE0FBS5tAhBuWXaGeQKyRg99PsRY40bYCawjwNK5q8U2fu8xethJtxpKDa cFXq1nV+d+q24TFT/9Hhr1QwADDRIwUxpr+b4dofq37GoiZ9Lx8= =U9wq -END PGP SIGNATURE- --- No new revisions were added by this update. Summary of changes:
svn commit: r33943 - in /dev/flink/flink-shaded-7.0-rc1: ./ flink-shaded-7.0-src.tgz flink-shaded-7.0-src.tgz.asc flink-shaded-7.0-src.tgz.sha512
Author: jincheng Date: Wed May 8 10:39:29 2019 New Revision: 33943 Log: flink-shaded-7.0 release Added: dev/flink/flink-shaded-7.0-rc1/ dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz (with props) dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz.asc dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz.sha512 Added: dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz == Binary file - no diff available. Propchange: dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz -- svn:mime-type = application/octet-stream Added: dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz.asc == --- dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz.asc (added) +++ dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz.asc Wed May 8 10:39:29 2019 @@ -0,0 +1,16 @@ +-BEGIN PGP SIGNATURE- + +iQIzBAABCAAdFiEEj+oe6dAEjAzMcLdXMhGwcDt56g4FAlzSfVcACgkQMhGwcDt5 +6g7lTw//URjyaiM029ktPaagBy8I0Hp4Ltfdo8aenkohdLLP7txj+g78+t3/M8f0 +xXD/FdRqbUGr8ok4siEvsuiX8rWv0HUhRkNZ5dvG6esPlVWXuqBP+xjdYi/8XfTS +OfhNYM248RVY+2Hu1q8BC+VTV7vwi6ZBZY72Lb/OkMt64Hg5KD/uVmvqvnfeRA27 +PvzyzkzP6pwFk8Rg8+3b0RENp6CRk41vrbR303B+Dwld2OcEB9IrxTQzrzDtWx9X +361/tJWBBqXNLjaH5R7i4X9Ipu7i7GlkigY7PdxlJYbpPZnjXC0C82AZs6C5U//C +ro1+n9uN/wj0sO0hQMhTlYkWiTBiy7HnjSpC9sIpJpX0U+SWs60z2Dxp/K0cDZSe +BS5jcoY4gpcZrsTE7igOWqwXInZRi2NpisUjqU7qVnOMr5OnNfEhJ3mGTxlA34G5 ++SiZeDDCQZhKvqhR8SlemkMW3lMIxfOvuZOVrJhXV0RTVz6hpJxyCm7LZRwAaQK9 +9uvmCnLEE92RZzBc4eI8vHEY23VOQv5HzGYJ4z804iqTPy8s/V1R++n3A1HPLTf+ +2rbnrEwzB4srTrWOjFxslIn9cVq+ZLVSvqrhEPfoTK2VwNmdK7xbX7gg5fhNJ9yR +quvPKgJyzNCdBwU43neirMlS0uUnvgdGTuXOvX9ul5pgJCrb6DY= +=txgS +-END PGP SIGNATURE- Added: dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz.sha512 == --- dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz.sha512 (added) +++ dev/flink/flink-shaded-7.0-rc1/flink-shaded-7.0-src.tgz.sha512 Wed May 8 10:39:29 2019 @@ -0,0 +1 @@ 
+a8c14810a3192fb016bf444273a67945091f5c1cea1bc02488070ce935062094b18477afd99b1b7108892b94be07541b000defb9e202a498da4b1e55d82e64e3 flink-shaded-7.0-src.tgz
[flink-shaded] branch master updated: [FLINK-12397][build] Drop flink-shaded-asm-5
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-shaded.git The following commit(s) were added to refs/heads/master by this push: new ed42ad1 [FLINK-12397][build] Drop flink-shaded-asm-5 ed42ad1 is described below commit ed42ad1e9b4e9d9588acc5de61c7703f765a298e Author: Chesnay Schepler AuthorDate: Wed May 8 11:11:31 2019 +0200 [FLINK-12397][build] Drop flink-shaded-asm-5 --- flink-shaded-asm-5/pom.xml | 83 -- .../src/main/resources/META-INF/NOTICE | 10 --- .../main/resources/META-INF/licenses/LICENSE.asm | 31 pom.xml| 1 - 4 files changed, 125 deletions(-) diff --git a/flink-shaded-asm-5/pom.xml b/flink-shaded-asm-5/pom.xml deleted file mode 100644 index 1e72bff..000 --- a/flink-shaded-asm-5/pom.xml +++ /dev/null @@ -1,83 +0,0 @@ - - -http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; - xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd";> - -4.0.0 - - -org.apache.flink -flink-shaded -7.0 -.. - - -flink-shaded-asm -flink-shaded-asm-5 -${asm.version}-7.0 - -jar - - -5.0.4 - - - - -org.ow2.asm -asm-all -${asm.version} - - - - - - -org.apache.maven.plugins -maven-shade-plugin - - -shade-flink -package - -shade - - - true - ${project.basedir}/target/dependency-reduced-pom.xml - - -org.ow2.asm:* - - - - -org.objectweb - org.apache.flink.shaded.asm5.org.objectweb - - - - - - - - - - diff --git a/flink-shaded-asm-5/src/main/resources/META-INF/NOTICE b/flink-shaded-asm-5/src/main/resources/META-INF/NOTICE deleted file mode 100644 index a3065f2..000 --- a/flink-shaded-asm-5/src/main/resources/META-INF/NOTICE +++ /dev/null @@ -1,10 +0,0 @@ -flink-shaded-asm -Copyright 2014-2018 The Apache Software Foundation - -This product includes software developed at -The Apache Software Foundation (http://www.apache.org/). 
- -This project bundles the following dependencies under the BSD license. -See bundled license files for details. - -- org.ow2.asm:asm-all:5.0.4 \ No newline at end of file diff --git a/flink-shaded-asm-5/src/main/resources/META-INF/licenses/LICENSE.asm b/flink-shaded-asm-5/src/main/resources/META-INF/licenses/LICENSE.asm deleted file mode 100644 index 62ffbcc..000 --- a/flink-shaded-asm-5/src/main/resources/META-INF/licenses/LICENSE.asm +++ /dev/null @@ -1,31 +0,0 @@ -ASM: a very small and fast Java bytecode manipulation framework - -Copyright (c) 2000-2011 INRIA, France Telecom -All rights reserved. - -Redistribution and use in source and binary forms, with or without -modification, are permitted provided that the following conditions -are met: - -1. Redistributions of source code must retain the above copyright - notice, this list of conditions and the following disclaimer. - -2. Redistributions in binary form must reproduce the above copyright - notice, this list of conditions and the following disclaimer in the - documentation and/or other materials provided with the distribution. - -3. Neither the name of the copyright holders nor the names of its - contributors may be used to endorse or promote products derived from - this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -ARE DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
[flink] 01/02: [FLINK-12227][runtime] Add interfaces for SchedulingTopology
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 8c862e5fba73a335995c1667bb036ab80edf579e Author: Till Rohrmann AuthorDate: Thu Apr 18 17:57:11 2019 +0200 [FLINK-12227][runtime] Add interfaces for SchedulingTopology The SchedulingTopology contains the information which is provided to the SchedulingStrategy in order to decide which execution vertices should be scheduled next. --- .../scheduler/strategy/ExecutionVertexID.java | 76 + .../strategy/SchedulingExecutionVertex.java| 58 + .../strategy/SchedulingResultPartition.java| 99 ++ .../scheduler/strategy/SchedulingTopology.java | 53 4 files changed, 286 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java new file mode 100644 index 000..6e44e11 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.scheduler.strategy; + +import org.apache.flink.runtime.executiongraph.ExecutionVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; + +import static org.apache.flink.util.Preconditions.checkArgument; +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Id identifying {@link ExecutionVertex}. + */ +public class ExecutionVertexID { + private final JobVertexID jobVertexId; + + private final int subtaskIndex; + + public ExecutionVertexID(JobVertexID jobVertexId, int subtaskIndex) { + checkArgument(subtaskIndex >= 0, "subtaskIndex must be greater than or equal to 0"); + + this.jobVertexId = checkNotNull(jobVertexId); + this.subtaskIndex = subtaskIndex; + } + + public JobVertexID getJobVertexId() { + return jobVertexId; + } + + public int getSubtaskIndex() { + return subtaskIndex; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (o == null || getClass() != o.getClass()) { + return false; + } + + ExecutionVertexID that = (ExecutionVertexID) o; + + return subtaskIndex == that.subtaskIndex && jobVertexId.equals(that.jobVertexId); + } + + @Override + public int hashCode() { + int result = jobVertexId.hashCode(); + result = 31 * result + subtaskIndex; + return result; + } + + @Override + public String toString() { + return jobVertexId + "_" + subtaskIndex; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java new file mode 100644 index 000..b9b2271 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingExecutionVertex.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing p
[flink] 02/02: [FLINK-12227][runtime] Introduce SchedulingStrategy interface
This is an automated email from the ASF dual-hosted git repository. gary pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 9a4c3dc54b462c44987b36bb6af6f7a570d829a7 Author: shuai-xu AuthorDate: Mon Apr 22 11:32:39 2019 +0800 [FLINK-12227][runtime] Introduce SchedulingStrategy interface This closes #8233. --- .../flink/runtime/scheduler/DeploymentOption.java | 35 .../scheduler/ExecutionVertexDeploymentOption.java | 36 + .../runtime/scheduler/SchedulerOperations.java | 36 + .../scheduler/strategy/SchedulingStrategy.java | 62 ++ .../strategy/SchedulingStrategyFactory.java| 33 5 files changed, 202 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java new file mode 100644 index 000..9fb9ace --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +/** + * Deployment option which indicates whether the task should send scheduleOrUpdateConsumer message to master. 
+ */ +public class DeploymentOption { + + private final boolean sendScheduleOrUpdateConsumerMessage; + + public DeploymentOption(boolean sendScheduleOrUpdateConsumerMessage) { + this.sendScheduleOrUpdateConsumerMessage = sendScheduleOrUpdateConsumerMessage; + } + + public boolean sendScheduleOrUpdateConsumerMessage() { + return sendScheduleOrUpdateConsumerMessage; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java new file mode 100644 index 000..829f6ba --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/ExecutionVertexDeploymentOption.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.scheduler; + +import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID; + +/** + * Component that stores the task need to be scheduled and the option for deployment. 
+ */ +public class ExecutionVertexDeploymentOption { + + private final ExecutionVertexID executionVertexId; + + private final DeploymentOption deploymentOption; + + public ExecutionVertexDeploymentOption(ExecutionVertexID executionVertexId, DeploymentOption deploymentOption) { + this.executionVertexId = executionVertexId; + this.deploymentOption = deploymentOption; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerOperations.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerOperations.java new file mode 100644 index 000..50d3f87 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerOperations.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright own
[flink] branch master updated (12d698b -> 9a4c3dc)
This is an automated email from the ASF dual-hosted git repository. gary pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 12d698b [FLINK-12365][table] Add stats related catalog APIs new 8c862e5 [FLINK-12227][runtime] Add interfaces for SchedulingTopology new 9a4c3dc [FLINK-12227][runtime] Introduce SchedulingStrategy interface The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../flink/runtime/scheduler/DeploymentOption.java | 28 ++ .../ExecutionVertexDeploymentOption.java} | 19 ++--- .../SchedulerOperations.java} | 26 +++--- .../scheduler/strategy/ExecutionVertexID.java | 49 +-- .../strategy/SchedulingExecutionVertex.java} | 40 ++--- .../strategy/SchedulingResultPartition.java| 99 ++ .../scheduler/strategy/SchedulingStrategy.java | 62 ++ .../strategy/SchedulingStrategyFactory.java} | 17 ++-- .../scheduler/strategy/SchedulingTopology.java | 53 9 files changed, 306 insertions(+), 87 deletions(-) copy flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/TimeoutLatch.java => flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DeploymentOption.java (62%) copy flink-runtime/src/main/java/org/apache/flink/runtime/{JobException.java => scheduler/ExecutionVertexDeploymentOption.java} (63%) copy flink-runtime/src/main/java/org/apache/flink/runtime/{rest/messages/JobMessageParameters.java => scheduler/SchedulerOperations.java} (61%) copy flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogPartitionSpec.java => flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/ExecutionVertexID.java (51%) copy flink-runtime/src/main/java/org/apache/flink/runtime/{jobgraph/DistributionPattern.java => 
scheduler/strategy/SchedulingExecutionVertex.java} (53%) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingResultPartition.java create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingStrategy.java copy flink-runtime/src/main/java/org/apache/flink/runtime/{minicluster/RpcServiceSharing.java => scheduler/strategy/SchedulingStrategyFactory.java} (68%) create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/strategy/SchedulingTopology.java