[flink] branch master updated: [FLINK-16625][utils] Extract BootstrapTools#getEnvironmentVariables to ConfigurationUtils#getPrefixedKeyValuePairs
This is an automated email from the ASF dual-hosted git repository. tison pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new f80c384 [FLINK-16625][utils] Extract BootstrapTools#getEnvironmentVariables to ConfigurationUtils#getPrefixedKeyValuePairs f80c384 is described below commit f80c3847d854e0d6a62577cd1e998b57b67fc9f6 Author: Canbin Zheng AuthorDate: Wed Mar 25 14:35:48 2020 +0800 [FLINK-16625][utils] Extract BootstrapTools#getEnvironmentVariables to ConfigurationUtils#getPrefixedKeyValuePairs This closes #11458 . --- .../flink/configuration/ConfigurationUtils.java | 15 +++ .../flink/configuration/ConfigurationUtilsTest.java | 20 +++- .../parameters/AbstractKubernetesParameters.java | 10 -- .../parameters/KubernetesJobManagerParameters.java | 3 ++- .../runtime/clusterframework/BootstrapTools.java | 18 -- .../runtime/clusterframework/BootstrapToolsTest.java | 5 +++-- .../org/apache/flink/yarn/YarnClusterDescriptor.java | 3 ++- 7 files changed, 41 insertions(+), 33 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java index 745d474..9eda0e4 100755 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java @@ -219,6 +219,21 @@ public class ConfigurationUtils { return configs; } + /** +* Extract and parse Flink configuration properties with a given name prefix and +* return the result as a Map. +*/ + public static Map getPrefixedKeyValuePairs(String prefix, Configuration configuration) { + Map result = new HashMap<>(); + for (Map.Entry entry: configuration.toMap().entrySet()) { + if (entry.getKey().startsWith(prefix) && entry.getKey().length() > prefix.length()) { + String key = entry.getKey().substring(prefix.length()); + result.put(key, entry.getValue()); + } + } + return result; + } + // Make sure that we cannot instantiate this class private ConfigurationUtils() { } diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java index fa9345a..be5d340 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationUtilsTest.java @@ -55,7 +55,7 @@ public class ConfigurationUtilsTest extends TestLogger { } @Test - public void testHideSensitiveValues() { + public void testHideSensitiveValues() { final Map keyValuePairs = new HashMap<>(); keyValuePairs.put("foobar", "barfoo"); final String secretKey1 = "secret.key"; @@ -74,4 +74,22 @@ public class ConfigurationUtilsTest extends TestLogger { assertThat(hiddenSensitiveValues, is(equalTo(expectedKeyValuePairs))); } + @Test + public void testGetPrefixedKeyValuePairs() { + final String prefix = "test.prefix."; + final Map expectedKeyValuePairs = new HashMap() { + { + put("k1", "v1"); + put("k2", "v2"); + } + }; + + final Configuration configuration = new Configuration(); + expectedKeyValuePairs.forEach((k, v) -> configuration.setString(prefix + k, v)); + + final Map resultKeyValuePairs = ConfigurationUtils.getPrefixedKeyValuePairs(prefix, configuration); + + assertThat(resultKeyValuePairs, is(equalTo(expectedKeyValuePairs))); + } + } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java index 8e7c443..4e0916a 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/AbstractKubernetesParameters.java @@ -22,7 +22,6 @@ import org.apache.flink.client.cli.CliFrontend; import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions; import org.apache.flink.kubernetes.utils.Cons
[flink] branch master updated (bb4ec22 -> f4b68c4)
This is an automated email from the ASF dual-hosted git repository. tison pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from bb4ec22 [FLINK-16712][task] Refactor StreamTask to construct final fields add f4b68c4 [FLINK-15640][k8s] Support to set labels for jobmanager and taskmanager pod No new revisions were added by this update. Summary of changes: .../generated/kubernetes_config_configuration.html | 12 +++ .../configuration/KubernetesConfigOptions.java | 14 + .../parameters/AbstractKubernetesParameters.java | 3 ++- .../parameters/KubernetesJobManagerParameters.java | 8 +--- .../KubernetesTaskManagerParameters.java | 6 +- .../flink/kubernetes/utils/KubernetesUtils.java| 3 ++- .../flink/kubernetes/KubernetesTestBase.java | 6 ++ .../kubeclient/KubernetesJobManagerTestBase.java | 8 .../kubeclient/KubernetesTaskManagerTestBase.java | 9 + .../decorators/ExternalServiceDecoratorTest.java | 1 + .../decorators/InitJobManagerDecoratorTest.java| 1 + .../decorators/InitTaskManagerDecoratorTest.java | 1 + .../decorators/InternalServiceDecoratorTest.java | 1 + .../factory/KubernetesJobManagerFactoryTest.java | 6 -- .../factory/KubernetesTaskManagerFactoryTest.java | 2 +- .../KubernetesJobManagerParametersTest.java| 21 .../KubernetesTaskManagerParametersTest.java | 23 ++ 17 files changed, 108 insertions(+), 17 deletions(-)
[flink] branch master updated (d38a010 -> bb4ec22)
This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from d38a010 [FLINK-15579][table-planner-blink] Fix UpsertStreamTableSink support and add tests add bb4ec22 [FLINK-16712][task] Refactor StreamTask to construct final fields No new revisions were added by this update. Summary of changes: .../flink/state/api/output/BoundedStreamTask.java | 2 +- .../runtime/tasks/AbstractTwoInputStreamTask.java | 2 +- .../runtime/tasks/MultipleInputStreamTask.java | 2 +- .../runtime/tasks/OneInputStreamTask.java | 4 +- .../runtime/tasks/SourceReaderStreamTask.java | 2 +- .../streaming/runtime/tasks/SourceStreamTask.java | 4 +- .../runtime/tasks/StreamIterationHead.java | 2 +- .../runtime/tasks/StreamIterationTail.java | 2 +- .../flink/streaming/runtime/tasks/StreamTask.java | 68 +- .../runtime/tasks/TwoInputStreamTask.java | 2 +- .../runtime/tasks/LocalStateForwardingTest.java| 2 +- .../MultipleInputStreamTaskTestHarnessBuilder.java | 4 +- .../tasks/OneInputStreamTaskTestHarness.java | 10 ++-- .../tasks/StreamTaskExecutionDecorationTest.java | 2 +- .../tasks/StreamTaskMailboxTestHarnessBuilder.java | 6 +- .../tasks/StreamTaskSelectiveReadingTest.java | 2 +- .../runtime/tasks/StreamTaskTerminationTest.java | 2 +- .../streaming/runtime/tasks/StreamTaskTest.java| 19 +++--- .../runtime/tasks/StreamTaskTestHarness.java | 18 +++--- .../runtime/tasks/SynchronousCheckpointITCase.java | 2 +- .../runtime/tasks/SynchronousCheckpointTest.java | 6 +- .../tasks/TaskCheckpointingBehaviourTest.java | 2 +- .../tasks/TwoInputStreamTaskTestHarness.java | 6 +- .../flink/streaming/util/MockStreamTask.java | 2 +- .../streaming/util/MockStreamTaskBuilder.java | 2 +- .../codegen/agg/batch/BatchAggTestBase.scala | 5 +- .../jobmaster/JobMasterStopWithSavepointIT.java| 6 +- 27 files changed, 91 insertions(+), 95 deletions(-)
[flink] branch master updated (1827e4d -> d38a010)
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 1827e4d [FLINK-16170][elasticsearch] Fix SearchTemplateRequest ClassNotFoundException when using flink-sql-connector-elasticsearch7 add 5642479 [FLINK-15579][table-planner-blink] UpsertStreamTableSink should work on batch mode add d38a010 [FLINK-15579][table-planner-blink] Fix UpsertStreamTableSink support and add tests No new revisions were added by this update. Summary of changes: .../java/io/jdbc/JDBCUpsertTableSinkITCase.java| 40 ++ .../plan/nodes/physical/batch/BatchExecSink.scala | 21 -- .../nodes/physical/stream/StreamExecSink.scala | 13 +--- .../planner/plan/utils/UpdatingPlanChecker.scala | 42 +++ .../runtime/batch/table/TableSinkITCase.scala | 87 +- 5 files changed, 168 insertions(+), 35 deletions(-)
[flink-statefun] 01/02: [hotfix] Make namespace in State Bootstrap example to match GreetStatefulFunction
This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink-statefun.git commit 31c851eab04f11ec21253e7a00d0530df057a945 Author: Igal Shilman AuthorDate: Tue Mar 24 21:55:03 2020 +0100 [hotfix] Make namespace in State Bootstrap example to match GreetStatefulFunction This example creates a savepoint for the Greeter example, but it uses the wrong namespace. This closes #71. --- .../state/processor/example/GreetStatefulFunctionBootstrapExample.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/statefun-flink/statefun-flink-state-processor/src/main/java/org/apache/flink/statefun/flink/state/processor/example/GreetStatefulFunctionBootstrapExample.java b/statefun-flink/statefun-flink-state-processor/src/main/java/org/apache/flink/statefun/flink/state/processor/example/GreetStatefulFunctionBootstrapExample.java index d5c7680..dabd236 100644 --- a/statefun-flink/statefun-flink-state-processor/src/main/java/org/apache/flink/statefun/flink/state/processor/example/GreetStatefulFunctionBootstrapExample.java +++ b/statefun-flink/statefun-flink-state-processor/src/main/java/org/apache/flink/statefun/flink/state/processor/example/GreetStatefulFunctionBootstrapExample.java @@ -43,7 +43,7 @@ import org.apache.flink.statefun.sdk.state.PersistedValue; */ public class GreetStatefulFunctionBootstrapExample { - private static final FunctionType GREETER_FUNCTION_TYPE = new FunctionType("foo.bar", "greeter"); + private static final FunctionType GREETER_FUNCTION_TYPE = new FunctionType("apache", "greeter"); public static void main(String[] args) throws Exception { final ParameterTool params = ParameterTool.fromArgs(args);
[flink-statefun] branch release-2.0 updated (5f7714b -> 61d6609)
This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a change to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink-statefun.git. from 5f7714b [hotfix] [legal] Update missing entries in distribution jar NOTICE file new 31c851e [hotfix] Make namespace in State Bootstrap example to match GreetStatefulFunction new 61d6609 [FLINK-16752] Add a repackage goal to ridesharing example The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../statefun-ridesharing-example-simulator/pom.xml| 8 .../processor/example/GreetStatefulFunctionBootstrapExample.java | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-)
[flink-statefun] 02/02: [FLINK-16752] Add a repackage goal to ridesharing example
This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a commit to branch release-2.0 in repository https://gitbox.apache.org/repos/asf/flink-statefun.git commit 61d660971a0049af97f17993c50cd7ca9e7b1c40 Author: Igal Shilman AuthorDate: Tue Mar 24 19:51:19 2020 +0100 [FLINK-16752] Add a repackage goal to ridesharing example Changing the maven parent from spring-boot to statefun-parent had missed a default configuration of the spring-boot-maven-plugin that triggers the repackage goal. This closes #70. --- .../statefun-ridesharing-example-simulator/pom.xml| 8 1 file changed, 8 insertions(+) diff --git a/statefun-examples/statefun-ridesharing-example/statefun-ridesharing-example-simulator/pom.xml b/statefun-examples/statefun-ridesharing-example/statefun-ridesharing-example-simulator/pom.xml index 7197cd8..c312108 100644 --- a/statefun-examples/statefun-ridesharing-example/statefun-ridesharing-example-simulator/pom.xml +++ b/statefun-examples/statefun-ridesharing-example/statefun-ridesharing-example-simulator/pom.xml @@ -121,6 +121,14 @@ under the License. org.springframework.boot spring-boot-maven-plugin + + +package + +repackage + + +
[flink-statefun] branch master updated (cfcc123 -> 67b75eb)
This is an automated email from the ASF dual-hosted git repository. tzulitai pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink-statefun.git. from cfcc123 [hotfix] [legal] Update missing entries in distribution jar NOTICE file add a63d0f4 [hotfix] Make namespace in State Bootstrap example to match GreetStatefulFunction add 67b75eb [FLINK-16752] Add a repackage goal to ridesharing example No new revisions were added by this update. Summary of changes: .../statefun-ridesharing-example-simulator/pom.xml| 8 .../processor/example/GreetStatefulFunctionBootstrapExample.java | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-)
[flink] branch release-1.10 updated: [FLINK-16759][hive] HiveModuleTest failed to compile on release-1.10
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 572717d [FLINK-16759][hive] HiveModuleTest failed to compile on release-1.10 572717d is described below commit 572717dbc35369d29441c2ad4eb0ba73aa803ee0 Author: Jingsong Lee AuthorDate: Wed Mar 25 11:02:39 2020 +0800 [FLINK-16759][hive] HiveModuleTest failed to compile on release-1.10 This closes #11503 --- .../test/java/org/apache/flink/table/module/hive/HiveModuleTest.java| 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java index 04cf165..2939f64 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java @@ -155,7 +155,7 @@ public class HiveModuleTest { TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(); tableEnv.unloadModule("core"); - tableEnv.loadModule("hive", new HiveModule()); + tableEnv.loadModule("hive", new HiveModule(HiveShimLoader.getHiveVersion())); List results = TableUtils.collectToList(tableEnv.sqlQuery("select str_to_map('a:1,b:2,c:3',',',':')"));
[flink] branch master updated: [FLINK-16170][elasticsearch] Fix SearchTemplateRequest ClassNotFoundException when using flink-sql-connector-elasticsearch7
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 1827e4d [FLINK-16170][elasticsearch] Fix SearchTemplateRequest ClassNotFoundException when using flink-sql-connector-elasticsearch7 1827e4d is described below commit 1827e4dddfbac75a533ff2aea2f3e690777a3e5e Author: Leonard Xu AuthorDate: Wed Mar 25 10:13:41 2020 +0800 [FLINK-16170][elasticsearch] Fix SearchTemplateRequest ClassNotFoundException when using flink-sql-connector-elasticsearch7 We shouldn't `exclude org.elasticsearch:elasticsearch-geo` and `org.elasticsearch.plugin:lang-mustache-client` when shading. This closes #11396 --- .../flink-sql-connector-elasticsearch7/pom.xml | 16 +++--- flink-connectors/pom.xml | 1 + .../flink-sql-client-test/pom.xml | 13 + flink-end-to-end-tests/run-nightly-tests.sh| 6 ++- .../test-scripts/elasticsearch-common.sh | 2 +- .../test-scripts/test_sql_client.sh| 61 +++--- tools/travis/splits/split_misc.sh | 6 ++- 7 files changed, 74 insertions(+), 31 deletions(-) diff --git a/flink-connectors/flink-sql-connector-elasticsearch7/pom.xml b/flink-connectors/flink-sql-connector-elasticsearch7/pom.xml index 0f2da3f..d1e289d 100644 --- a/flink-connectors/flink-sql-connector-elasticsearch7/pom.xml +++ b/flink-connectors/flink-sql-connector-elasticsearch7/pom.xml @@ -63,14 +63,10 @@ under the License. - com.carrotsearch:hppc com.tdunning:t-digest joda-time:joda-time net.sf.jopt-simple:jopt-simple org.elasticsearch:jna - org.elasticsearch:elasticsearch-geo - org.elasticsearch.plugin:lang-mustache-client - com.github.spullara.mustache.java:compiler org.hdrhistogram:HdrHistogram org.yaml:snakeyaml @@ -136,13 +132,17 @@ under the License. org.apache.flink.elasticsearch7.shaded.org.elasticsearch - org.apache.logging - org.apache.flink.elasticsearch7.shaded.org.apache.logging - - com.fasterxml.jackson org.apache.flink.elasticsearch7.shaded.com.fasterxml.jackson + + com.carrotsearch.hppc + org.apache.flink.elasticsearch7.shaded.com.carrotsearch.hppc + + + com.github.mustachejava + org.apache.flink.elasticsearch7.shaded.com.github.mustachejava + diff --git a/flink-connectors/pom.xml b/flink-connectors/pom.xml index 1e92a10..296bf89 100644 --- a/flink-connectors/pom.xml +++ b/flink-connectors/pom.xml @@ -94,6 +94,7 @@ under the License.
[flink] branch release-1.8 updated (60d9b96 -> be4cb1a)
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a change to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git. from 60d9b96 [FLINK-11193][State] Fix Rockdb timer service factory configuration option to be settable per job add be4cb1a [FLINK-16707] Fix potential memory leak of rest server when using session/standalone cluster for version 1.8 No new revisions were added by this update. Summary of changes: .../flink/runtime/rest/FileUploadHandler.java | 11 +++ .../flink/runtime/rest/FileUploadHandlerTest.java | 34 ++ 2 files changed, 45 insertions(+)
[flink-web] 02/02: Regenerate page
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit 0980cd665712bbe6d28e8ecdaabe543855904a6c Author: Robert Metzger AuthorDate: Tue Mar 24 15:42:37 2020 +0100 Regenerate page this closes #315 --- content/blog/feed.xml | 209 +++-- content/blog/index.html| 8 +- content/blog/page10/index.html | 2 +- content/blog/page11/index.html | 2 +- content/blog/page2/index.html | 2 +- content/blog/page3/index.html | 2 +- content/blog/page4/index.html | 2 +- content/blog/page5/index.html | 2 +- content/blog/page6/index.html | 2 +- content/blog/page7/index.html | 2 +- content/blog/page8/index.html | 2 +- content/blog/page9/index.html | 2 +- content/index.html | 2 +- .../{02/26 => 03/24}/demo-fraud-detection-2.html | 2 +- content/zh/index.html | 2 +- 15 files changed, 212 insertions(+), 31 deletions(-) diff --git a/content/blog/feed.xml b/content/blog/feed.xml index 2ae389a..803c8fb 100644 --- a/content/blog/feed.xml +++ b/content/blog/feed.xml @@ -7,6 +7,201 @@ https://flink.apache.org/blog/feed.xml"; rel="self" type="application/rss+xml" /> +Advanced Flink Application Patterns Vol.2: Dynamic Updates of Application Logic +In the first article of the series, we gave a high-level description of the objectives and required functionality of a Fraud Detection engine. We also described how to make data partitioning in Apache Flink customizable based on modifiable rules instead of using a hardcoded
+ +KeysExtractor
implementation.We intentionally omitted details of how the applied rules are initialized and what possibilities exist for updating them at runtime. In this post, we will address exactly these details. You will learn how the approach to data partitioning described in Part 1 can be applied in combination with a dynamic configuration. These two patterns, when used together, can eliminate the nee [...] + +
Rules Broadcasting
+ +Let’s first have a look at the previously-defined; data-processing pipeline:
+ ++ +DataStream<Alert> alerts = +transactions +.process(new DynamicKeyFunction()) +.keyBy((keyed) -> keyed.getKey());< [...] +.process(new DynamicAlertFunction())
+ +
DynamicKeyFunction
provides dynamic data partitioning whileDynamicAlertFunction
is responsible for executing the main logic of processing transactions and sending alert messages according to defined rules.Vol.1 of this series simplified the use case and assumed that the applied set of rules is pre-initialized and accessible via the
+ +List<Rules>
withinDynamicKeyFunction
.public class DynamicKeyFunction +extends ProcessFunction<Transaction, Keyed<Transaction, /* Simplified */ + List<Rule> rules = /* Rules that are initialized somehow.*/
[flink-web] 01/02: [hotfix] fix patterns blog vol.2 date
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit 7d746773f0a00ee11ff89d346b33ff15401589e9 Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com> AuthorDate: Tue Mar 24 15:28:53 2020 +0100 [hotfix] fix patterns blog vol.2 date --- ...6-demo-fraud-detection-2.md => 2020-03-24-demo-fraud-detection-2.md} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/_posts/2020-02-26-demo-fraud-detection-2.md b/_posts/2020-03-24-demo-fraud-detection-2.md similarity index 99% rename from _posts/2020-02-26-demo-fraud-detection-2.md rename to _posts/2020-03-24-demo-fraud-detection-2.md index c941b9f..8a2db4e 100644 --- a/_posts/2020-02-26-demo-fraud-detection-2.md +++ b/_posts/2020-03-24-demo-fraud-detection-2.md @@ -2,7 +2,7 @@ layout: post title: "Advanced Flink Application Patterns Vol.2: Dynamic Updates of Application Logic" -date: 2020-02-26T12:00:00.000Z +date: 2020-03-24T12:00:00.000Z authors: - alex: name: "Alexander Fedulov"
[flink-web] branch asf-site updated (5541234 -> 0980cd6)
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git. from 5541234 rebuild site new 7d74677 [hotfix] fix patterns blog vol.2 date new 0980cd6 Regenerate page The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: ...n-2.md => 2020-03-24-demo-fraud-detection-2.md} | 2 +- content/blog/feed.xml | 209 +++-- content/blog/index.html| 8 +- content/blog/page10/index.html | 2 +- content/blog/page11/index.html | 2 +- content/blog/page2/index.html | 2 +- content/blog/page3/index.html | 2 +- content/blog/page4/index.html | 2 +- content/blog/page5/index.html | 2 +- content/blog/page6/index.html | 2 +- content/blog/page7/index.html | 2 +- content/blog/page8/index.html | 2 +- content/blog/page9/index.html | 2 +- content/index.html | 2 +- .../{02/26 => 03/24}/demo-fraud-detection-2.html | 2 +- content/zh/index.html | 2 +- 16 files changed, 213 insertions(+), 32 deletions(-) rename _posts/{2020-02-26-demo-fraud-detection-2.md => 2020-03-24-demo-fraud-detection-2.md} (99%) rename content/news/2020/{02/26 => 03/24}/demo-fraud-detection-2.html (99%)
[flink] branch master updated (5c35d9d -> f6312b8)
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 5c35d9d [FLINK-15911][e2e] Add e2e test for Flink over NAT. add 3df0eb7 [FLINK-15647][k8s] Support user-specified annotations for the JM/TM pods add f6312b8 [FLINK-15647][k8s] Return an empty Map instead of null when annotations is not set No new revisions were added by this update. Summary of changes: .../generated/kubernetes_config_configuration.html | 12 +++ .../configuration/KubernetesConfigOptions.java | 15 + .../decorators/InitJobManagerDecorator.java| 1 + .../decorators/InitTaskManagerDecorator.java | 1 + .../factory/KubernetesJobManagerFactory.java | 4 +--- .../parameters/KubernetesJobManagerParameters.java | 6 ++ .../parameters/KubernetesParameters.java | 5 + .../KubernetesTaskManagerParameters.java | 7 ++ .../decorators/InitJobManagerDecoratorTest.java| 16 ++ .../decorators/InitTaskManagerDecoratorTest.java | 16 ++ .../KubernetesJobManagerParametersTest.java| 21 ++ .../KubernetesTaskManagerParametersTest.java | 25 +- 12 files changed, 125 insertions(+), 4 deletions(-)
[flink] branch master updated (b7a83ff -> 5c35d9d)
This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from b7a83ff [FLINK-16732][hive] Failed to call Hive UDF with constant return value add c4aa570 [hotfix][runtime] Minor code clean-ups. add 61086f4 [hotfix][e2e] Deduplicate codes for building docker image with custom jar. add 03b6e76 [FLINK-15911][runtime] Refactor to create AkkaRpsService with builder class, in order to reduce the number of nested creating methods. add 8a1dbcc [FLINK-15911][runtime] Support configure address/port and bind-address/bind-port separately for JM/TM RPC services. add 3c99779 [FLINK-15911][runtime] Support configure address/port and bind-address/bind-port separately for netty shuffle service. add 9dc7a4a [FLINK-15911][runtime] Support unresolveable external hostname. add 5c35d9d [FLINK-15911][e2e] Add e2e test for Flink over NAT. No new revisions were added by this update. Summary of changes: .../generated/all_taskmanager_section.html | 6 +- .../generated/common_host_port_section.html| 6 +- .../generated/job_manager_configuration.html | 12 ++ .../netty_shuffle_environment_configuration.html | 8 +- .../generated/task_manager_configuration.html | 16 +- .../flink/configuration/JobManagerOptions.java | 20 ++ .../NettyShuffleEnvironmentOptions.java| 12 +- .../flink/configuration/TaskManagerOptions.java| 34 ++- flink-end-to-end-tests/run-nightly-tests.sh| 2 + .../test-scripts/common_docker.sh | 6 + .../test-scripts/common_kubernetes.sh | 1 + .../container-scripts/docker-compose.nat.yml | 90 .../test-scripts/test_docker_embedded_job.sh | 3 +- .../test-scripts/test_kubernetes_embedded_job.sh | 2 +- .../test-scripts/test_kubernetes_session.sh| 2 +- .../{test_docker_embedded_job.sh => test_nat.sh} | 50 ++--- .../queryablestate/network/AbstractServerBase.java | 7 +- .../client/proxy/KvStateClientProxyImpl.java | 5 +- .../queryablestate/server/KvStateServerImpl.java | 5 +- .../client/proxy/KvStateClientProxyImplTest.java | 2 +- .../queryablestate/network/AbstractServerTest.java | 2 +- .../flink/queryablestate/network/ClientTest.java | 2 +- .../network/KvStateServerHandlerTest.java | 4 +- .../queryablestate/network/KvStateServerTest.java | 2 +- .../runtime/clusterframework/BootstrapTools.java | 228 ++--- .../runtime/entrypoint/ClusterEntrypoint.java | 18 +- .../apache/flink/runtime/jobmaster/JobMaster.java | 15 +- .../flink/runtime/jobmaster/JobMasterGateway.java | 6 +- .../flink/runtime/metrics/util/MetricUtils.java| 10 +- .../flink/runtime/minicluster/MiniCluster.java | 127 .../minicluster/MiniClusterConfiguration.java | 29 ++- .../flink/runtime/query/QueryableStateUtils.java | 9 +- .../flink/runtime/rpc/akka/AkkaRpcService.java | 2 + .../runtime/rpc/akka/AkkaRpcServiceUtils.java | 204 -- .../runtime/taskexecutor/JobLeaderService.java | 14 +- .../flink/runtime/taskexecutor/KvStateService.java | 4 +- .../flink/runtime/taskexecutor/TaskExecutor.java | 12 +- .../runtime/taskexecutor/TaskManagerRunner.java| 19 +- .../runtime/taskexecutor/TaskManagerServices.java | 30 +-- .../TaskManagerServicesConfiguration.java | 44 +++- .../NettyShuffleEnvironmentConfiguration.java | 25 ++- .../runtime/taskmanager/TaskManagerLocation.java | 11 + .../UnresolvedTaskManagerLocation.java}| 36 ++-- .../org/apache/flink/runtime/akka/AkkaUtils.scala | 31 ++- .../clusterframework/BootstrapToolsTest.java | 4 +- .../jobmaster/JobMasterPartitionReleaseTest.java | 10 +- .../flink/runtime/jobmaster/JobMasterTest.java | 41 ++-- .../jobmaster/utils/TestingJobMasterGateway.java | 10 +- .../utils/TestingJobMasterGatewayBuilder.java | 6 +- .../AkkaRpcActorOversizedResponseMessageTest.java | 4 +- .../runtime/rpc/akka/MessageSerializationTest.java | 4 +- .../TaskExecutorLocalStateStoresManagerTest.java | 5 +- .../runtime/taskexecutor/JobLeaderServiceTest.java | 4 +- .../runtime/taskexecutor/TaskExecutorTest.java | 102 - .../taskexecutor/TaskManagerServicesBuilder.java | 16 +- ...ava => LocalUnresolvedTaskManagerLocation.java} | 13 +- .../JobManagerHAProcessFailureRecoveryITCase.java | 2 +- .../recovery/ProcessFailureCancelingITCase.java| 2 +- tools/travis/splits/split_container.sh | 1 + 59 files changed, 888 insertions(+), 509 deletions(-) create mode 100644 flink-end-to-end-tests/test-scripts/container-scripts/docker-compose.nat.yml copy flink-end-to-end-tests/test-scripts/{test_docker_embedded_job.sh => test_nat.sh} (62%) copy flink-runtime/src
[flink-web] branch asf-site updated (92ef003 -> 5541234)
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git. from 92ef003 rebuild website new d70cde9 Add Flink Patterns Blogpost Part 2 new 5541234 rebuild site The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: _posts/2020-02-26-demo-fraud-detection-2.md| 200 ++ content/blog/index.html| 36 +- content/blog/page10/index.html | 40 +- content/blog/page11/index.html | 25 ++ content/blog/page2/index.html | 36 +- content/blog/page3/index.html | 36 +- content/blog/page4/index.html | 38 +- content/blog/page5/index.html | 42 +- content/blog/page6/index.html | 42 +- content/blog/page7/index.html | 40 +- content/blog/page8/index.html | 40 +- content/blog/page9/index.html | 40 +- content/img/blog/patterns-blog-2/broadcast.png | Bin 0 -> 156450 bytes content/img/blog/patterns-blog-2/forward.png | Bin 0 -> 112715 bytes content/img/blog/patterns-blog-2/hash.png | Bin 0 -> 259743 bytes content/img/blog/patterns-blog-2/job-graph.png | Bin 0 -> 426837 bytes content/img/blog/patterns-blog-2/rebalance.png | Bin 0 -> 142683 bytes content/img/blog/patterns-blog-2/rule-dsl.png | Bin 0 -> 166981 bytes content/index.html | 8 +- .../news/2020/02/26/demo-fraud-detection-2.html| 426 + content/zh/index.html | 8 +- img/blog/patterns-blog-2/broadcast.png | Bin 0 -> 156450 bytes img/blog/patterns-blog-2/forward.png | Bin 0 -> 112715 bytes img/blog/patterns-blog-2/hash.png | Bin 0 -> 259743 bytes img/blog/patterns-blog-2/job-graph.png | Bin 0 -> 426837 bytes img/blog/patterns-blog-2/rebalance.png | Bin 0 -> 142683 bytes img/blog/patterns-blog-2/rule-dsl.png | Bin 0 -> 166981 bytes 27 files changed, 901 insertions(+), 156 deletions(-) create mode 100644 _posts/2020-02-26-demo-fraud-detection-2.md create mode 100644 content/img/blog/patterns-blog-2/broadcast.png create mode 100644 content/img/blog/patterns-blog-2/forward.png create mode 100644 content/img/blog/patterns-blog-2/hash.png create mode 100644 content/img/blog/patterns-blog-2/job-graph.png create mode 100644 content/img/blog/patterns-blog-2/rebalance.png create mode 100644 content/img/blog/patterns-blog-2/rule-dsl.png create mode 100644 content/news/2020/02/26/demo-fraud-detection-2.html create mode 100644 img/blog/patterns-blog-2/broadcast.png create mode 100644 img/blog/patterns-blog-2/forward.png create mode 100644 img/blog/patterns-blog-2/hash.png create mode 100644 img/blog/patterns-blog-2/job-graph.png create mode 100644 img/blog/patterns-blog-2/rebalance.png create mode 100644 img/blog/patterns-blog-2/rule-dsl.png
[flink-web] 01/02: Add Flink Patterns Blogpost Part 2
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit d70cde95e7c6a13b3ae2bf22c3ba6d48f55321fc Author: Alexander Fedulov <1492164+afedu...@users.noreply.github.com> AuthorDate: Thu Feb 27 15:12:50 2020 +0100 Add Flink Patterns Blogpost Part 2 This closes #310 --- _posts/2020-02-26-demo-fraud-detection-2.md | 200 content/blog/index.html | 36 +++-- content/blog/page10/index.html | 40 +++--- content/blog/page11/index.html | 25 content/blog/page2/index.html | 36 +++-- content/blog/page3/index.html | 36 +++-- content/blog/page4/index.html | 38 +++--- content/blog/page5/index.html | 42 +++--- content/blog/page6/index.html | 42 +++--- content/blog/page7/index.html | 40 +++--- content/blog/page8/index.html | 40 +++--- content/blog/page9/index.html | 40 +++--- content/zh/index.html | 8 +- img/blog/patterns-blog-2/broadcast.png | Bin 0 -> 156450 bytes img/blog/patterns-blog-2/forward.png| Bin 0 -> 112715 bytes img/blog/patterns-blog-2/hash.png | Bin 0 -> 259743 bytes img/blog/patterns-blog-2/job-graph.png | Bin 0 -> 426837 bytes img/blog/patterns-blog-2/rebalance.png | Bin 0 -> 142683 bytes img/blog/patterns-blog-2/rule-dsl.png | Bin 0 -> 166981 bytes 19 files changed, 472 insertions(+), 151 deletions(-) diff --git a/_posts/2020-02-26-demo-fraud-detection-2.md b/_posts/2020-02-26-demo-fraud-detection-2.md new file mode 100644 index 000..c941b9f --- /dev/null +++ b/_posts/2020-02-26-demo-fraud-detection-2.md @@ -0,0 +1,200 @@ +--- +layout: post +title: "Advanced Flink Application Patterns Vol.2: +Dynamic Updates of Application Logic" +date: 2020-02-26T12:00:00.000Z +authors: +- alex: + name: "Alexander Fedulov" + twitter: "alex_fedulov" +categories: news +excerpt: In this series of blog posts you will learn about powerful Flink patterns for building streaming applications. +--- + +In the [first article](https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html) of the series, we gave a high-level description of the objectives and required functionality of a Fraud Detection engine. We also described how to make data partitioning in Apache Flink customizable based on modifiable rules instead of using a hardcoded `KeysExtractor` implementation. + +We intentionally omitted details of how the applied rules are initialized and what possibilities exist for updating them at runtime. In this post, we will address exactly these details. You will learn how the approach to data partitioning described in [Part 1](https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html) can be applied in combination with a dynamic configuration. These two patterns, when used together, can eliminate the need to recompile the code and redeploy your [...] + +## Rules Broadcasting + +Let's first have a look at the [previously-defined](https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html#dynamic-data-partitioning) data-processing pipeline: + +```java +DataStream alerts = +transactions +.process(new DynamicKeyFunction()) +.keyBy((keyed) -> keyed.getKey()); +.process(new DynamicAlertFunction()) +``` + +`DynamicKeyFunction` provides dynamic data partitioning while `DynamicAlertFunction` is responsible for executing the main logic of processing transactions and sending alert messages according to defined rules. + +Vol.1 of this series simplified the use case and assumed that the applied set of rules is pre-initialized and accessible via the `List` within `DynamicKeyFunction`. + +```java +public class DynamicKeyFunction +extends ProcessFunction> { + + /* Simplified */ + List rules = /* Rules that are initialized somehow.*/; + ... +} +``` + +Adding rules to this list is obviously possible directly inside the code of the Flink Job at the stage of its initialization (Create a `List` object; use it's `add` method). A major drawback of doing so is that it will require recompilation of the job with each rule modification. In a real Fraud Detection system, rules are expected to change on a frequent basis, making this approach unacceptable from the point of view of business and operational requirements. A different approach is needed. + +Next, let's take a look at a sample rule definition that we introduced in the previous post of the series: + + + + +Figure 1: Rule definition + + + +The previous post covered use of `groupingKeyNames` by `DynamicKeyFunction` to extract message keys. Parameters from the second part of this rule are used by `DynamicAlertFunction`: they define the actual logic of the performed operations and their parameter
[flink-web] 02/02: rebuild site
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git commit 5541234e60c8bc40e1fa72c2307e8f35340e227b Author: Robert Metzger AuthorDate: Tue Mar 24 15:11:35 2020 +0100 rebuild site --- content/blog/index.html| 40 +- content/img/blog/patterns-blog-2/broadcast.png | Bin 0 -> 156450 bytes content/img/blog/patterns-blog-2/forward.png | Bin 0 -> 112715 bytes content/img/blog/patterns-blog-2/hash.png | Bin 0 -> 259743 bytes content/img/blog/patterns-blog-2/job-graph.png | Bin 0 -> 426837 bytes content/img/blog/patterns-blog-2/rebalance.png | Bin 0 -> 142683 bytes content/img/blog/patterns-blog-2/rule-dsl.png | Bin 0 -> 166981 bytes content/index.html | 8 +- .../news/2020/02/26/demo-fraud-detection-2.html| 426 + content/zh/index.html | 2 +- 10 files changed, 450 insertions(+), 26 deletions(-) diff --git a/content/blog/index.html b/content/blog/index.html index 390d053..83f8232 100644 --- a/content/blog/index.html +++ b/content/blog/index.html @@ -185,6 +185,19 @@ + Advanced Flink Application Patterns Vol.2: Dynamic Updates of Application Logic + + 26 Feb 2020 + Alexander Fedulov (https://twitter.com/alex_fedulov";>@alex_fedulov) + + In this series of blog posts you will learn about powerful Flink patterns for building streaming applications. + + Continue reading » + + + + + Apache Beam: How Beam Runs on Top of Flink 22 Feb 2020 @@ -267,19 +280,6 @@ - Advanced Flink Application Patterns Vol.2: Case Study of a Fraud Detection System - - 15 Jan 2020 - Alexander Fedulov (https://twitter.com/alex_fedulov";>@alex_fedulov) - - In this series of blog posts you will learn about powerful Flink patterns for building streaming applications. - - Continue reading » - - - - - Advanced Flink Application Patterns Vol.1: Case Study of a Fraud Detection System 15 Jan 2020 @@ -352,7 +352,7 @@ - Apache Beam: How Beam Runs on Top of Flink + Advanced Flink Application Patterns Vol.2: Dynamic Updates of Application Logic @@ -362,7 +362,7 @@ - No Java Required: Configuring Sources and Sinks in SQL + Apache Beam: How Beam Runs on Top of Flink @@ -372,7 +372,7 @@ - Apache Flink 1.10.0 Release Announcement + No Java Required: Configuring Sources and Sinks in SQL @@ -382,7 +382,7 @@ - A Guide for Unit Testing in Apache Flink + Apache Flink 1.10.0 Release Announcement @@ -392,7 +392,7 @@ - Apache Flink 1.9.2 Released + A Guide for Unit Testing in Apache Flink @@ -402,7 +402,7 @@ - State Unlocked: Interacting with State in Apache Flink + Apache Flink 1.9.2 Released @@ -412,7 +412,7 @@ - Advanced Flink Application Patterns Vol.2: Case Study of a Fraud Detection System + State Unlocked: Interacting with State in Apache Flink diff --git a/content/img/blog/patterns-blog-2/broadcast.png b/content/img/blog/patterns-blog-2/broadcast.png new file mode 100644 index 000..7ed8c47 Binary files /dev/null and b/content/img/blog/patterns-blog-2/broadcast.png differ diff --git a/content/img/blog/patterns-blog-2/forward.png b/content/img/blog/patterns-blog-2/forward.png new file mode 100644 index 000..343899d Binary files /dev/null and b/content/img/blog/patterns-blog-2/forward.png differ diff --git a/content/img/blog/patterns-blog-2/hash.png b/content/img/blog/patterns-blog-2/hash.png new file mode 100644 index 000..d586293 Binary files /dev/null and b/content/img/blog/patterns-blog-2/hash.png differ diff --git a/content/img/blog/patterns-blog-2/job-graph.png b/content/img/blog/patterns-blog-2/job-graph.png new file mode 100644 index 000..f97c1a2 Binary files /dev/null and b/content/img/blog/patterns-blog-2/job-graph.png differ diff --git a/content/img/blog/patterns-blog-2/rebalance.png b/content/img/blog/patterns-blog-2/rebalance.png new file mode 100644 index 000..61d5797 Binary files /dev/null and b/content/img/blog/patterns-blog-2/rebalance.png differ diff --git a/content/img/blog/patterns-blog-2/rule-dsl.png b/content/img/blog/patterns-blog-2/rule-dsl.png new file mode 100644 index 000..ce1b547 Binary files /dev/null and b/content/img/blog/patterns-blog-2/rule-dsl.png differ diff --git a/content/index.html b/content/index.html index cf1c79d..99c2cdb 100644 --- a/content/index.html +++ b/content/index.html @@ -557,6 +557,9 @@
[flink] 02/02: [FLINK-16732][hive] Failed to call Hive UDF with constant return value
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit f9f63a4c11cf13f56ad06d9da6231d711477f28e Author: Rui Li AuthorDate: Tue Mar 24 20:40:03 2020 +0800 [FLINK-16732][hive] Failed to call Hive UDF with constant return value This closes #11494 --- .../apache/flink/table/functions/hive/HiveGenericUDF.java| 6 +- .../org/apache/flink/table/module/hive/HiveModuleTest.java | 12 2 files changed, 17 insertions(+), 1 deletion(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java index 9adf5ff..1c3649a 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/functions/hive/HiveGenericUDF.java @@ -27,6 +27,7 @@ import org.apache.flink.table.types.DataType; import org.apache.hadoop.hive.ql.exec.UDFArgumentException; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.udf.generic.GenericUDF; +import org.apache.hadoop.hive.serde2.objectinspector.ConstantObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.slf4j.Logger; @@ -83,7 +84,10 @@ public class HiveGenericUDF extends HiveScalarFunction { } try { - return HiveInspectors.toFlinkObject(returnInspector, function.evaluate(deferredObjects), hiveShim); + Object result = returnInspector instanceof ConstantObjectInspector ? + ((ConstantObjectInspector) returnInspector).getWritableConstantValue() : + function.evaluate(deferredObjects); + return HiveInspectors.toFlinkObject(returnInspector, result, hiveShim); } catch (HiveException e) { throw new FlinkHiveUDFException(e); } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java index 4acc616..04cf165 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/module/hive/HiveModuleTest.java @@ -149,4 +149,16 @@ public class HiveModuleTest { assertFalse(hiveModule.getFunctionDefinition(banned).isPresent()); } } + + @Test + public void testConstantReturnValue() throws Exception { + TableEnvironment tableEnv = HiveTestUtils.createTableEnvWithBlinkPlannerBatchMode(); + + tableEnv.unloadModule("core"); + tableEnv.loadModule("hive", new HiveModule()); + + List results = TableUtils.collectToList(tableEnv.sqlQuery("select str_to_map('a:1,b:2,c:3',',',':')")); + + assertEquals("[{a=1, b=2, c=3}]", results.toString()); + } }
[flink] branch release-1.10 updated (3877924 -> f9f63a4)
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a change to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git. from 3877924 [FLINK-15989] Add JVM Direct Error assumption test new 123943c [FLINK-16740][orc] Orc logicalTypeToOrcType fails to create decimal type with precision < 10 new f9f63a4 [FLINK-16732][hive] Failed to call Hive UDF with constant return value The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../apache/flink/table/functions/hive/HiveGenericUDF.java| 6 +- .../org/apache/flink/table/module/hive/HiveModuleTest.java | 12 .../main/java/org/apache/flink/orc/OrcSplitReaderUtil.java | 4 ++-- .../java/org/apache/flink/orc/OrcSplitReaderUtilTest.java| 1 + 4 files changed, 20 insertions(+), 3 deletions(-)
[flink] 01/02: [FLINK-16740][orc] Orc logicalTypeToOrcType fails to create decimal type with precision < 10
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit 123943c819a0408941f8d437bad46a7f68f00d5e Author: Rui Li AuthorDate: Tue Mar 24 20:38:39 2020 +0800 [FLINK-16740][orc] Orc logicalTypeToOrcType fails to create decimal type with precision < 10 This closes #11492 --- .../src/main/java/org/apache/flink/orc/OrcSplitReaderUtil.java| 4 ++-- .../src/test/java/org/apache/flink/orc/OrcSplitReaderUtilTest.java| 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReaderUtil.java b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReaderUtil.java index e23bd72..e55601e 100644 --- a/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReaderUtil.java +++ b/flink-formats/flink-orc/src/main/java/org/apache/flink/orc/OrcSplitReaderUtil.java @@ -147,8 +147,8 @@ public class OrcSplitReaderUtil { case DECIMAL: DecimalType decimalType = (DecimalType) type; return TypeDescription.createDecimal() - .withPrecision(decimalType.getPrecision()) - .withScale(decimalType.getScale()); + .withScale(decimalType.getScale()) + .withPrecision(decimalType.getPrecision()); case TINYINT: return TypeDescription.createByte(); case SMALLINT: diff --git a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcSplitReaderUtilTest.java b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcSplitReaderUtilTest.java index 7b84f1e..7f05a8e 100644 --- a/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcSplitReaderUtilTest.java +++ b/flink-formats/flink-orc/src/test/java/org/apache/flink/orc/OrcSplitReaderUtilTest.java @@ -59,6 +59,7 @@ public class OrcSplitReaderUtilTest { DataTypes.FIELD("int0", DataTypes.INT()), DataTypes.FIELD("int1", DataTypes.INT())) ))); + test("decimal(4,2)", DataTypes.DECIMAL(4, 2)); } private void test(String expected, DataType type) {
[flink] branch master updated (d6a0e4a -> b7a83ff)
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from d6a0e4a [FLINK-16740][orc] Orc logicalTypeToOrcType fails to create decimal type with precision < 10 add b7a83ff [FLINK-16732][hive] Failed to call Hive UDF with constant return value No new revisions were added by this update. Summary of changes: .../apache/flink/table/functions/hive/HiveGenericUDF.java| 6 +- .../org/apache/flink/table/module/hive/HiveModuleTest.java | 12 2 files changed, 17 insertions(+), 1 deletion(-)
[flink] branch master updated (f22756c -> d6a0e4a)
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from f22756c [FLINK-15989] Add JVM Direct Error assumption test add d6a0e4a [FLINK-16740][orc] Orc logicalTypeToOrcType fails to create decimal type with precision < 10 No new revisions were added by this update. Summary of changes: .../src/main/java/org/apache/flink/orc/OrcSplitReaderUtil.java| 4 ++-- .../src/test/java/org/apache/flink/orc/OrcSplitReaderUtilTest.java| 1 + 2 files changed, 3 insertions(+), 2 deletions(-)
[flink-web] branch asf-site updated: rebuild website
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git The following commit(s) were added to refs/heads/asf-site by this push: new 92ef003 rebuild website 92ef003 is described below commit 92ef003822de09eece213ee861f60954472f5b3b Author: Robert Metzger AuthorDate: Tue Mar 24 10:03:38 2020 +0100 rebuild website --- content/2019/05/03/pulsar-flink.html | 2 - content/2019/05/14/temporal-tables.html| 2 - content/2019/05/19/state-ttl.html | 2 - content/2019/06/05/flink-network-stack.html| 2 - content/2019/06/26/broadcast-state.html| 2 - content/2019/07/23/flink-network-stack-2.html | 2 - content/blog/index.html| 2 - content/blog/page10/index.html | 2 - content/blog/page11/index.html | 2 - content/blog/page2/index.html | 2 - content/blog/page3/index.html | 2 - content/blog/page4/index.html | 2 - content/blog/page5/index.html | 2 - content/blog/page6/index.html | 2 - content/blog/page7/index.html | 2 - content/blog/page8/index.html | 2 - content/blog/page9/index.html | 2 - .../blog/release_1.0.0-changelog_known_issues.html | 2 - content/blog/release_1.1.0-changelog.html | 2 - content/blog/release_1.2.0-changelog.html | 2 - content/blog/release_1.3.0-changelog.html | 2 - content/community.html | 2 - .../code-style-and-quality-common.html | 2 - .../code-style-and-quality-components.html | 2 - .../code-style-and-quality-formatting.html | 2 - .../contributing/code-style-and-quality-java.html | 2 - .../code-style-and-quality-preamble.html | 2 - .../code-style-and-quality-pull-requests.html | 2 - .../contributing/code-style-and-quality-scala.html | 2 - content/contributing/contribute-code.html | 2 - content/contributing/contribute-documentation.html | 2 - content/contributing/docs-style.html | 2 - content/contributing/how-to-contribute.html| 18 -- content/contributing/improve-website.html | 2 - content/contributing/reviewing-prs.html| 2 - content/documentation.html | 2 - content/downloads.html | 2 - content/ecosystem.html | 2 - .../apache-beam-how-beam-runs-on-top-of-flink.html | 2 - content/faq.html | 324 - .../feature/2019/09/13/state-processor-api.html| 2 - .../2017/07/04/flink-rescalable-state.html | 2 - .../2018/01/30/incremental-checkpointing.html | 2 - .../01/end-to-end-exactly-once-apache-flink.html | 2 - .../features/2019/03/11/prometheus-monitoring.html | 2 - content/flink-applications.html| 2 - content/flink-architecture.html| 2 - content/flink-operations.html | 2 - content/gettinghelp.html | 2 - content/index.html | 2 - content/material.html | 2 - content/news/2014/08/26/release-0.6.html | 2 - content/news/2014/09/26/release-0.6.1.html | 2 - content/news/2014/10/03/upcoming_events.html | 2 - content/news/2014/11/04/release-0.7.0.html | 2 - content/news/2014/11/18/hadoop-compatibility.html | 2 - content/news/2015/01/06/december-in-flink.html | 2 - content/news/2015/01/21/release-0.8.html | 2 - content/news/2015/02/04/january-in-flink.html | 2 - content/news/2015/02/09/streaming-example.html | 2 - .../news/2015/03/02/february-2015-in-flink.html| 2 - .../13/peeking-into-Apache-Flinks-Engine-Room.html | 2 - content/news/2015/04/07/march-in-flink.html| 2 - .../news/2015/04/13/release-0.9.0-milestone1.html | 2 - .../2015/05/11/Juggling-with-Bits-and-Bytes.html | 2 - .../news/2015/05/14/Community-update-April.html| 2 - .../24/announcing-apache-flink-0.9.0-release.html | 2 - .../news/2015/08/24/introducing-flink-gelly.html | 2 - content/news/2015/09/01/release-0.9.1.html | 2 - content/news/2015/09/03/flink-forward.html | 2 - content/news/2015/09/16/off-heap-memory.html | 2 - content/news/2015/11/16/release-0.10.0.html| 2 - content/news/2015/11/27/release-0.10.1.html| 2 - content/news/2015/12/04/Introducing-windows.html | 2 - content/news/2015/12/11/storm-compatibility.html | 2 - content/news/2015/12/18/
[flink-web] branch asf-site updated (60b7fab -> aa1aab0)
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a change to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git. from 60b7fab [hotfix] Remove mention of icla requirement on how to contrib guide add aa1aab0 [FLINK-16407] Remove FAQ No new revisions were added by this update. Summary of changes: _data/i18n.yml| 2 -- _includes/navbar.html | 2 -- faq.md| 90 --- faq.zh.md | 79 4 files changed, 173 deletions(-) delete mode 100644 faq.md delete mode 100644 faq.zh.md
[flink] 02/04: [FLINK-16225] Add JVM Metaspace Error assumption test
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit cdf4d6433ee7433553d7b51018838e508c7c Author: Andrey Zagrebin AuthorDate: Thu Mar 19 11:46:46 2020 +0300 [FLINK-16225] Add JVM Metaspace Error assumption test --- .../apache/flink/test/util/TestProcessBuilder.java | 21 +++- .../flink/runtime/util/ExceptionUtilsITCases.java | 121 + ...tractTaskManagerProcessFailureRecoveryTest.java | 2 +- .../recovery/ProcessFailureCancelingITCase.java| 4 +- 4 files changed, 141 insertions(+), 7 deletions(-) diff --git a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java index aad7845..d479956 100644 --- a/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java +++ b/flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/test/util/TestProcessBuilder.java @@ -70,10 +70,12 @@ public class TestProcessBuilder { commands.addAll(mainClassArgs); StringWriter processOutput = new StringWriter(); + StringWriter errorOutput = new StringWriter(); Process process = new ProcessBuilder(commands).start(); - new PipeForwarder(process.getErrorStream(), processOutput); + new PipeForwarder(process.getInputStream(), processOutput); + new PipeForwarder(process.getErrorStream(), errorOutput); - return new TestProcess(process, processOutput); + return new TestProcess(process, processOutput, errorOutput); } public TestProcessBuilder setJvmMemory(MemorySize jvmMemory) { @@ -81,6 +83,11 @@ public class TestProcessBuilder { return this; } + public TestProcessBuilder addJvmArg(String arg) { + jvmArgs.add(arg); + return this; + } + public TestProcessBuilder addMainClassArg(String arg) { mainClassArgs.add(arg); return this; @@ -100,20 +107,26 @@ public class TestProcessBuilder { public static class TestProcess { private final Process process; private final StringWriter processOutput; + private final StringWriter errorOutput; - public TestProcess(Process process, StringWriter processOutput) { + public TestProcess(Process process, StringWriter processOutput, StringWriter errorOutput) { this.process = process; this.processOutput = processOutput; + this.errorOutput = errorOutput; } public Process getProcess() { return process; } - public StringWriter getOutput() { + public StringWriter getProcessOutput() { return processOutput; } + public StringWriter getErrorOutput() { + return errorOutput; + } + public void destroy() { process.destroy(); } diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java b/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java new file mode 100644 index 000..045babb --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.test.util.TestProcessBuilder; +import org.apache.flink.test.util.TestProcessBuilder.TestProcess; +import org.apache.flink.testutils.ClassLoaderUtils; +import org.apache.flink.testutils.ClassLoaderUtils.ClassLoaderBuilder; +import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +i
[flink] 01/04: [FLINK-16225] Improve metaspace out-of-memory error handling thrown in user code
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit 2c934ff2a42fe256c2a1174788cbe55c05e8e323 Author: Andrey Zagrebin AuthorDate: Sun Mar 15 14:26:05 2020 +0300 [FLINK-16225] Improve metaspace out-of-memory error handling thrown in user code Improve error message, explaining the possible reasons and ways to resolve. In case of metaspace OOM error, try a graceful TM shutdown. This closes #11408. --- .../java/org/apache/flink/util/ExceptionUtils.java | 52 ++ .../runtime/taskexecutor/TaskManagerRunner.java| 8 +++- .../org/apache/flink/runtime/taskmanager/Task.java | 2 + 3 files changed, 60 insertions(+), 2 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java index ddd0276..5fc1bfe 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java @@ -25,6 +25,7 @@ package org.apache.flink.util; import org.apache.flink.annotation.Internal; +import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.util.function.RunnableWithException; import javax.annotation.Nullable; @@ -48,6 +49,14 @@ public final class ExceptionUtils { /** The stringified representation of a null exception reference. */ public static final String STRINGIFIED_NULL_EXCEPTION = "(null)"; + private static final String TM_METASPACE_OOM_ERROR_MESSAGE = String.format( + "Metaspace. The metaspace out-of-memory error has occurred. This can mean two things: either the job requires " + + "a larger size of JVM metaspace to load classes or there is a class loading leak. In the first case " + + "'%s' configuration option should be increased. If the error persists (usually in cluster after " + + "several job (re-)submissions) then there is probably a class loading leak which has to be " + + "investigated and fixed. The task executor has to be shutdown...", + TaskManagerOptions.JVM_METASPACE.key()); + /** * Makes a string representation of the exception's stack trace, or "(null)", if the * exception is null. @@ -110,6 +119,49 @@ public final class ExceptionUtils { } /** +* Generates new {@link OutOfMemoryError} with more detailed message. +* +* This method improves error message for metaspace {@link OutOfMemoryError}. +* It adds description of possible causes and ways of resolution. +* +* @param exception The exception to enrich. +* @return either enriched exception if needed or the original one. +*/ + public static Throwable enrichTaskManagerOutOfMemoryError(Throwable exception) { + if (isMetaspaceOutOfMemoryError(exception)) { + return changeOutOfMemoryErrorMessage(exception, TM_METASPACE_OOM_ERROR_MESSAGE); + } + return exception; + } + + private static OutOfMemoryError changeOutOfMemoryErrorMessage(Throwable exception, String newMessage) { + Preconditions.checkArgument(exception instanceof OutOfMemoryError); + if (exception.getMessage().equals(newMessage)) { + return (OutOfMemoryError) exception; + } + OutOfMemoryError newError = new OutOfMemoryError(newMessage); + newError.initCause(exception.getCause()); + newError.setStackTrace(exception.getStackTrace()); + return newError; + } + + /** +* Checks whether the given exception indicates a JVM metaspace out-of-memory error. +* +* @param t The exception to check. +* @return True, if the exception is the metaspace {@link OutOfMemoryError}, false otherwise. +*/ + public static boolean isMetaspaceOutOfMemoryError(Throwable t) { + return isOutOfMemoryErrorWithMessageStartingWith(t, "Metaspace"); + } + + private static boolean isOutOfMemoryErrorWithMessageStartingWith(Throwable t, String prefix) { + // the exact matching of the class is checked to avoid matching any custom subclasses of OutOfMemoryError + // as we are interested in the original exceptions, generated by JVM. + return t.getClass() == OutOfMemoryError.class && t.getMessage() != null && t.getMessage().startsWith(prefix); + } + + /** * Rethrows the given {@code Throwable}, if it represents an error that is fatal to the JVM. * See {@link ExceptionUtils#isJvmFatalError(Throwable)} for a definition of fatal errors.
[flink] 03/04: [FLINK-15989] Improve direct out-of-memory error handling in MemorySegmentFactory
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit 25ec89b8dae175246fd28972f8dbf7d479e89b5f Author: Andrey Zagrebin AuthorDate: Thu Mar 19 16:16:58 2020 +0300 [FLINK-15989] Improve direct out-of-memory error handling in MemorySegmentFactory --- .../flink/core/memory/MemorySegmentFactory.java| 25 +++- .../java/org/apache/flink/util/ExceptionUtils.java | 27 +- 2 files changed, 50 insertions(+), 2 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java index c297a26..760d2ac 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java @@ -19,6 +19,10 @@ package org.apache.flink.core.memory; import org.apache.flink.annotation.Internal; +import org.apache.flink.util.ExceptionUtils; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; @@ -32,6 +36,7 @@ import java.nio.ByteBuffer; */ @Internal public final class MemorySegmentFactory { + private static final Logger LOG = LoggerFactory.getLogger(MemorySegmentFactory.class); /** * Creates a new memory segment that targets the given heap memory region. @@ -94,10 +99,28 @@ public final class MemorySegmentFactory { * @return A new memory segment, backed by unpooled off-heap memory. */ public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) { - ByteBuffer memory = ByteBuffer.allocateDirect(size); + ByteBuffer memory = allocateDirectMemory(size); return new HybridMemorySegment(memory, owner, null); } + private static ByteBuffer allocateDirectMemory(int size) { + //noinspection ErrorNotRethrown + try { + return ByteBuffer.allocateDirect(size); + } catch (OutOfMemoryError outOfMemoryError) { + // TODO: this error handling can be removed in future, + // once we find a common way to handle OOM errors in netty threads. + // Here we enrich it to propagate better OOM message to the receiver + // if it happens in a netty thread. + OutOfMemoryError enrichedOutOfMemoryError = (OutOfMemoryError) ExceptionUtils + .enrichTaskManagerOutOfMemoryError(outOfMemoryError); + if (ExceptionUtils.isDirectOutOfMemoryError(outOfMemoryError)) { + LOG.error("Cannot allocate direct memory segment", enrichedOutOfMemoryError); + } + throw enrichedOutOfMemoryError; + } + } + /** * Allocates an off-heap unsafe memory and creates a new memory segment to represent that memory. * diff --git a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java index 5fc1bfe..0a63ae5 100644 --- a/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/ExceptionUtils.java @@ -49,6 +49,19 @@ public final class ExceptionUtils { /** The stringified representation of a null exception reference. */ public static final String STRINGIFIED_NULL_EXCEPTION = "(null)"; + private static final String TM_DIRECT_OOM_ERROR_MESSAGE = String.format( + "Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) " + + "a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be " + + "allocated by user code or some of its dependencies. In this case '%s' configuration option should be " + + "increased. Flink framework and its dependencies also consume the direct memory, mostly for network " + + "communication. The most of network memory is managed by Flink and should not result in out-of-memory " + + "error. In certain special cases, in particular for jobs with high parallelism, the framework may " + + "require more direct memory which is not managed by Flink. In this case '%s' configuration option " + + "should be increased. If the error persists then there is probably a direct memory leak which has to " + + "be investigated and fixed. The task executor has to be shutdown...", + TaskManagerOptions.TASK_OFF_HE
[flink] branch release-1.10 updated (6c5621e -> 3877924)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git. from 6c5621e [FLINK-16567][table][doc] Update to TableConfig in query_configuration document new 2c934ff [FLINK-16225] Improve metaspace out-of-memory error handling thrown in user code new cdf4d64 [FLINK-16225] Add JVM Metaspace Error assumption test new 25ec89b [FLINK-15989] Improve direct out-of-memory error handling in MemorySegmentFactory new 3877924 [FLINK-15989] Add JVM Direct Error assumption test The 4 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../flink/core/memory/MemorySegmentFactory.java| 25 ++- .../java/org/apache/flink/util/ExceptionUtils.java | 77 + .../runtime/taskexecutor/TaskManagerRunner.java| 8 +- .../org/apache/flink/runtime/taskmanager/Task.java | 2 + .../apache/flink/test/util/TestProcessBuilder.java | 21 ++- .../flink/runtime/util/ExceptionUtilsITCases.java | 175 + ...tractTaskManagerProcessFailureRecoveryTest.java | 2 +- .../recovery/ProcessFailureCancelingITCase.java| 4 +- 8 files changed, 304 insertions(+), 10 deletions(-) create mode 100644 flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java
[flink] 04/04: [FLINK-15989] Add JVM Direct Error assumption test
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git commit 387792408b95572f3de9be3b56775e33790822e8 Author: Andrey Zagrebin AuthorDate: Thu Mar 19 12:33:57 2020 +0300 [FLINK-15989] Add JVM Direct Error assumption test --- .../flink/runtime/util/ExceptionUtilsITCases.java | 78 ++ 1 file changed, 66 insertions(+), 12 deletions(-) diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java b/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java index 045babb..fd42f59 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java @@ -33,8 +33,11 @@ import java.io.File; import java.io.IOException; import java.lang.management.ManagementFactory; import java.lang.management.MemoryPoolMXBean; +import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import static org.hamcrest.CoreMatchers.is; import static org.junit.Assert.assertThat; @@ -43,31 +46,82 @@ import static org.junit.Assert.assertThat; * Tests for {@link ExceptionUtils} which require to spawn JVM process and set JVM memory args. */ public class ExceptionUtilsITCases extends TestLogger { - private static final long INITIAL_BIG_METASPACE_SIZE = 32 * (1 << 20); // 32Mb + private static final int DIRECT_MEMORY_SIZE = 10 * 1024; // 10Kb + private static final int DIRECT_MEMORY_ALLOCATION_PAGE_SIZE = 1024; // 1Kb + private static final int DIRECT_MEMORY_PAGE_NUMBER = DIRECT_MEMORY_SIZE / DIRECT_MEMORY_ALLOCATION_PAGE_SIZE; + private static final long INITIAL_BIG_METASPACE_SIZE = 128 * (1 << 20); // 128Mb @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); @Test + public void testIsDirectOutOfMemoryError() throws IOException, InterruptedException { + String className = DummyDirectAllocatingProgram.class.getName(); + String out = run(className, Collections.emptyList(), DIRECT_MEMORY_SIZE, -1); + assertThat(out, is("")); + } + + @Test public void testIsMetaspaceOutOfMemoryError() throws IOException, InterruptedException { + String className = DummyClassLoadingProgram.class.getName(); // load only one class and record required Metaspace - long okMetaspace = Long.parseLong(run(1, INITIAL_BIG_METASPACE_SIZE)); + String normalOut = run(className, getDummyClassLoadingProgramArgs(1), -1, INITIAL_BIG_METASPACE_SIZE); + long okMetaspace = Long.parseLong(normalOut); // load more classes to cause 'OutOfMemoryError: Metaspace' - assertThat(run(1000, okMetaspace), is("")); + String oomOut = run(className, getDummyClassLoadingProgramArgs(1000), -1, okMetaspace); + assertThat(oomOut, is("")); } - private static String run(int numberOfLoadedClasses, long metaspaceSize) throws InterruptedException, IOException { - TestProcessBuilder taskManagerProcessBuilder = new TestProcessBuilder(DummyClassLoadingProgram.class.getName()); - taskManagerProcessBuilder.addJvmArg("-XX:-UseCompressedOops"); - taskManagerProcessBuilder.addJvmArg(String.format("-XX:MaxMetaspaceSize=%d", metaspaceSize)); - taskManagerProcessBuilder.addMainClassArg(Integer.toString(numberOfLoadedClasses)); - taskManagerProcessBuilder.addMainClassArg(TEMPORARY_FOLDER.getRoot().getAbsolutePath()); + private static String run( + String className, + Iterable args, + long directMemorySize, + long metaspaceSize) throws InterruptedException, IOException { + TestProcessBuilder taskManagerProcessBuilder = new TestProcessBuilder(className); + if (directMemorySize > 0) { + taskManagerProcessBuilder.addJvmArg(String.format("-XX:MaxDirectMemorySize=%d", directMemorySize)); + } + if (metaspaceSize > 0) { + taskManagerProcessBuilder.addJvmArg("-XX:-UseCompressedOops"); + taskManagerProcessBuilder.addJvmArg(String.format("-XX:MaxMetaspaceSize=%d", metaspaceSize)); + } + for (String arg : args) { + taskManagerProcessBuilder.addMainClassArg(arg); + } TestProcess p = taskManagerProcessBuilder.start(); p.getProcess().waitFor(); assertThat(p.getErrorOutput().toString().trim(), is(
[flink] branch master updated (28bcbd0 -> f22756c)
This is an automated email from the ASF dual-hosted git repository. azagrebin pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 28bcbd0 [FLINK-16608][python] Support TimeType in vectorized Python UDF add 813f590 [FLINK-16225] Improve metaspace out-of-memory error handling thrown in user code add 874a4af [FLINK-16225] Add JVM Metaspace Error assumption test add a752184 [FLINK-15989] Improve direct out-of-memory error handling in MemorySegmentFactory add f22756c [FLINK-15989] Add JVM Direct Error assumption test No new revisions were added by this update. Summary of changes: .../flink/core/memory/MemorySegmentFactory.java| 25 ++- .../java/org/apache/flink/util/ExceptionUtils.java | 77 + .../runtime/taskexecutor/TaskManagerRunner.java| 8 +- .../org/apache/flink/runtime/taskmanager/Task.java | 2 + .../apache/flink/test/util/TestProcessBuilder.java | 21 ++- .../flink/runtime/util/ExceptionUtilsITCases.java | 175 + ...tractTaskManagerProcessFailureRecoveryTest.java | 2 +- .../recovery/ProcessFailureCancelingITCase.java| 4 +- 8 files changed, 304 insertions(+), 10 deletions(-) create mode 100644 flink-tests/src/test/java/org/apache/flink/runtime/util/ExceptionUtilsITCases.java