[GitHub] twalthr commented on a change in pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch
twalthr commented on a change in pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611#discussion_r221513851

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala ##

@@ -96,6 +96,6 @@ class StreamTableDescriptor(
 // this performs only basic validation
 // more validation can only happen within a factory
-new StreamTableDescriptorValidator().validate(properties)
+new StreamTableDescriptorValidator(true, true, true).validate(properties)

Review comment:
I think we can also remove it. As the comment suggests it doesn't check much.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

With regards,
Apache Git Services
[GitHub] twalthr commented on a change in pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch
twalthr commented on a change in pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611#discussion_r221513851

## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala ##

@@ -96,6 +96,6 @@ class StreamTableDescriptor(
 // this performs only basic validation
 // more validation can only happen within a factory
-new StreamTableDescriptorValidator().validate(properties)
+new StreamTableDescriptorValidator(true, true, true).validate(properties)

Review comment:
I think we can also remove it. As the comment suggests it doesn't check much. Update modes are optional for custom factories anyway.
[jira] [Commented] (FLINK-3875) Add a TableSink for Elasticsearch
[ https://issues.apache.org/jira/browse/FLINK-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633670#comment-16633670 ] ASF GitHub Bot commented on FLINK-3875: --- twalthr commented on a change in pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch URL: https://github.com/apache/flink/pull/6611#discussion_r221513851 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala ## @@ -96,6 +96,6 @@ class StreamTableDescriptor( // this performs only basic validation // more validation can only happen within a factory -new StreamTableDescriptorValidator().validate(properties) +new StreamTableDescriptorValidator(true, true, true).validate(properties) Review comment: I think we can also remove it. As the comment suggests it doesn't check much. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a TableSink for Elasticsearch > - > > Key: FLINK-3875 > URL: https://issues.apache.org/jira/browse/FLINK-3875 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors, Table API & SQL >Reporter: Fabian Hueske >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Add a TableSink that writes data to Elasticsearch -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-3875) Add a TableSink for Elasticsearch
[ https://issues.apache.org/jira/browse/FLINK-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633672#comment-16633672 ] ASF GitHub Bot commented on FLINK-3875: --- twalthr commented on a change in pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch URL: https://github.com/apache/flink/pull/6611#discussion_r221513851 ## File path: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala ## @@ -96,6 +96,6 @@ class StreamTableDescriptor( // this performs only basic validation // more validation can only happen within a factory -new StreamTableDescriptorValidator().validate(properties) +new StreamTableDescriptorValidator(true, true, true).validate(properties) Review comment: I think we can also remove it. As the comment suggests it doesn't check much. Update modes are optional for custom factories anyway. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add a TableSink for Elasticsearch > - > > Key: FLINK-3875 > URL: https://issues.apache.org/jira/browse/FLINK-3875 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors, Table API & SQL >Reporter: Fabian Hueske >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Add a TableSink that writes data to Elasticsearch -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] twalthr commented on a change in pull request #6741: [FLINK-9712][table, docs] Document processing time Temporal Table Joins
twalthr commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r221515643

## File path: docs/dev/table/streaming/temporal_tables.md ##

@@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+
+Temporal Tables represent a concept of a table that changes over time
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+----------
+
+Lets assume that we have two following tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` represents payments for given `amount` and given `currency`.
+For example at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= ========= ====
+09:00   US Dollar  102
+09:00   Euro       114
+09:00   Yen          1
+10:45   Euro       116
+11:15   Euro       119
+{% endhighlight %}
+
+`RatesHistory` represents an ever changing append-only stream of currency exchange rates, with respect to `Yen` (which has a rate of `1`).
+For example exchange rate for a period from `09:00` to `10:45` of `Euro` to `Yen` was `114`.
+From `10:45` to `11:15` it was `116`.
+
+Task is now to calculate a value of all of the `Orders` converted to common currency (`Yen`).
+For example we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without using Temporal Tables in order to do so, one would need to write such query:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM Rates AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query.
+
+In order to define a Temporal Table, we must define it's primary key,
+Primary key allows us to overwrite older values in the Temporal Table.
+In the above example `currency` would be a primary key for `RatesHistory` table.
+Secondly a [time attribute](time_attributes.html) is also required,
+that determines which row is newer and which one is older.
+
+Temporal Table Functions
+------------------------
+
+In order to access the data in the Temporal Table, one must define a time attribute for which matching version of the table will be returned.
+Flink uses the SQL syntax of Table Functions to provide a way to express it.
+Once defined, Temporal Table Function takes a single argument `timeAttribute` and returns a set of rows.
+This set contains the latest versions of the rows for all of existing primary keys with respect to the given `timeAttribute`.
+
+Assuming that we defined a `Rates(timeAttribute)` Temporal Table Function based on `RatesHistory` table.
+We could query such function in the following way:
+
+{% highlight sql %}
+SELECT * FROM Rates('10:15');
+
+rowtime currency   rate
+======= ========= ====
+09:00   US Dollar  102
+09:00   Euro       114
+09:00   Yen          1
+
+SELECT * FROM Rates('11:00');
+
+rowtime currency   rate
+======= ========= ====
+09:00   US Dollar  102
+10:45   Euro       116
+09:00   Yen          1
+{% endhighlight %}
+
+Each query to `Rates(timeAttribute)` would return the state of the `Rates` for the given `timeAttribute`.
+
+**Note**: Currently Flink doesn't support directly querying the Temporal Table Functions with a constant `timeAttribute`.
+At the moment Temporal Table Functions can only be used in joins.
+Above example was used to provide an intuition about what function `Rates(timeAttribute)` returns.
+
+Processing time
+---------------
+
+### Defining Temporal Table Function
+
+In order to define processing time Temporal Table:
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+List<Tuple2<Long, String>> ordersData = new ArrayList<>();
+ordersData.add(Tuple2.of(2L, "Euro"));
+ordersData.add(Tuple2.of(1L, "US Dollar"));
+ordersData.add(Tuple2.of(50L, "Yen"));
+ordersData.add(Tuple2.of(3L, "Euro"));
+ordersData.add(Tuple2.of(5L, "US Dollar"));
+
+List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
+ratesHistoryData.add(Tuple2.of("US Dollar", 102L));
+ratesHistoryData.add(Tuple2.of("Euro", 114L));
+ratesHistoryData.add(Tuple2.of("Yen", 1L));
+ratesHistoryData.add(Tuple2.of("Euro", 116L));
+ratesHistoryData.add(Tuple2.of("Euro", 119L));
+
+DataStreamSource<Tuple2<Long, String>> ordersStream = env
[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API
[ https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633674#comment-16633674 ]

ASF GitHub Bot commented on FLINK-9712:
---------------------------------------

twalthr commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r221515643

## File path: docs/dev/table/streaming/temporal_tables.md ##
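The lookup that the quoted documentation describes for `Rates(timeAttribute)` (per currency, take the newest rate whose `rowtime` is not later than the given time) can be sketched in plain Java. This is only an illustrative model, assuming times are `HH:mm` strings; the class `RatesLookupSketch` and its methods are hypothetical and are not part of the Flink API or of the pull request.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Illustrative model of a versioned lookup; hypothetical, not Flink API. */
final class RatesLookupSketch {
    // currency -> (minutes since midnight -> rate), versions ordered by time
    private final Map<String, TreeMap<Integer, Long>> history = new HashMap<>();

    void addRate(String time, String currency, long rate) {
        history.computeIfAbsent(currency, c -> new TreeMap<>()).put(minutes(time), rate);
    }

    /** Newest rate with rowtime <= time, like MAX(rowtime) in the quoted subquery. */
    long rateAt(String time, String currency) {
        TreeMap<Integer, Long> versions = history.get(currency);
        Map.Entry<Integer, Long> entry =
                versions == null ? null : versions.floorEntry(minutes(time));
        if (entry == null) {
            throw new IllegalStateException("No rate for " + currency + " at " + time);
        }
        return entry.getValue();
    }

    static int minutes(String hhmm) {
        String[] parts = hhmm.split(":");
        return Integer.parseInt(parts[0]) * 60 + Integer.parseInt(parts[1]);
    }

    /** The RatesHistory rows listed in the quoted documentation. */
    static RatesLookupSketch sampleRates() {
        RatesLookupSketch rates = new RatesLookupSketch();
        rates.addRate("09:00", "US Dollar", 102L);
        rates.addRate("09:00", "Euro", 114L);
        rates.addRate("09:00", "Yen", 1L);
        rates.addRate("10:45", "Euro", 116L);
        rates.addRate("11:15", "Euro", 119L);
        return rates;
    }

    /** Converts the five rows of the quoted Orders table to Yen and sums them. */
    static long totalInYen() {
        RatesLookupSketch rates = sampleRates();
        String[][] orders = {
            {"10:15", "2", "Euro"}, {"10:30", "1", "US Dollar"}, {"10:32", "50", "Yen"},
            {"10:52", "3", "Euro"}, {"11:04", "5", "US Dollar"}
        };
        long total = 0;
        for (String[] order : orders) {
            total += Long.parseLong(order[1]) * rates.rateAt(order[0], order[2]);
        }
        return total;
    }
}
```

With the sample data, the `10:15` Euro order resolves to rate `114` and the `10:52` Euro order to `116`, matching the two `Rates(...)` result tables in the quoted text. An ordered map per key keeps each lookup logarithmic in the number of versions; how Flink stores versions internally is not stated in this thread.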
[jira] [Commented] (FLINK-10464) TimeIndicatorRelDataType: digest can not describe a type completely.
[ https://issues.apache.org/jira/browse/FLINK-10464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633676#comment-16633676 ]

Timo Walther commented on FLINK-10464:
--------------------------------------

[~huangjiatian] could you add a more complete example of your table program to reproduce your error? This issue might be a duplicate of FLINK-8897. The file that you posted is not visible.

> TimeIndicatorRelDataType: digest can not describe a type completely.
> --------------------------------------------------------------------
>
>                 Key: FLINK-10464
>                 URL: https://issues.apache.org/jira/browse/FLINK-10464
>             Project: Flink
>          Issue Type: Bug
>            Reporter: huangjiatian
>            Priority: Minor
>
> I ran into a strange problem when using the Flink SQL API.
> The error message looks like this:
> java.lang.AssertionError: Conversion to relational algebra failed to preserve datatypes:
> validated type:
> RecordType(TIMESTAMP(3) NOT NULL rowtime) NOT NULL
> converted type:
> RecordType(TIMESTAMP(3) NOT NULL rowtime) NOT NULL
> rel:
> LogicalProject(rowtime=[$3])
>   LogicalTableScan(table=[[hjtsrc]])
>
> I found that two different types are considered equal.
> !image-2018-09-28-13-11-43-515.png!
> That means "select deviceid, rowtime.rowtime" is equal to "select deviceid, rowtime.proctime".
> The "digest" in TimeIndicatorRelDataType carries no event-time information, so it cannot describe a TimeIndicatorRelDataType completely.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] dawidwys closed pull request #6758: [FLINK-10417][cep] Added option to throw exception on pattern variable mis…
dawidwys closed pull request #6758: [FLINK-10417][cep] Added option to throw exception on pattern variable mis…
URL: https://github.com/apache/flink/pull/6758

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index ad321bf71b5..d7f915fc62e 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -1385,6 +1385,23 @@ Pattern.begin("patternName", skipStrategy)
+{% warn Attention %} For SKIP_TO_FIRST/LAST there are two options how to handle cases when there are no elements mapped to
+the specified variable. By default a NO_SKIP strategy will be used in this case. The other option is to throw exception in such situation.
+One can enable this option by:
+
+
+
+{% highlight java %}
+AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()
+{% endhighlight %}
+
+
+{% highlight scala %}
+AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()
+{% endhighlight %}
+
+
+
 ## Detecting Patterns

 After specifying the pattern sequence you are looking for, it is time to apply it to your input stream to detect
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
index 8151a124af4..f4448a35560 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
@@ -43,8 +43,8 @@
   * @param patternName the pattern name to skip to
   * @return the created AfterMatchSkipStrategy
   */
- public static AfterMatchSkipStrategy skipToFirst(String patternName) {
-  return new SkipToFirstStrategy(patternName);
+ public static SkipToFirstStrategy skipToFirst(String patternName) {
+  return new SkipToFirstStrategy(patternName, false);
  }

  /**
@@ -53,8 +53,8 @@ public static AfterMatchSkipStrategy skipToFirst(String patternName) {
   * @param patternName the pattern name to skip to
   * @return the created AfterMatchSkipStrategy
   */
- public static AfterMatchSkipStrategy skipToLast(String patternName) {
-  return new SkipToLastStrategy(patternName);
+ public static SkipToLastStrategy skipToLast(String patternName) {
+  return new SkipToLastStrategy(patternName, false);
  }

  /**
@@ -62,7 +62,7 @@ public static AfterMatchSkipStrategy skipToLast(String patternName) {
   *
   * @return the created AfterMatchSkipStrategy
   */
- public static AfterMatchSkipStrategy skipPastLastEvent() {
+ public static SkipPastLastStrategy skipPastLastEvent() {
   return SkipPastLastStrategy.INSTANCE;
  }

@@ -71,7 +71,7 @@ public static AfterMatchSkipStrategy skipPastLastEvent() {
   *
   * @return the created AfterMatchSkipStrategy
   */
- public static AfterMatchSkipStrategy noSkip() {
+ public static NoSkipStrategy noSkip() {
   return NoSkipStrategy.INSTANCE;
  }

diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java
new file mode 100644
index 000..5554151ccbd
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.aftermatch;
+
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static o
[jira] [Commented] (FLINK-10417) Add option to throw exception on pattern variable miss with SKIP_TO_FIRST/LAST
[ https://issues.apache.org/jira/browse/FLINK-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633679#comment-16633679 ]

ASF GitHub Bot commented on FLINK-10417:
----------------------------------------

dawidwys closed pull request #6758: [FLINK-10417][cep] Added option to throw exception on pattern variable mis…
URL: https://github.com/apache/flink/pull/6758
[jira] [Resolved] (FLINK-10417) Add option to throw exception on pattern variable miss with SKIP_TO_FIRST/LAST
[ https://issues.apache.org/jira/browse/FLINK-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz resolved FLINK-10417. -- Resolution: Done > Add option to throw exception on pattern variable miss with SKIP_TO_FIRST/LAST > -- > > Key: FLINK-10417 > URL: https://issues.apache.org/jira/browse/FLINK-10417 > Project: Flink > Issue Type: Improvement > Components: CEP >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
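The behaviour described in the merged documentation change for FLINK-10417 (fall back to not skipping when no element is mapped to the variable, or throw if the user opted in) can be sketched without Flink on the classpath. The class below is a hypothetical stand-in, not the actual `SkipToFirstStrategy`; representing a match as a map from variable name to event indices and returning `-1` for "do not skip" are assumptions made for illustration.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a skip-to-first strategy; not the Flink class. */
final class SkipToFirstSketch {
    private final String patternName;
    private final boolean throwOnMiss;

    private SkipToFirstSketch(String patternName, boolean throwOnMiss) {
        this.patternName = patternName;
        this.throwOnMiss = throwOnMiss;
    }

    /** Factory returns the concrete type so the option below can be chained. */
    static SkipToFirstSketch skipToFirst(String patternName) {
        return new SkipToFirstSketch(patternName, false);
    }

    /** Opt in to failing when no event is mapped to the variable. */
    SkipToFirstSketch throwExceptionOnMiss() {
        return new SkipToFirstSketch(patternName, true);
    }

    /**
     * Returns the index of the first event mapped to the variable, or -1
     * (meaning "do not skip") when nothing is mapped and throwing is disabled.
     */
    int skipTarget(Map<String, List<Integer>> match) {
        List<Integer> events = match.getOrDefault(patternName, Collections.emptyList());
        if (events.isEmpty()) {
            if (throwOnMiss) {
                throw new IllegalStateException(
                        "No events mapped to pattern variable '" + patternName + "'");
            }
            return -1;
        }
        return events.get(0);
    }
}
```

Returning the concrete type from the factory mirrors the signature change visible in the diff for this pull request, where `skipToFirst` now returns `SkipToFirstStrategy` instead of `AfterMatchSkipStrategy`; a base-class return type would not expose the extra option to callers.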
[jira] [Created] (FLINK-10470) Add method to check if pattern can produce empty matches
Dawid Wysakowicz created FLINK-10470: Summary: Add method to check if pattern can produce empty matches Key: FLINK-10470 URL: https://issues.apache.org/jira/browse/FLINK-10470 Project: Flink Issue Type: Sub-task Components: CEP Reporter: Dawid Wysakowicz Assignee: Dawid Wysakowicz -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10470) Add method to check if pattern can produce empty matches
[ https://issues.apache.org/jira/browse/FLINK-10470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dawid Wysakowicz updated FLINK-10470:
-------------------------------------
    Description: There are a couple of inconsistencies in how the CEP library handles greedy and reluctant operators at the beginning and end of a pattern. This results in subtle problems with how empty matches should be generated for patterns such as A? or A*?, where one is greedy and the other is reluctant. In order to provide a first version of the MATCH_RECOGNIZE function, we should be able to disable patterns that can produce empty matches.

> Add method to check if pattern can produce empty matches
> --------------------------------------------------------
>
>                 Key: FLINK-10470
>                 URL: https://issues.apache.org/jira/browse/FLINK-10470
>             Project: Flink
>          Issue Type: Sub-task
>          Components: CEP
>            Reporter: Dawid Wysakowicz
>            Assignee: Dawid Wysakowicz
>            Priority: Major
>
> There are a couple of inconsistencies in how the CEP library handles greedy and
> reluctant operators at the beginning and end of a pattern. This results in
> subtle problems with how empty matches should be generated for patterns such as
> A? or A*?, where one is greedy and the other is reluctant. In order to
> provide a first version of the MATCH_RECOGNIZE function, we should be able to
> disable patterns that can produce empty matches.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[GitHub] dawidwys opened a new pull request #6781: [FLINK-10470] Add method to check if pattern can produce empty matches
dawidwys opened a new pull request #6781: [FLINK-10470] Add method to check if pattern can produce empty matches
URL: https://github.com/apache/flink/pull/6781

## What is the purpose of the change

This change introduces a method that allows checking if a Pattern can produce empty matches. It is needed to disable such patterns in MATCH_RECOGNIZE until we are able to provide full support for empty matches in CEP (with proper support for reluctant and greedy operators).

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)
[jira] [Commented] (FLINK-10470) Add method to check if pattern can produce empty matches
[ https://issues.apache.org/jira/browse/FLINK-10470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633712#comment-16633712 ]

ASF GitHub Bot commented on FLINK-10470:
----------------------------------------

dawidwys opened a new pull request #6781: [FLINK-10470] Add method to check if pattern can produce empty matches
URL: https://github.com/apache/flink/pull/6781
[GitHub] ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined types in CsvReader
ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined types in CsvReader URL: https://github.com/apache/flink/pull/5862#discussion_r221242052 ## File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java ## @@ -238,6 +238,37 @@ public String toString() { return tupleInfo; } + /** +* Resolves a type information for each specified income type and forms an instance of a resulting {@link TupleTypeInfo} type. +* @param incomeTypes tuple fields' types +* @param a resulting type of a tuple, e.g. Tuple1, Tuple2... +* @return A tuple information type, built from the specified income types. +*/ + @SuppressWarnings("unchecked") + @PublicEvolving + public static TupleTypeInfo getTupleTypeInfo(Class... incomeTypes) { + if (incomeTypes == null || incomeTypes.length == 0) { + throw new IllegalArgumentException(); Review comment: I think it would be a good idea to add a message to the exception here.
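A minimal sketch of what the review comment asks for, assuming a standalone helper rather than the method from the pull request. `TupleTypeArgs` and `checkIncomeTypes` are hypothetical names, and the message wording is only an example.

```java
// Hypothetical helper; it only demonstrates an argument check with a descriptive message.
final class TupleTypeArgs {

    static void checkIncomeTypes(Class<?>... incomeTypes) {
        if (incomeTypes == null || incomeTypes.length == 0) {
            // The message tells the caller what was wrong instead of leaving it empty.
            throw new IllegalArgumentException(
                    "At least one tuple field type must be specified.");
        }
    }

    public static void main(String[] args) {
        try {
            checkIncomeTypes(); // no types given, so the check fails
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```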
[GitHub] ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined types in CsvReader
ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined types in CsvReader URL: https://github.com/apache/flink/pull/5862#discussion_r221248122 ## File path: flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java ## @@ -262,54 +267,115 @@ public void setCharset(Charset charset) { // // Mapping from types to parsers // - + /** -* Gets the parser for the type specified by the given class. Returns null, if no parser for that class +* Provides an instance of {@link FieldParser} that corresponds to the specified type. +* @param type a field type for which a {@link FieldParser} is needed. +* @return if there is a custom parser for the specified field type - it is returned; then, if there is a default parser +* responsible for the specified type - it is returned; otherwise, {@code null} is returned. +*/ + @SuppressWarnings("unchecked") + public static FieldParser getParserInstanceFor(Class type) { + ParserFactory parserFactory = (ParserFactory) CUSTOM_PARSERS.get(type); + if (parserFactory == null) { + parserFactory = (ParserFactory) DEFAULT_PARSERS.get(type); + } + + if (parserFactory == null) { + return null; + } + + return parserFactory.create(); + } + + /** +* Gets the default parser for the type specified by the given class. Returns null, if no parser for that class * is known. -* +* * @param type The class of the type to get the parser for. * @return The parser for the given type, or null, if no such parser exists. 
*/ - public static Class> getParserForType(Class type) { - Class> parser = PARSERS.get(type); - if (parser == null) { + public static Class> getDefaultParserForType(Class type) { + ParserFactory parserFactory = DEFAULT_PARSERS.get(type); + if (parserFactory == null) { return null; } else { @SuppressWarnings("unchecked") - Class> typedParser = (Class>) parser; + Class> typedParser = (Class>) parserFactory.getParserType(); return typedParser; } } - - private static final Map, Class>> PARSERS = - new HashMap, Class>>(); - + + /** +* Gets the custom parser for the type specified by the given class. Returns null, if no parser for that class +* is known. +* +* @param type The class of the type to get the parser for. +* @return The parser for the given type, or null, if no such parser exists. +*/ + public static Class> getCustomParserForType(Class type) { + synchronized (CUSTOM_PARSERS) { + ParserFactory parserFactory = (ParserFactory) CUSTOM_PARSERS.get(type); + if (parserFactory == null) { + return null; + } else { + return parserFactory.getParserType(); + } + } + } + + private static final Map, ParserFactory> DEFAULT_PARSERS = new HashMap<>(); + static { // basic types - PARSERS.put(Byte.class, ByteParser.class); - PARSERS.put(Short.class, ShortParser.class); - PARSERS.put(Integer.class, IntParser.class); - PARSERS.put(Long.class, LongParser.class); - PARSERS.put(String.class, StringParser.class); - PARSERS.put(Float.class, FloatParser.class); - PARSERS.put(Double.class, DoubleParser.class); - PARSERS.put(Boolean.class, BooleanParser.class); - PARSERS.put(BigDecimal.class, BigDecParser.class); - PARSERS.put(BigInteger.class, BigIntParser.class); + DEFAULT_PARSERS.put(Byte.class, new DefaultParserFactory<>(ByteParser.class)); + DEFAULT_PARSERS.put(Short.class, new DefaultParserFactory<>(ShortParser.class)); + DEFAULT_PARSERS.put(Integer.class, new DefaultParserFactory<>(IntParser.class)); + DEFAULT_PARSERS.put(Long.class, new DefaultParserFactory<>(LongParser.class)); + 
DEFAULT_PARSERS.put(String.class, new DefaultParserFactory<>(StringParser.class)); + DEFAULT_PARSERS.put(Float.class, new DefaultParserFactory<>(FloatParser.class)); + DEFAULT_PARSERS.put(Double.class, n
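The lookup order in the diff above (custom parser first, then default parser, otherwise `null`) can be sketched in isolation. This is not Flink's `FieldParser` or `ParserFactory`; `ParserLookupSketch`, `SimpleParser`, `registerDefault`, and `registerCustom` are hypothetical names, factories are modeled as `Supplier`s, and the sketch is not thread-safe.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical registry that mirrors the custom-then-default lookup order.
final class ParserLookupSketch {

    // Stand-in for a field parser; works on Strings to keep the sketch short.
    interface SimpleParser<T> {
        T parse(String text);
    }

    private static final Map<Class<?>, Supplier<? extends SimpleParser<?>>> DEFAULT_PARSERS = new HashMap<>();
    private static final Map<Class<?>, Supplier<? extends SimpleParser<?>>> CUSTOM_PARSERS = new HashMap<>();

    static {
        registerDefault(Integer.class, () -> text -> Integer.valueOf(text));
        registerDefault(Long.class, () -> text -> Long.valueOf(text));
    }

    static <T> void registerDefault(Class<T> type, Supplier<SimpleParser<T>> factory) {
        DEFAULT_PARSERS.put(type, factory);
    }

    static <T> void registerCustom(Class<T> type, Supplier<SimpleParser<T>> factory) {
        CUSTOM_PARSERS.put(type, factory);
    }

    // Returns a new parser instance: custom if registered, else default, else null.
    @SuppressWarnings("unchecked")
    static <T> SimpleParser<T> getParserInstanceFor(Class<T> type) {
        Supplier<? extends SimpleParser<?>> factory = CUSTOM_PARSERS.get(type);
        if (factory == null) {
            factory = DEFAULT_PARSERS.get(type);
        }
        if (factory == null) {
            return null;
        }
        return (SimpleParser<T>) factory.get();
    }

    public static void main(String[] args) {
        System.out.println(getParserInstanceFor(Integer.class).parse("42")); // default parser
        registerCustom(Integer.class, () -> text -> Integer.parseInt(text, 16));
        System.out.println(getParserInstanceFor(Integer.class).parse("2a")); // custom hex parser
        System.out.println(getParserInstanceFor(Double.class));              // no parser registered
    }
}
```

Storing factories instead of parser classes, as the diff does with `ParserFactory`, lets a custom parser be created without requiring a no-argument constructor.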
[GitHub] ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined types in CsvReader
ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined types in CsvReader URL: https://github.com/apache/flink/pull/5862#discussion_r221250089 ## File path: flink-java/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java ## @@ -473,7 +534,7 @@ private static void appendTupleTypeGenerics(StringBuilder sb, int numFields) { } private static final String HEADER = - "/*\n" + "/*\n" Review comment: please revert this change
[GitHub] ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined types in CsvReader
ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined types in CsvReader URL: https://github.com/apache/flink/pull/5862#discussion_r221526823 ## File path: flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java ## @@ -20,22 +20,53 @@ import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeHint; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.ExecutionEnvironment; import org.apache.flink.api.java.Utils; import org.apache.flink.api.java.operators.DataSource; +import org.apache.flink.api.java.tuple.Tuple; +import org.apache.flink.api.java.tuple.Tuple1; +import org.apache.flink.api.java.tuple.Tuple10; +import org.apache.flink.api.java.tuple.Tuple11; +import org.apache.flink.api.java.tuple.Tuple12; +import org.apache.flink.api.java.tuple.Tuple13; +import org.apache.flink.api.java.tuple.Tuple14; +import org.apache.flink.api.java.tuple.Tuple15; +import org.apache.flink.api.java.tuple.Tuple16; +import org.apache.flink.api.java.tuple.Tuple17; +import org.apache.flink.api.java.tuple.Tuple18; +import org.apache.flink.api.java.tuple.Tuple19; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple20; +import org.apache.flink.api.java.tuple.Tuple21; +import org.apache.flink.api.java.tuple.Tuple22; +import org.apache.flink.api.java.tuple.Tuple23; +import org.apache.flink.api.java.tuple.Tuple24; +import org.apache.flink.api.java.tuple.Tuple25; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.api.java.tuple.Tuple4; +import org.apache.flink.api.java.tuple.Tuple5; +import org.apache.flink.api.java.tuple.Tuple6; +import org.apache.flink.api.java.tuple.Tuple7; +import org.apache.flink.api.java.tuple.Tuple8; +import org.apache.flink.api.java.tuple.Tuple9; +import org.apache.flink.api.java.typeutils.PojoField; import 
org.apache.flink.api.java.typeutils.PojoTypeInfo; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.fs.Path; +import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; -//CHECKSTYLE.OFF: AvoidStarImport|ImportOrder -import org.apache.flink.api.java.tuple.*; -//CHECKSTYLE.ON: AvoidStarImport|ImportOrder - +import java.lang.reflect.Field; import java.util.ArrayList; import java.util.Arrays; +//CHECKSTYLE.OFF: AvoidStarImport|ImportOrder Review comment: I assume these changes are not necessary.
[GitHub] ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined types in CsvReader
ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined types in CsvReader URL: https://github.com/apache/flink/pull/5862#discussion_r221243381 ## File path: flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java ## @@ -129,7 +129,7 @@ protected TypeExtractor() { // TypeInfoFactory registry // - private static Map> registeredTypeInfoFactories = new HashMap<>(); + public static Map> registeredTypeInfoFactories = new HashMap<>(); Review comment: What is this change for?
[jira] [Updated] (FLINK-2435) Add support for custom CSV field parsers
[ https://issues.apache.org/jira/browse/FLINK-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-2435: -- Labels: pull-request-available (was: ) > Add support for custom CSV field parsers > > > Key: FLINK-2435 > URL: https://issues.apache.org/jira/browse/FLINK-2435 > Project: Flink > Issue Type: New Feature > Components: DataSet API >Affects Versions: 0.10.0 >Reporter: Fabian Hueske >Assignee: Dmitrii Kober >Priority: Minor > Labels: pull-request-available > > The {{CSVInputFormats}} have only {{FieldParsers}} for Java's primitive types (byte, short, int, long, float, double, boolean, String). It would be good to add support for CSV field parsers for custom data types which can be registered in a {{CSVReader}}. We could offer two interfaces for field parsers. 1. The regular low-level {{FieldParser}} which operates on a byte array and offsets. 2. A {{StringFieldParser}} which operates on a String that has been extracted by a {{StringParser}} before. This interface will be easier to implement but less efficient.
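The two interfaces proposed in the issue can be sketched as follows. The names `ByteFieldParser` and `adapt`, and both method signatures, are assumptions made for illustration; they are not Flink's actual `FieldParser` API. The sketch shows why the String-based variant is easier to implement but less efficient: it allocates a String per field before parsing.

```java
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;

// Hypothetical interfaces; they only illustrate the trade-off named in the issue.
final class FieldParserSketch {

    // Low-level variant: works directly on the byte array and offsets.
    interface ByteFieldParser<T> {
        T parseField(byte[] bytes, int start, int end);
    }

    // Convenience variant: receives the field as an already extracted String.
    interface StringFieldParser<T> {
        T parse(String field);
    }

    // Adapts the String-based variant to the low-level one.
    // The String created here for every field is the extra cost.
    static <T> ByteFieldParser<T> adapt(StringFieldParser<T> parser) {
        return (bytes, start, end) ->
                parser.parse(new String(bytes, start, end - start, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] line = "2018-09-30|42".getBytes(StandardCharsets.UTF_8);
        // A custom type (LocalDate) only needs a one-line String-based parser.
        ByteFieldParser<LocalDate> dateParser = adapt(text -> LocalDate.parse(text));
        System.out.println(dateParser.parseField(line, 0, 10)); // 2018-09-30
    }
}
```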
[jira] [Commented] (FLINK-10447) Create Bucketing Table Sink.
[ https://issues.apache.org/jira/browse/FLINK-10447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633722#comment-16633722 ] Fabian Hueske commented on FLINK-10447: --- Hi [~Suxing Lee], you should be aware that this is a major effort. Just porting your code to StreamingFileSink won't be sufficient. As I said before, we need to thoroughly discuss the design of such a major feature before starting to implement it. We need a detailed design document that addresses all the points (and probably more) that I've listed before. I would suggest closing this issue and the pull request and creating a new issue as a sub-issue of FLINK-8535. Best, Fabian > Create Bucketing Table Sink. > > > Key: FLINK-10447 > URL: https://issues.apache.org/jira/browse/FLINK-10447 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Suxing Lee >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > It would be nice to integrate the table APIs with the HDFS connectors so that the rows in the tables can be directly pushed into HDFS.
[GitHub] twalthr commented on issue #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch
twalthr commented on issue #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch URL: https://github.com/apache/flink/pull/6611#issuecomment-425835514 Thank you @dawidwys. Travis gave green light. I will merge this...
[jira] [Commented] (FLINK-3875) Add a TableSink for Elasticsearch
[ https://issues.apache.org/jira/browse/FLINK-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633731#comment-16633731 ] ASF GitHub Bot commented on FLINK-3875: --- twalthr commented on issue #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch URL: https://github.com/apache/flink/pull/6611#issuecomment-425835514 Thank you @dawidwys. Travis gave green light. I will merge this... > Add a TableSink for Elasticsearch > - > > Key: FLINK-3875 > URL: https://issues.apache.org/jira/browse/FLINK-3875 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors, Table API & SQL >Reporter: Fabian Hueske >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > > Add a TableSink that writes data to Elasticsearch
[GitHub] pnowojski commented on a change in pull request #6700: [FLINK-10340][table] Add Cosh math function supported in Table API and SQL
pnowojski commented on a change in pull request #6700: [FLINK-10340][table] Add Cosh math function supported in Table API and SQL URL: https://github.com/apache/flink/pull/6700#discussion_r221537661 ## File path: docs/dev/table/functions.md ## @@ -1274,6 +1274,17 @@ ATAN2(numeric1, numeric2) + + +{% highlight text %} +COSH(numeric) +{% endhighlight %} + + +Returns the hyperbolic cosine of numeric. Return value type is DOUBLE. Review comment: @xccui if you think using `` is better I'm fine with that, I don't mind one way or the other. However, I don't understand: > before reaching a consensus about the format and the internal implementation, I suggest to keep the current doc style, i.e., without mentioning the return type, for now. We are already following a convention that all of the math functions drop precision and return `double`. What's the point of not documenting it? If you could guarantee that we will document those return types within, let's say, a month, I would be fine with postponing it. However, I don't believe that would happen, so I prefer to start adding this return type information incrementally right now. Later, if someone wants to, we can reformat the page. I do not see any downside to such an incremental approach until then. > why still indicating the return type for each numeric function explicitly? How about adding a single note for that? Because some of them can be implemented precisely (`abs`, `min`, `max`, `ceil`, ...), and others can be for some variants (`power(DECIMAL, INT)`). We can state the general rule that functions returning `DOUBLE` are approximations, but we still need specific information for every function on whether it returns an approximate (`DOUBLE`) or a precise (`DECIMAL`) result.
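The distinction drawn in this comment between approximate (`DOUBLE`) and precise (`DECIMAL`) results can be shown with plain Java. This is only an illustration using `java.lang.Math` and `java.math.BigDecimal`, not Flink's implementation of these functions; `ReturnTypeSketch` and its method names are hypothetical.

```java
import java.math.BigDecimal;

// Hypothetical illustration of approximate versus precise numeric functions.
final class ReturnTypeSketch {

    // Approximate: the decimal input is converted to double before the computation,
    // so the result is a double and precision may be lost.
    static double cosh(BigDecimal x) {
        return Math.cosh(x.doubleValue());
    }

    // Precise: the absolute value can be computed exactly on the decimal input,
    // so the result can keep the decimal type and scale.
    static BigDecimal abs(BigDecimal x) {
        return x.abs();
    }

    public static void main(String[] args) {
        System.out.println(cosh(new BigDecimal("0")));    // 1.0
        System.out.println(abs(new BigDecimal("-1.10"))); // 1.10
    }
}
```

Documenting the return type per function, as proposed above, tells users which of these two behaviors to expect.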
[jira] [Commented] (FLINK-10340) Add Cosh math function supported in Table API and SQL
[ https://issues.apache.org/jira/browse/FLINK-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633740#comment-16633740 ] ASF GitHub Bot commented on FLINK-10340: pnowojski commented on a change in pull request #6700: [FLINK-10340][table] Add Cosh math function supported in Table API and SQL URL: https://github.com/apache/flink/pull/6700#discussion_r221537661 ## File path: docs/dev/table/functions.md ## @@ -1274,6 +1274,17 @@ ATAN2(numeric1, numeric2) + + +{% highlight text %} +COSH(numeric) +{% endhighlight %} + + +Returns the hyperbolic cosine of numeric. Return value type is DOUBLE. Review comment: @xccui if you think using `` is better I'm fine with that, I don't mind one way or the other. However I don't understand: > before reaching a consensus about the format and the internal implementation, I suggest to keep the current doc style, i.e., without mentioning the return type, for now. We are already following a convention that for all of the math functions we are dropping precision and returning `double`. What's the point of not documenting it? If you could guarantee me that we will document those return types lets say within a month, I would be fine with postponing it. However I don't believe something like that would happen, so I prefer to incrementally start adding this return type information right now. Later if someone will want, we can reformat the page. I do not see any downside to such incremental approach until that moment. > why still indicating the return type for each numeric function explicitly? How about adding a single note for that? since some of them can be implemented precisely (`abs`, `min`, `max`, `ceil`, ...), others can be for some variants (`power(DECIMAL, INT)`). We can state this general rule that functions returning `DOUBLE` are approximations, but we still need specific information for every function, whether it returns approximated (`DOUBLE`) or precise (`DECIMAL`) result. 
> Add Cosh math function supported in Table API and SQL > - > > Key: FLINK-10340 > URL: https://issues.apache.org/jira/browse/FLINK-10340 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.6.0 >Reporter: Sergey Tsvetkov >Assignee: vinoyang >Priority: Minor > Labels: pull-request-available > > Implement udf of cosh, just like in oracle > [https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions031.htm#SQLRF00623] > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
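The distinction drawn in the review comment above, between functions whose result is an approximation (`DOUBLE`) and functions that can return an exact result (`DECIMAL`), can be illustrated in plain Java. This is only a sketch under stated assumptions: the class name `ReturnTypeSketch` and its methods are hypothetical, and the code does not reflect how Flink implements `COSH` or `ABS`.

```java
import java.math.BigDecimal;

// Plain-Java illustration only; this is not Flink's implementation.
final class ReturnTypeSketch {

    // Approximate: the hyperbolic cosine is computed in binary floating point,
    // which corresponds to a DOUBLE return type.
    static double cosh(double x) {
        return Math.cosh(x);
    }

    // Exact: the absolute value of a decimal keeps every digit and the scale,
    // which corresponds to a DECIMAL return type.
    static BigDecimal abs(BigDecimal x) {
        return x.abs();
    }

    public static void main(String[] args) {
        System.out.println(cosh(1.0));
        System.out.println(abs(new BigDecimal("-1.10")));
    }
}
```

A per-function note on the return type tells the reader which of these two behaviours to expect, which a single page-wide note could not do.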
[jira] [Commented] (FLINK-9715) Support versioned joins with event time
[ https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633747#comment-16633747 ] Piotr Nowojski commented on FLINK-9715: --- [~hequn8128] something like that could probably be implemented, but it is not supported right now, nor is it planned. It would also need a separate discussion to define what the semantics should be. For append-only tables it would make sense (return only the rows between {{rowtime_s}} and {{rowtime_e}}?), but what about retractions and/or updates? If a row is removed or updated in this period, what should be returned? > Support versioned joins with event time > --- > > Key: FLINK-9715 > URL: https://issues.apache.org/jira/browse/FLINK-9715 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.5.0 >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski >Priority: Major > Labels: pull-request-available > > Queries like: > {code:java} > SELECT > o.amount * r.rate > FROM > Orders AS o, > LATERAL TABLE (Rates(o.rowtime)) AS r > WHERE o.currency = r.currency{code} > should work with event time -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10466) flink-yarn-tests should depend flink-dist
[ https://issues.apache.org/jira/browse/FLINK-10466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633748#comment-16633748 ] Chesnay Schepler commented on FLINK-10466: -- If flink-yarn-tests does indeed rely on flink-dist being built beforehand, then we should definitely add it as a dependency. > flink-yarn-tests should depend flink-dist > - > > Key: FLINK-10466 > URL: https://issues.apache.org/jira/browse/FLINK-10466 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.7.0 >Reporter: tison >Priority: Major > Fix For: 1.7.0 > > > Maybe by adding > {code:java} > <dependency> > <groupId>org.apache.flink</groupId> > <artifactId>flink-dist_${scala.binary.version}</artifactId> > <version>${project.version}</version> > <scope>test</scope> > <type>pom</type> > </dependency> > {code} > I am not really sure, but the missing dependency causes a failure in my automated testing process, and after > adding this dependency the error disappears. I even wonder how it currently works > on travis. > flink-yarn-test obviously depends on flink-dist since some tests try to find > the flink uberjar. > Please take a look at this. cc [~Zentol] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] pnowojski commented on a change in pull request #6741: [FLINK-9712][table, docs] Document processing time Temporal Table Joins
pnowojski commented on a change in pull request #6741: [FLINK-9712][table,docs] Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r221540451

## File path: docs/dev/table/streaming/temporal_tables.md ##
@@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+
+Temporal Tables represent a concept of a table that changes over time
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+----------
+
+Lets assume that we have two following tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` represents payments for given `amount` and given `currency`.
+For example at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= ========= ====
+09:00   US Dollar  102
+09:00   Euro       114
+09:00   Yen          1
+10:45   Euro       116
+11:15   Euro       119
+{% endhighlight %}
+
+`RatesHistory` represents an ever changing append-only stream of currency exchange rates, with respect to `Yen` (which has a rate of `1`).
+For example exchange rate for a period from `09:00` to `10:45` of `Euro` to `Yen` was `114`.
+From `10:45` to `11:15` it was `116`.
+
+Task is now to calculate a value of all of the `Orders` converted to common currency (`Yen`).
+For example we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without using Temporal Tables in order to do so, one would need to write such query:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM Rates AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query.
+
+In order to define a Temporal Table, we must define it's primary key,
+Primary key allows us to overwrite older values in the Temporal Table.
+In the above example `currency` would be a primary key for `RatesHistory` table.
+Secondly a [time attribute](time_attributes.html) is also required,
+that determines which row is newer and which one is older.
+
+Temporal Table Functions
+------------------------
+
+In order to access the data in the Temporal Table, one must define a time attribute for which matching version of the table will be returned.
+Flink uses the SQL syntax of Table Functions to provide a way to express it.
+Once defined, Temporal Table Function takes a single argument `timeAttribute` and returns a set of rows.
+This set contains the latest versions of the rows for all of existing primary keys with respect to the given `timeAttribute`.
+
+Assuming that we defined a `Rates(timeAttribute)` Temporal Table Function based on `RatesHistory` table.
+We could query such function in the following way:
+
+{% highlight sql %}
+SELECT * FROM Rates('10:15');
+
+rowtime currency   rate
+======= ========= ====
+09:00   US Dollar  102
+09:00   Euro       114
+09:00   Yen          1
+
+SELECT * FROM Rates('11:00');
+
+rowtime currency   rate
+======= ========= ====
+09:00   US Dollar  102
+10:45   Euro       116
+09:00   Yen          1
+{% endhighlight %}
+
+Each query to `Rates(timeAttribute)` would return the state of the `Rates` for the given `timeAttribute`*[]:
+
+**Note**: Currently Flink doesn't support directly querying the Temporal Table Functions with a constant `timeAttribute`.
+At the moment Temporal Table Functions can only be used in joins.
+Above example was used to provide an intuition about what function `Rates(timeAttribute)` returns.
+
+Processing time
+---------------
+
+### Defining Temporal Table Function
+
+In order to define processing time Temporal Table:
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+List<Tuple2<Long, String>> ordersData = new ArrayList<>();
+ordersData.add(Tuple2.of(2L, "Euro"));
+ordersData.add(Tuple2.of(1L, "US Dollar"));
+ordersData.add(Tuple2.of(50L, "Yen"));
+ordersData.add(Tuple2.of(3L, "Euro"));
+ordersData.add(Tuple2.of(5L, "US Dollar"));
+
+List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
+ratesHistoryData.add(Tuple2.of("US Dollar", 102L));
+ratesHistoryData.add(Tuple2.of("Euro", 114L));
+ratesHistoryData.add(Tuple2.of("Yen", 1L));
+ratesHistoryData.add(Tuple2.of("Euro", 116L));
+ratesHistoryData.add(Tuple2.of("Euro", 119L));
+
+DataStreamSource<Tuple2<Long, String>> ordersStream = e
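The lookup that `Rates(timeAttribute)` is described as performing in the quoted documentation (return, per primary key, the latest row at or before the given time) can be sketched in plain Java. This is a minimal illustration under stated assumptions, not Flink code: the class `RatesAsOf`, its methods, and the encoding of `rowtime` as minutes since midnight are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, plain-Java model of a versioned table keyed by currency.
final class RatesAsOf {

    // currency -> (rowtime in minutes since midnight -> rate)
    private final Map<String, TreeMap<Integer, Long>> history = new HashMap<>();

    // Appends one row of the RatesHistory stream.
    void add(int minuteOfDay, String currency, long rate) {
        history.computeIfAbsent(currency, c -> new TreeMap<>()).put(minuteOfDay, rate);
    }

    // Returns the latest rate for the currency at or before the given time,
    // or null if no version exists yet.
    Long rateAsOf(String currency, int minuteOfDay) {
        TreeMap<Integer, Long> versions = history.get(currency);
        if (versions == null) {
            return null;
        }
        Map.Entry<Integer, Long> latest = versions.floorEntry(minuteOfDay);
        return latest == null ? null : latest.getValue();
    }

    public static void main(String[] args) {
        RatesAsOf rates = new RatesAsOf();
        rates.add(9 * 60, "US Dollar", 102L);
        rates.add(9 * 60, "Euro", 114L);
        rates.add(9 * 60, "Yen", 1L);
        rates.add(10 * 60 + 45, "Euro", 116L);
        rates.add(11 * 60 + 15, "Euro", 119L);

        // Same lookups as Rates('10:15') and Rates('11:00') for Euro.
        System.out.println(rates.rateAsOf("Euro", 10 * 60 + 15));
        System.out.println(rates.rateAsOf("Euro", 11 * 60));
    }
}
```

The sorted map per key plays the role of the correlated `MAX(rowtime)` subquery in the quoted SQL: `floorEntry` picks the newest version whose `rowtime` does not exceed the probe time.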
[GitHub] dawidwys commented on issue #6769: [FLINK-10447][HDFS Connector] Create Bucketing Table Sink
dawidwys commented on issue #6769: [FLINK-10447][HDFS Connector] Create Bucketing Table Sink URL: https://github.com/apache/flink/pull/6769#issuecomment-425840104 Hi @SuXingLee, thank you very much for your contribution. I think having a table sink for the file system would be a great feature. However, I think we should use the new `StreamingFileSink`, on which the community has been working for some time already, for this purpose. This new connector fixes a few problems with the old one, which is why it will probably replace the `BucketingSink` at some point. To sum up, I think we should use the `StreamingFileSink` rather than the `BucketingSink` here.
[jira] [Assigned] (FLINK-10469) FileChannel may not write the whole buffer in a single call to FileChannel.write(Buffer buffer)
[ https://issues.apache.org/jira/browse/FLINK-10469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Yun Gao reassigned FLINK-10469: --- Assignee: Yun Gao > FileChannel may not write the whole buffer in a single call to > FileChannel.write(Buffer buffer) > --- > > Key: FLINK-10469 > URL: https://issues.apache.org/jira/browse/FLINK-10469 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.4.1, 1.4.2, 1.5.3, 1.6.0, 1.6.1, 1.7.0, 1.5.4, 1.6.2 >Reporter: Yun Gao >Assignee: Yun Gao >Priority: Major > > Currently all the calls to _FileChannel.write(ByteBuffer src)_ assume that > this method will not return before the whole buffer is written, like the one > in _AsynchronousFileIOChannel.write()._ > > However, this assumption may not hold in all environments. We have > encountered a case where only part of a buffer was written on a cluster with > a high IO load, and the target file was corrupted. > > To fix this issue, I think we should add a utility method in > org.apache.flink.util.IOUtils that writes the whole buffer in a > loop, and replace all the calls to _FileChannel.write(ByteBuffer)_ with this > new method. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
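The fix proposed in the issue above (keep calling `write` until the buffer is drained) can be sketched as follows. This is a minimal sketch, not the method that was eventually added to Flink: the class name `FullWriteSketch` and the method name `writeCompletely` are hypothetical. It relies only on the documented contract of `FileChannel.write(ByteBuffer)`, which returns the number of bytes written and may write fewer bytes than remain in the buffer.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Hypothetical helper; the name and location are assumptions for illustration.
final class FullWriteSketch {

    // Writes all remaining bytes of src to the channel. A single call to
    // FileChannel.write may be partial, so the call is repeated until the
    // buffer has no remaining bytes.
    static void writeCompletely(FileChannel channel, ByteBuffer src) throws IOException {
        while (src.hasRemaining()) {
            channel.write(src);
        }
    }
}
```

Each partial write advances the buffer's position, so the loop resumes exactly where the previous call stopped and never writes a byte twice.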
[jira] [Commented] (FLINK-10447) Create Bucketing Table Sink.
[ https://issues.apache.org/jira/browse/FLINK-10447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633751#comment-16633751 ] ASF GitHub Bot commented on FLINK-10447: dawidwys commented on issue #6769: [FLINK-10447][HDFS Connector] Create Bucketing Table Sink URL: https://github.com/apache/flink/pull/6769#issuecomment-425840104 Hi @SuXingLee, thank you very much for your contribution. I think having a table sink for the file system would be a great feature. However, I think we should use the new `StreamingFileSink`, on which the community has been working for some time already, for this purpose. This new connector fixes a few problems with the old one, which is why it will probably replace the `BucketingSink` at some point. To sum up, I think we should use the `StreamingFileSink` rather than the `BucketingSink` here. > Create Bucketing Table Sink. > > > Key: FLINK-10447 > URL: https://issues.apache.org/jira/browse/FLINK-10447 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Suxing Lee >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > It would be nice to integrate the table APIs with the HDFS connectors so that > the rows in the tables can be directly pushed into HDFS. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] asfgit closed pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch
asfgit closed pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch URL: https://github.com/apache/flink/pull/6611 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md index f79dd93486e..0b7d8948f79 100644 --- a/docs/dev/table/connect.md +++ b/docs/dev/table/connect.md @@ -43,6 +43,7 @@ The following table list all available connectors and formats. Their mutual comp | Name | Version | Maven dependency | SQL Client JAR | | : | : | :--- | :--| | Filesystem| | Built-in | Built-in | +| Elasticsearch | 6 | `flink-connector-elasticsearch6` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | | Apache Kafka | 0.8 | `flink-connector-kafka-0.8` | Not available | | Apache Kafka | 0.9 | `flink-connector-kafka-0.9` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | | Apache Kafka | 0.10 | `flink-connector-kafka-0.10` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | @@ -588,6 +589,111 @@ Make sure to add the version-specific Kafka dependency. 
In addition, a correspon {% top %} +### Elasticsearch Connector + +Sink: Streaming Append Mode +Sink: Streaming Upsert Mode +Format: JSON-only + +The Elasticsearch connector allows for writing into an index of the Elasticsearch search engine. + +The connector can operate in [upsert mode](#update-modes) for exchanging UPSERT/DELETE messages with the external system using a [key defined by the query](streaming.html#table-to-stream-conversion). + +For append-only queries, the connector can also operate in [append mode](#update-modes) for exchanging only INSERT messages with the external system. If no key is defined by the query, a key is automatically generated by Elasticsearch. + +The connector can be defined as follows: + + + +{% highlight java %} +.connect( + new Elasticsearch() +.version("6") // required: valid connector versions are "6" +.host("localhost", 9200, "http") // required: one or more Elasticsearch hosts to connect to +.index("MyUsers") // required: Elasticsearch index +.documentType("user") // required: Elasticsearch document type + +.keyDelimiter("$")// optional: delimiter for composite keys ("_" by default) + // e.g., "$" would result in IDs "KEY1$KEY2$KEY3" +.keyNullLiteral("n/a")// optional: representation for null fields in keys ("null" by default) + +// optional: failure handling strategy in case a request to Elasticsearch fails (fail by default) +.failureHandlerFail() // optional: throws an exception if a request fails and causes a job failure +.failureHandlerIgnore()// or ignores failures and drops the request +.failureHandlerRetryRejected() // or re-adds requests that have failed due to queue capacity saturation +.failureHandlerCustom(...) // or custom failure handling with a ActionRequestFailureHandler subclass + +// optional: configure how to buffer elements before sending them in bulk to the cluster for efficiency +.disableFlushOnCheckpoint()// optional: disables flushing on checkpoint (see notes below!) 
+.bulkFlushMaxActions(42) // optional: maximum number of actions to buffer for each bulk request +.bulkFlushMaxSize("42 mb") // optional: maximum size of buffered actions in bytes per bulk request + // (only MB granularity is supported) +.bulkFlushInterval(6L) // optional: bulk flush interval (in milliseconds) + +.bulkFlushBackoffConstant()// optional: use a constant backoff type +.bulkFlushBackoffExponential() // or use an exponential backoff type +.bulkFlushBackoffMaxRetries(3) // optional: maximum number of retries +.bulkFlushBackoffDelay(3L) // optional: delay between each backoff attempt (in milliseconds) + +// optional: connection properties to be used during RE
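The `keyDelimiter` and `keyNullLiteral` options in the quoted connector documentation describe how a composite key becomes a document ID (for example `"$"` yielding `KEY1$KEY2$KEY3`). The sketch below shows that joining rule in plain Java. It is an assumption-based illustration, not the connector's code: the class `CompositeKeySketch` and the method `buildId` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of the documented key options; not connector code.
final class CompositeKeySketch {

    // Joins the key fields with the delimiter and substitutes the null literal
    // for null fields.
    static String buildId(List<?> keyFields, String delimiter, String nullLiteral) {
        return keyFields.stream()
            .map(field -> field == null ? nullLiteral : field.toString())
            .collect(Collectors.joining(delimiter));
    }

    public static void main(String[] args) {
        System.out.println(buildId(Arrays.asList("KEY1", "KEY2", "KEY3"), "$", "n/a"));
        System.out.println(buildId(Arrays.asList("KEY1", null, "KEY3"), "$", "n/a"));
    }
}
```

A delimiter that can also occur inside a key value would make two different composite keys map to the same ID, which is presumably why the delimiter is configurable.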
[jira] [Resolved] (FLINK-3875) Add a TableSink for Elasticsearch
[ https://issues.apache.org/jira/browse/FLINK-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-3875. - Resolution: Fixed Fix Version/s: 1.7.0 Fixed in 1.7.0: 10f9f1d431dbf047ea14130d9fb7fdd4f78178fc > Add a TableSink for Elasticsearch > - > > Key: FLINK-3875 > URL: https://issues.apache.org/jira/browse/FLINK-3875 > Project: Flink > Issue Type: New Feature > Components: Streaming Connectors, Table API & SQL >Reporter: Fabian Hueske >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Add a TableSink that writes data to Elasticsearch -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zentol commented on a change in pull request #6731: [FLINK-10312] Propagate exception from server to client in REST API
zentol commented on a change in pull request #6731: [FLINK-10312] Propagate exception from server to client in REST API URL: https://github.com/apache/flink/pull/6731#discussion_r221546026 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ## @@ -207,12 +210,10 @@ resultFuture.whenComplete( (innerT, innerThrowable) -> scheduledFuture.cancel(false)); } else { - final String errorMsg = retries == 0 ? - "Number of retries has been exhausted." : - "Exception is not retryable."; - resultFuture.completeExceptionally(new RetryException( - "Could not complete the operation. " + errorMsg, - throwable)); + RetryException retryException = new RetryException( + "Could not complete the operation: number of retries has been exhausted.", Review comment: I would stick with the original formatting: `Could not complete the operation. Number of retries has been exhausted.` When seeing a `:` I would expect a reason as to _why_ it couldn't be completed.
[jira] [Commented] (FLINK-10312) Wrong / missing exception when submitting job
[ https://issues.apache.org/jira/browse/FLINK-10312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633771#comment-16633771 ] ASF GitHub Bot commented on FLINK-10312: zentol commented on a change in pull request #6731: [FLINK-10312] Propagate exception from server to client in REST API URL: https://github.com/apache/flink/pull/6731#discussion_r221546026 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java ## @@ -207,12 +210,10 @@ resultFuture.whenComplete( (innerT, innerThrowable) -> scheduledFuture.cancel(false)); } else { - final String errorMsg = retries == 0 ? - "Number of retries has been exhausted." : - "Exception is not retryable."; - resultFuture.completeExceptionally(new RetryException( - "Could not complete the operation. " + errorMsg, - throwable)); + RetryException retryException = new RetryException( + "Could not complete the operation: number of retries has been exhausted.", Review comment: I would stick with the original formatting: `Could not complete the operation. Number of retries has been exhausted.` When seeing a `:` I would expect a reason as to _why_ it couldn't be completed. > Wrong / missing exception when submitting job > - > > Key: FLINK-10312 > URL: https://issues.apache.org/jira/browse/FLINK-10312 > Project: Flink > Issue Type: Bug > Components: JobManager >Affects Versions: 1.5.2, 1.6.0 >Reporter: Stephan Ewen >Assignee: Andrey Zagrebin >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2, 1.5.5 > > Attachments: lmerge-TR.pdf > > > h3. Problem > When submitting a job that cannot be created / initialized on the JobManager, > there is no proper error message. 
The exception says *"Could not retrieve the > execution result. (JobID: 5a7165e1260c6316fa11d2760bd3d49f)"* > h3. Steps to Reproduce > Create a streaming job, set a state backend with a non existing file system > scheme. > h3. Full Stack Trace > {code} > Submitting a job where instantiation on the JM fails yields this, which seems > like a major regression from seeing the actual exception: > org.apache.flink.client.program.ProgramInvocationException: Could not > retrieve the execution result. (JobID: 5a7165e1260c6316fa11d2760bd3d49f) > at > org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:260) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486) > at > org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66) > at > org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511) > at > com.dataartisans.streamledger.examples.simpletrade.SimpleTradeExample.main(SimpleTradeExample.java:98) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421) > at > org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426) > at > org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804) > at > org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280) > at org.apache.fl
[GitHub] dawidwys closed pull request #6756: [FLINK-10414][cep] Added skip to next strategy
dawidwys closed pull request #6756: [FLINK-10414][cep] Added skip to next strategy URL: https://github.com/apache/flink/pull/6756 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md index d7f915fc62e..c8e836349ef 100644 --- a/docs/dev/libs/cep.md +++ b/docs/dev/libs/cep.md @@ -1288,6 +1288,15 @@ For example, for a given pattern `b+ c` and a data stream `b1 b2 b3 c`, the diff After found matching b1 b2 b3 c, the match process will not discard any result. + +SKIP_TO_NEXT + +b1 b2 b3 c +b2 b3 c +b3 c + +After found matching b1 b2 b3 c, the match process will not discard any result, because no other match could start at b1. + SKIP_PAST_LAST_EVENT @@ -1296,7 +1305,7 @@ For example, for a given pattern `b+ c` and a data stream `b1 b2 b3 c`, the diff After found matching b1 b2 b3 c, the match process will discard all started partial matches. -SKIP_TO_FIRST[b*] +SKIP_TO_FIRST[b] b1 b2 b3 c b2 b3 c @@ -1340,7 +1349,35 @@ Pattern: `(a | c) (b | c) c+.greedy d` and sequence: `a b c1 c2 c3 d` Then the r a b c1 c2 c3 d c1 c2 c3 d -After found matching a b c1 c2 c3 d, the match process will try to discard all partial matches started before c1. There is one such match b c1 c2 c3 d. +After found matching a b c1 c2 c3 d, the match process will discard all partial matches started before c1. There is one such match b c1 c2 c3 d. + + + +To better understand the difference between NO_SKIP and SKIP_TO_NEXT take a look at following example: +Pattern: `a b+` and sequence: `a b1 b2 b3` Then the results will be: + + + + +Skip Strategy +Result + Description + + +NO_SKIP + +a b1 +a b1 b2 +a b1 b2 b3 + +After found matching a b1, the match process will not discard any result. 
+ + +SKIP_TO_NEXT[b*] + +a b1 + +After found matching a b1, the match process will discard all partial matches started at a. This means neither a b1 b2 nor a b1 b2 b3 could be generated. @@ -1354,6 +1391,10 @@ To specify which skip strategy to use, just create an `AfterMatchSkipStrategy` b AfterMatchSkipStrategy.noSkip() Create a NO_SKIP skip strategy + +AfterMatchSkipStrategy.skipToNext() +Create a SKIP_TO_NEXT skip strategy + AfterMatchSkipStrategy.skipPastLastEvent() Create a SKIP_PAST_LAST_EVENT skip strategy diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java index f4448a35560..7578e29bbfa 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java @@ -38,7 +38,7 @@ private static final long serialVersionUID = -4048930333619068531L; /** -* Discards every partial match that contains event of the match preceding the first of *PatternName*. +* Discards every partial match that started before the first event of emitted match mapped to *PatternName*. * * @param patternName the pattern name to skip to * @return the created AfterMatchSkipStrategy @@ -48,7 +48,7 @@ public static SkipToFirstStrategy skipToFirst(String patternName) { } /** -* Discards every partial match that contains event of the match preceding the last of *PatternName*. +* Discards every partial match that started before the last event of emitted match mapped to *PatternName*. * * @param patternName the pattern name to skip to * @return the created AfterMatchSkipStrategy @@ -58,7 +58,7 @@ public static SkipToLastStrategy skipToLast(String patternName) { } /** -* Discards every partial match that contains event of the match. 
+* Discards every partial match that started before emitted match ended. * * @return the created AfterMatchSkipStrategy */ @@ -66,6 +66,15 @@ public static SkipPastLastStrategy skipPastLastEvent() { return SkipPastLastStrategy.INSTANCE; } + /** +* Discards every partial match that started with the same event, emitted match was started. +
[jira] [Commented] (FLINK-10414) Add skip to next strategy
[ https://issues.apache.org/jira/browse/FLINK-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633773#comment-16633773 ] ASF GitHub Bot commented on FLINK-10414: dawidwys closed pull request #6756: [FLINK-10414][cep] Added skip to next strategy URL: https://github.com/apache/flink/pull/6756 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md index d7f915fc62e..c8e836349ef 100644 --- a/docs/dev/libs/cep.md +++ b/docs/dev/libs/cep.md @@ -1288,6 +1288,15 @@ For example, for a given pattern `b+ c` and a data stream `b1 b2 b3 c`, the diff After found matching b1 b2 b3 c, the match process will not discard any result. + +SKIP_TO_NEXT + +b1 b2 b3 c +b2 b3 c +b3 c + +After found matching b1 b2 b3 c, the match process will not discard any result, because no other match could start at b1. + SKIP_PAST_LAST_EVENT @@ -1296,7 +1305,7 @@ For example, for a given pattern `b+ c` and a data stream `b1 b2 b3 c`, the diff After found matching b1 b2 b3 c, the match process will discard all started partial matches. -SKIP_TO_FIRST[b*] +SKIP_TO_FIRST[b] b1 b2 b3 c b2 b3 c @@ -1340,7 +1349,35 @@ Pattern: `(a | c) (b | c) c+.greedy d` and sequence: `a b c1 c2 c3 d` Then the r a b c1 c2 c3 d c1 c2 c3 d -After found matching a b c1 c2 c3 d, the match process will try to discard all partial matches started before c1. There is one such match b c1 c2 c3 d. +After found matching a b c1 c2 c3 d, the match process will discard all partial matches started before c1. There is one such match b c1 c2 c3 d. 
+ + + +To better understand the difference between NO_SKIP and SKIP_TO_NEXT take a look at following example: +Pattern: `a b+` and sequence: `a b1 b2 b3` Then the results will be: + + + + +Skip Strategy +Result + Description + + +NO_SKIP + +a b1 +a b1 b2 +a b1 b2 b3 + +After found matching a b1, the match process will not discard any result. + + +SKIP_TO_NEXT[b*] + +a b1 + +After found matching a b1, the match process will discard all partial matches started at a. This means neither a b1 b2 nor a b1 b2 b3 could be generated. @@ -1354,6 +1391,10 @@ To specify which skip strategy to use, just create an `AfterMatchSkipStrategy` b AfterMatchSkipStrategy.noSkip() Create a NO_SKIP skip strategy + +AfterMatchSkipStrategy.skipToNext() +Create a SKIP_TO_NEXT skip strategy + AfterMatchSkipStrategy.skipPastLastEvent() Create a SKIP_PAST_LAST_EVENT skip strategy diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java index f4448a35560..7578e29bbfa 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java @@ -38,7 +38,7 @@ private static final long serialVersionUID = -4048930333619068531L; /** -* Discards every partial match that contains event of the match preceding the first of *PatternName*. +* Discards every partial match that started before the first event of emitted match mapped to *PatternName*. * * @param patternName the pattern name to skip to * @return the created AfterMatchSkipStrategy @@ -48,7 +48,7 @@ public static SkipToFirstStrategy skipToFirst(String patternName) { } /** -* Discards every partial match that contains event of the match preceding the last of *PatternName*. 
+* Discards every partial match that started before the last event of emitted match mapped to *PatternName*. * * @param patternName the pattern name to skip to * @return the created AfterMatchSkipStrategy @@ -58,7 +58,7 @@ public static SkipToLastStrategy skipToLast(String patternName) { } /** -* Discards every partial match that contains event of the match. +* Discards every partial match that started before emitted match ended. * * @return the created AfterMatchSkipStrategy *
[jira] [Resolved] (FLINK-10414) Add skip to next strategy
[ https://issues.apache.org/jira/browse/FLINK-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dawid Wysakowicz resolved FLINK-10414. -- Resolution: Done > Add skip to next strategy > - > > Key: FLINK-10414 > URL: https://issues.apache.org/jira/browse/FLINK-10414 > Project: Flink > Issue Type: Improvement > Components: CEP >Reporter: Dawid Wysakowicz >Assignee: Dawid Wysakowicz >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > Add skip to next strategy, that should discard all partial matches that > started with the same element as found match. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
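The skip-to-next rule described in this issue can be illustrated with a small, self-contained Java sketch. It does not use the Flink CEP API: `SkipToNextSketch`, `Strategy`, and `emit` are hypothetical names, and the candidate matches for the pattern `a b+` over `a b1 b2 b3` are written out by hand rather than produced by an NFA. The sketch only models the rule that, once a match is emitted, other matches starting with the same event are dropped.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class SkipToNextSketch {

    // Hypothetical stand-in for two of the after-match skip strategies.
    enum Strategy { NO_SKIP, SKIP_TO_NEXT }

    // candidates: all matches the pattern could produce, in emission order.
    // Each match is a list of event names; the first element is its start event.
    static List<List<String>> emit(List<List<String>> candidates, Strategy strategy) {
        List<List<String>> emitted = new ArrayList<>();
        Set<String> startsAlreadyEmitted = new HashSet<>();
        for (List<String> match : candidates) {
            String start = match.get(0);
            // SKIP_TO_NEXT: drop matches that begin with the same event
            // as a match that has already been emitted.
            if (strategy == Strategy.SKIP_TO_NEXT && startsAlreadyEmitted.contains(start)) {
                continue;
            }
            startsAlreadyEmitted.add(start);
            emitted.add(match);
        }
        return emitted;
    }

    public static void main(String[] args) {
        // Pattern `a b+` over the sequence `a b1 b2 b3` (candidates listed by hand).
        List<List<String>> candidates = Arrays.asList(
            Arrays.asList("a", "b1"),
            Arrays.asList("a", "b1", "b2"),
            Arrays.asList("a", "b1", "b2", "b3"));

        System.out.println("NO_SKIP:      " + emit(candidates, Strategy.NO_SKIP));
        System.out.println("SKIP_TO_NEXT: " + emit(candidates, Strategy.SKIP_TO_NEXT));
    }
}
```

For these hand-written candidates the sketch keeps all three matches under NO_SKIP and only `a b1` under SKIP_TO_NEXT, which agrees with the table added to `cep.md` in pull request #6756.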
[GitHub] asfgit closed pull request #6733: [FLINK-10291] Generate JobGraph with fixed/configurable JobID in StandaloneJobClusterEntrypoint
asfgit closed pull request #6733: [FLINK-10291] Generate JobGraph with fixed/configurable JobID in StandaloneJobClusterEntrypoint URL: https://github.com/apache/flink/pull/6733 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java index 94fc109c47b..59ab4065804 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.client.program; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.Plan; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; @@ -39,18 +40,21 @@ public class PackagedProgramUtils { /** -* Creates a {@link JobGraph} from the given {@link PackagedProgram}. +* Creates a {@link JobGraph} with a specified {@link JobID} +* from the given {@link PackagedProgram}. 
* * @param packagedProgram to extract the JobGraph from * @param configuration to use for the optimizer and job graph generator * @param defaultParallelism for the JobGraph +* @param jobID the pre-generated job id * @return JobGraph extracted from the PackagedProgram * @throws ProgramInvocationException if the JobGraph generation failed */ public static JobGraph createJobGraph( PackagedProgram packagedProgram, Configuration configuration, - int defaultParallelism) throws ProgramInvocationException { + int defaultParallelism, + JobID jobID) throws ProgramInvocationException { Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader()); final Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration); final FlinkPlan flinkPlan; @@ -79,11 +83,11 @@ public static JobGraph createJobGraph( final JobGraph jobGraph; if (flinkPlan instanceof StreamingPlan) { - jobGraph = ((StreamingPlan) flinkPlan).getJobGraph(); + jobGraph = ((StreamingPlan) flinkPlan).getJobGraph(jobID); jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings()); } else { final JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(configuration); - jobGraph = jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan); + jobGraph = jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan, jobID); } for (URL url : packagedProgram.getAllLibraries()) { @@ -99,5 +103,22 @@ public static JobGraph createJobGraph( return jobGraph; } + /** +* Creates a {@link JobGraph} with a random {@link JobID} +* from the given {@link PackagedProgram}. 
+* +* @param packagedProgram to extract the JobGraph from +* @param configuration to use for the optimizer and job graph generator +* @param defaultParallelism for the JobGraph +* @return JobGraph extracted from the PackagedProgram +* @throws ProgramInvocationException if the JobGraph generation failed +*/ + public static JobGraph createJobGraph( + PackagedProgram packagedProgram, + Configuration configuration, + int defaultParallelism) throws ProgramInvocationException { + return createJobGraph(packagedProgram, configuration, defaultParallelism, null); + } + private PackagedProgramUtils() {} } diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java index 3e0645d6859..d769b6848e7 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java @@ -18,6 +18,7 @@ package org.apache.flink.container.entrypoint; +import org.apache.flink.api.common.JobID; import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.client.program.PackagedProgramUtils; import org.apache.flink.client.program.ProgramInvo
[jira] [Commented] (FLINK-10291) Generate JobGraph with fixed/configurable JobID in StandaloneJobClusterEntrypoint
[ https://issues.apache.org/jira/browse/FLINK-10291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633777#comment-16633777 ] ASF GitHub Bot commented on FLINK-10291: asfgit closed pull request #6733: [FLINK-10291] Generate JobGraph with fixed/configurable JobID in StandaloneJobClusterEntrypoint URL: https://github.com/apache/flink/pull/6733 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java index 94fc109c47b..59ab4065804 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.client.program; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.Plan; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; @@ -39,18 +40,21 @@ public class PackagedProgramUtils { /** -* Creates a {@link JobGraph} from the given {@link PackagedProgram}. +* Creates a {@link JobGraph} with a specified {@link JobID} +* from the given {@link PackagedProgram}. 
* * @param packagedProgram to extract the JobGraph from * @param configuration to use for the optimizer and job graph generator * @param defaultParallelism for the JobGraph +* @param jobID the pre-generated job id * @return JobGraph extracted from the PackagedProgram * @throws ProgramInvocationException if the JobGraph generation failed */ public static JobGraph createJobGraph( PackagedProgram packagedProgram, Configuration configuration, - int defaultParallelism) throws ProgramInvocationException { + int defaultParallelism, + JobID jobID) throws ProgramInvocationException { Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader()); final Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration); final FlinkPlan flinkPlan; @@ -79,11 +83,11 @@ public static JobGraph createJobGraph( final JobGraph jobGraph; if (flinkPlan instanceof StreamingPlan) { - jobGraph = ((StreamingPlan) flinkPlan).getJobGraph(); + jobGraph = ((StreamingPlan) flinkPlan).getJobGraph(jobID); jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings()); } else { final JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(configuration); - jobGraph = jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan); + jobGraph = jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan, jobID); } for (URL url : packagedProgram.getAllLibraries()) { @@ -99,5 +103,22 @@ public static JobGraph createJobGraph( return jobGraph; } + /** +* Creates a {@link JobGraph} with a random {@link JobID} +* from the given {@link PackagedProgram}. 
+* +* @param packagedProgram to extract the JobGraph from +* @param configuration to use for the optimizer and job graph generator +* @param defaultParallelism for the JobGraph +* @return JobGraph extracted from the PackagedProgram +* @throws ProgramInvocationException if the JobGraph generation failed +*/ + public static JobGraph createJobGraph( + PackagedProgram packagedProgram, + Configuration configuration, + int defaultParallelism) throws ProgramInvocationException { + return createJobGraph(packagedProgram, configuration, defaultParallelism, null); + } + private PackagedProgramUtils() {} } diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java index 3e0645d6859..d769b6848e7 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java @@ -18,6 +18,7 @@ pa
[GitHub] twalthr commented on issue #6769: [FLINK-10447][HDFS Connector] Create Bucketing Table Sink
twalthr commented on issue #6769: [FLINK-10447][HDFS Connector] Create Bucketing Table Sink URL: https://github.com/apache/flink/pull/6769#issuecomment-425850646 It requires much than just switching to `StreamingFileSink`. Every new sink should integrate with the new unified API. See also https://github.com/apache/flink/pull/6611 for an example. The formats for should be pluggable and integrated with SQL. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10447) Create Bucketing Table Sink.
[ https://issues.apache.org/jira/browse/FLINK-10447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633782#comment-16633782 ] ASF GitHub Bot commented on FLINK-10447: twalthr commented on issue #6769: [FLINK-10447][HDFS Connector] Create Bucketing Table Sink URL: https://github.com/apache/flink/pull/6769#issuecomment-425850646 It requires much more than just switching to `StreamingFileSink`. Every new sink should integrate with the new unified API. See also https://github.com/apache/flink/pull/6611 for an example. The formats should be pluggable and integrated with SQL. > Create Bucketing Table Sink. > > > Key: FLINK-10447 > URL: https://issues.apache.org/jira/browse/FLINK-10447 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Suxing Lee >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > It would be nice to integrate the table APIs with the HDFS connectors so that > the rows in the tables can be directly pushed into HDFS.
[GitHub] StefanRRichter commented on issue #6766: [docs] Improve documentation of savepoints
StefanRRichter commented on issue #6766: [docs] Improve documentation of savepoints URL: https://github.com/apache/flink/pull/6766#issuecomment-425851398 Thanks for the comments! Will merge.
[GitHub] fhueske commented on issue #6769: [FLINK-10447][HDFS Connector] Create Bucketing Table Sink
fhueske commented on issue #6769: [FLINK-10447][HDFS Connector] Create Bucketing Table Sink URL: https://github.com/apache/flink/pull/6769#issuecomment-425851537 Please see also my comment on the Jira issue. Before adapting the PR, we should agree on the feature scope and design. Please provide a design document and continue the discussion on the Jira issue. I think this PR should be closed because it needs to be completely rewritten.
[jira] [Commented] (FLINK-10447) Create Bucketing Table Sink.
[ https://issues.apache.org/jira/browse/FLINK-10447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633786#comment-16633786 ] ASF GitHub Bot commented on FLINK-10447: fhueske commented on issue #6769: [FLINK-10447][HDFS Connector] Create Bucketing Table Sink URL: https://github.com/apache/flink/pull/6769#issuecomment-425851537 Please see also my comment on the Jira issue. Before adapting the PR, we should agree on the feature scope and design. Please provide a design document and continue the discussion on the Jira issue. I think this PR should be closed because it needs to be completely rewritten. > Create Bucketing Table Sink. > > > Key: FLINK-10447 > URL: https://issues.apache.org/jira/browse/FLINK-10447 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Suxing Lee >Priority: Major > Labels: pull-request-available > Fix For: 1.7.0 > > > It would be nice to integrate the table APIs with the HDFS connectors so that > the rows in the tables can be directly pushed into HDFS.
[GitHub] asfgit closed pull request #6766: [docs] Improve documentation of savepoints
asfgit closed pull request #6766: [docs] Improve documentation of savepoints URL: https://github.com/apache/flink/pull/6766 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/ops/state/savepoints.md b/docs/ops/state/savepoints.md index 6dd5154c5e6..f21415ff73e 100644 --- a/docs/ops/state/savepoints.md +++ b/docs/ops/state/savepoints.md @@ -25,17 +25,29 @@ under the License. * toc {:toc} -## Overview +## What is a Savepoint? How is a Savepoint different from a Checkpoint? -Savepoints are externally stored self-contained checkpoints that you can use to stop-and-resume or update your Flink programs. They use Flink's [checkpointing mechanism]({{ site.baseurl }}/internals/stream_checkpointing.html) to create a (non-incremental) snapshot of the state of your streaming program and write the checkpoint data and meta data out to an external file system. - -This page covers all steps involved in triggering, restoring, and disposing savepoints. -For more details on how Flink handles state and failures in general, check out the [State in Streaming Programs]({{ site.baseurl }}/dev/stream/state/index.html) page. +A Savepoint is a consistent image of the execution state of a streaming job, created via Flink's [checkpointing mechanism]({{ site.baseurl }}/internals/stream_checkpointing.html). You can use Savepoints to stop-and-resume, fork, +or update your Flink jobs. Savepoints consist of two parts: a directory with (typically large) binary files on stable storage (e.g. HDFS, S3, ...) and a (relatively small) meta data file. The files on stable storage represent the net data of the job's execution state +image. 
The meta data file of a Savepoint contains (primarily) pointers to all file on stable storage that are part of the Savepoint, in form of absolute paths. Attention: In order to allow upgrades between programs and Flink versions, it is important to check out the following section about assigning IDs to your operators. +Flink's Savepoints are different from Checkpoints in a similar way that backups are different from recovery logs in traditional database systems. The primary purpose of Checkpoints is the provide a recovery mechanism in case of +unexpected job failures. A Checkpoint's lifecycle is managed by Flink, i.e. a Checkpoint is created, owned, and released by Flink - without user interaction. As a method of recovery and being periodically triggered, two main +design goals for the Checkpoint implementation are i) being as lightweight to create and ii) being as fast to restore from as possible. Optimizations towards those goals can exploit certain properties, e.g. that the job code +doesn't changes between the execution attempts. Checkpoints are usually dropped after the job was terminated by the user (except if explicitly configured as retained Checkpoints). + +In contrast to all this, Savepoints are created, owned, and deleted by the user. Their use-case is for planned, manual backup and resume. For example, this could be an update of your Flink version, changing your job graph, +changing parallelism, forking a second job like for a red/blue deployment, and so on. Of course, Savepoints must survive job termination. Conceptually, Savepoints can be a bit more expensive to produce and restore and focus +more on portability and support for the previously mentioned changes to the job. + +Those conceptual differences aside, the current implementations of Checkpoints and Savepoints are basically using the same code and produce the same „format". However, there is currently one exception from this, and we might +introduce more differences in the future. 
The exception are incremental checkpoints with the RocksDB state backend. They are using some RocksDB internal format instead of Flink’s native savepoint format. This makes them the +first instance of a more lightweight checkpointing mechanism, compared to Savepoints. + ## Assigning Operator IDs It is **highly recommended** that you adjust your programs as described in this section in order to be able to upgrade your programs in the future. The main required change is to manually specify operator IDs via the **`uid(String)`** method. These IDs are used to scope the state of each operator. @@ -211,4 +223,10 @@ If the savepoint was triggered with Flink >= 1.2.0 and using no deprecated state If you are resuming from a savepoint triggered with Flink < 1.2.0 or using now deprecated APIs you first have to migrate your job and savepoint to Flink >= 1.2.0 before being able to change the parallelism. See the [upgrading jobs and Flink versions guide]({{ site.baseurl }}/ops/upgrading.html). +### Can I move the Savepoint files on stable storage? + +The quick answer to this que
[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API URL: https://github.com/apache/flink/pull/6754#discussion_r221550059 ## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRequestBody.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.runtime.rest.messages.RequestBody; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import java.util.List; + +/** + * Base class for {@link RequestBody} for running a jar or querying the plan. + */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public abstract class JarRequestBody implements RequestBody { Review comment: can be package private? This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API URL: https://github.com/apache/flink/pull/6754#discussion_r221552131 ## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsListQueryParameter.java ## @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.runtime.rest.messages.MessageParameter; + +import java.io.File; + +/** + * Query parameter specifying the arguments list for the program. + * @see org.apache.flink.client.program.PackagedProgram#PackagedProgram(File, String, String...) + */ +public class ProgramArgsListQueryParameter extends StringQueryParameter { + public ProgramArgsListQueryParameter() { + super("program-args-list", MessageParameter.MessageParameterRequisiteness.OPTIONAL); + } + + @Override + public String getDescription() { + return "List of string values that specify the arguments for the program or plan. " + + "It is recommended to specify it in a JSON request body as a JSON list parameter 'programArgsList'."; Review comment: Please remove this recommendation. 
You're baking in the assumption that the `ProgramArgsListQueryParameter` is always used in conjunction with a `JarRequestBody` or similar. Deprecation notices for parameters belong in the respective `MessageHeader` description. Users already receive a warning when using query parameters during the job submission.
[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API URL: https://github.com/apache/flink/pull/6754#discussion_r221553160 ## File path: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java ## @@ -190,7 +195,8 @@ public void testConfigurationViaJsonRequest() throws Exception { () -> { final JarRunRequestBody jsonRequest = new JarRunRequestBody( ParameterProgram.class.getCanonicalName(), - "--host localhost --port 1234", + null, + Arrays.asList("--host", "localhost", "--port", "1234"), Review comment: this test should be duplicated instead to cover both cases.
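The two cases this review comment refers to are the single-string form (`"--host localhost --port 1234"`) and the list form of the program arguments. A minimal sketch of why both should be tested, assuming a naive whitespace split: `ProgramArgsFormsSketch` and `tokenize` are hypothetical names and are not taken from the Flink code base, whose own tokenizer may treat quoting differently.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ProgramArgsFormsSketch {

    // Hypothetical helper: splits a single argument string on runs of whitespace.
    // Quoted arguments containing spaces are NOT handled here (assumption: simple case only).
    static List<String> tokenize(String programArgs) {
        if (programArgs == null || programArgs.trim().isEmpty()) {
            return Collections.emptyList();
        }
        return Arrays.asList(programArgs.trim().split("\\s+"));
    }

    public static void main(String[] args) {
        // Case 1: arguments passed as one string.
        List<String> fromString = tokenize("--host localhost --port 1234");
        // Case 2: arguments passed as a list of separate strings.
        List<String> fromList = Arrays.asList("--host", "localhost", "--port", "1234");

        // Both forms should describe the same program arguments.
        System.out.println("same arguments: " + fromString.equals(fromList));
    }
}
```

Keeping one test per input form, as requested in the review, would guard each code path separately instead of replacing coverage of the older string form.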
[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API URL: https://github.com/apache/flink/pull/6754#discussion_r221551267 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtils.java ## @@ -63,4 +66,26 @@ return value; } + /** +* Returns {@code requestValue} if it is not null, otherwise returns the query parameter value +* if it is not null, otherwise returns the default value. +*/ + public static T fromRequestBodyOrQueryParameter( + T requestValue, + SupplierWithException queryParameterExtractor, + T defaultValue, + Logger log) throws RestHandlerException { Review comment: indent parameters by another tab This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
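The precedence described in the Javadoc above (request body value first, then query parameter, then default) can be sketched as follows. This is a simplified illustration, not the PR's code: it assumes a plain `java.util.function.Supplier` in place of `SupplierWithException`, drops the `Logger` argument and the `RestHandlerException`, and the class and method names are hypothetical.

```java
import java.util.function.Supplier;

/** Hypothetical, simplified sketch of the body-then-query-then-default lookup. */
final class PrecedenceSketch {
    private PrecedenceSketch() {}

    /** Returns the body value if set, else the query parameter value if set, else the default. */
    static <T> T fromBodyOrQuery(T bodyValue, Supplier<T> queryValue, T defaultValue) {
        if (bodyValue != null) {
            return bodyValue;
        }
        // The query parameter is only evaluated when the body did not provide a value.
        T fromQuery = queryValue.get();
        return fromQuery != null ? fromQuery : defaultValue;
    }
}
```

For example, `PrecedenceSketch.fromBodyOrQuery(null, () -> "query", "default")` yields the query value, because the body value is absent.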
[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API URL: https://github.com/apache/flink/pull/6754#discussion_r221552259 ## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java ## @@ -26,13 +36,42 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter; +import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter; +import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull; + /** * Utils for jar handlers. * * @see org.apache.flink.runtime.webmonitor.handlers.JarRunHandler * @see org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler */ public class JarHandlerUtils { + /** Parse program arguments in jar run or plan request. */ Review comment: add an empty line before the javadoc This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API URL: https://github.com/apache/flink/pull/6754#discussion_r221549560 ## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java ## @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.runtime.rest.messages.MessageParameters; +import org.apache.flink.runtime.rest.messages.MessagePathParameter; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +/** + * Base class of {@link MessageParameters} for {@link JarRunHandler} and {@link JarPlanHandler}. 
+ */ +abstract class JarMessageParameters extends MessageParameters { + + final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter(); + + final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter(); + + final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter(); + + final ProgramArgsQueryParameter programArgsQueryParameter = new ProgramArgsQueryParameter(); + + final ProgramArgsListQueryParameter programArgsListQueryParameter = new ProgramArgsListQueryParameter(); + + @Override + public Collection> getPathParameters() { + return Collections.singletonList(jarIdPathParameter); + } + + @Override + public Collection> getQueryParameters() { + return new ArrayList<>(Arrays.asList( Review comment: This should continue to return an unmodifiable collection, otherwise all sub-classes have to wrap it again. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API URL: https://github.com/apache/flink/pull/6754#discussion_r22149 ## File path: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java ## @@ -219,15 +225,40 @@ public void testConfigurationViaJsonRequest() throws Exception { } @Test - public void testParameterPrioritization() throws Exception { + public void testParameterPrioritizationWithProgArgsAsString() throws Exception { + testParameterPrioritization(PROG_ARGS, null); + } + + @Test + public void testParameterPrioritizationWithProgArgsAsList() throws Exception { + testParameterPrioritization(null, PROG_ARGS); + } + + @Test + public void testFailIfProgArgsAreAsStringAndAsList() throws Exception { + try { + testParameterPrioritization(PROG_ARGS, PROG_ARGS); + fail("IllegalArgumentException is excepted"); + } catch (IllegalArgumentException e) { + // expected + } + } + + private void testParameterPrioritization(List programArgs, List programArgsList) throws Exception { + List args = programArgs == null ? programArgsList : programArgs; Review comment: This code is way too confusing. It strikes me as very odd that `args` is never null, and the naming scheme is weird (args should be "argsList", "argsStr" should be "args" to be somewhat consistent with the method arguments). I would pass in a single collection as arguments and a simple enum for how arguments should be passed. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
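One way to read the suggestion above (a single argument collection plus an enum saying how it is passed) is sketched below. Every name here (`ProgramArgsMode`, `ArgsModeSketch`, `RequestArgs`, `toRequestArgs`) is hypothetical and chosen only for illustration; the actual test may be structured differently.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical enum: which request field(s) carry the program arguments. */
enum ProgramArgsMode {
    STRING, LIST, BOTH
}

/** Hypothetical test helper built around one argument list and a mode. */
final class ArgsModeSketch {
    static final List<String> PROG_ARGS = Arrays.asList("--host", "localhost", "--port", "1234");

    /** Hypothetical holder for the two request-body fields under test. */
    static final class RequestArgs {
        final String programArgs;           // null when arguments are not passed as a string
        final List<String> programArgsList; // null when arguments are not passed as a list

        RequestArgs(String programArgs, List<String> programArgsList) {
            this.programArgs = programArgs;
            this.programArgsList = programArgsList;
        }
    }

    private ArgsModeSketch() {}

    /** Derives both request fields from the single source of truth and the chosen mode. */
    static RequestArgs toRequestArgs(List<String> args, ProgramArgsMode mode) {
        String asString = mode == ProgramArgsMode.LIST ? null : String.join(" ", args);
        List<String> asList = mode == ProgramArgsMode.STRING ? null : args;
        return new RequestArgs(asString, asList);
    }
}
```

With this shape, each test case names its mode explicitly, and the helper never has to guess which of two nullable parameters holds the expected arguments.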
[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API URL: https://github.com/apache/flink/pull/6754#discussion_r221550646 ## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java ## @@ -26,13 +36,42 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter; +import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter; +import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull; + /** * Utils for jar handlers. * * @see org.apache.flink.runtime.webmonitor.handlers.JarRunHandler * @see org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler */ public class JarHandlerUtils { + /** Parse program arguments in jar run or plan request. */ + public static List getProgramArgs( + HandlerRequest request, Logger log) throws RestHandlerException { + JarRequestBody requestBody = request.getRequestBody(); + List programArgs = tokenizeArguments( + fromRequestBodyOrQueryParameter( + emptyToNull(requestBody.getProgramArguments()), + () -> getQueryParameter(request, ProgramArgsQueryParameter.class), + null, + log)); + List programArgsList = fromRequestBodyOrQueryParameter( + requestBody.getProgramArgumentsList(), + () -> request.getQueryParameter(ProgramArgsListQueryParameter.class), + null, + log); + if (requestBody.getProgramArgumentsList() != null) { Review comment: compare against `programArgsList` instead? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API URL: https://github.com/apache/flink/pull/6754#discussion_r221551766 ## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java ## @@ -32,6 +32,7 @@ public ProgramArgsQueryParameter() { @Override public String getDescription() { - return "String value that specifies the arguments for the program or plan."; + return "Deprecated, please, use 'programArgsList' in a JSON request body as a JSON list. " + Review comment: Revert this please. You're baking in the assumption that the `ProgramArgsQueryParameter` is always used in conjunction with a `JarRequestBody` or similar. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API URL: https://github.com/apache/flink/pull/6754#discussion_r221553008 ## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java ## @@ -26,13 +36,42 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter; +import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter; +import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull; + /** * Utils for jar handlers. * * @see org.apache.flink.runtime.webmonitor.handlers.JarRunHandler * @see org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler */ public class JarHandlerUtils { + /** Parse program arguments in jar run or plan request. */ + public static List getProgramArgs( + HandlerRequest request, Logger log) throws RestHandlerException { + JarRequestBody requestBody = request.getRequestBody(); + List programArgs = tokenizeArguments( + fromRequestBodyOrQueryParameter( + emptyToNull(requestBody.getProgramArguments()), + () -> getQueryParameter(request, ProgramArgsQueryParameter.class), + null, + log)); + List programArgsList = fromRequestBodyOrQueryParameter( Review comment: truly a minor thing, but might I suggest to move `fromRequestBodyOrQueryParameter(` to the next line so the indentation of this part is identical to the above? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API URL: https://github.com/apache/flink/pull/6754#discussion_r221555894 ## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java ## @@ -26,13 +36,42 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; +import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter; +import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter; +import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull; + /** * Utils for jar handlers. * * @see org.apache.flink.runtime.webmonitor.handlers.JarRunHandler * @see org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler */ public class JarHandlerUtils { + /** Parse program arguments in jar run or plan request. */ + public static List getProgramArgs( + HandlerRequest request, Logger log) throws RestHandlerException { + JarRequestBody requestBody = request.getRequestBody(); + List programArgs = tokenizeArguments( + fromRequestBodyOrQueryParameter( + emptyToNull(requestBody.getProgramArguments()), + () -> getQueryParameter(request, ProgramArgsQueryParameter.class), + null, + log)); + List programArgsList = fromRequestBodyOrQueryParameter( + requestBody.getProgramArgumentsList(), + () -> request.getQueryParameter(ProgramArgsListQueryParameter.class), + null, + log); + if (requestBody.getProgramArgumentsList() != null) { + if (!programArgs.isEmpty()) { + throw new IllegalArgumentException( Review comment: neither handler is handling this exception, so the user will get an INTERNAL_SERVER_ERROR instead of BAD_REQUEST. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
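The concern above is that an unhandled `IllegalArgumentException` surfaces as INTERNAL_SERVER_ERROR, while a conflicting request is really a client error. A minimal sketch of the alternative, assuming a hypothetical `StatusException` that carries an HTTP status (a stand-in, not Flink's own exception type) and a hypothetical `statusFor` method that mimics how a server might map exceptions to status codes:

```java
import java.util.List;

/** Hypothetical sketch: report conflicting argument forms as a client error. */
final class BadRequestSketch {
    static final int OK = 200;
    static final int BAD_REQUEST = 400;
    static final int INTERNAL_SERVER_ERROR = 500;

    /** Hypothetical stand-in for a handler exception that carries an HTTP status. */
    static final class StatusException extends Exception {
        final int status;

        StatusException(String message, int status) {
            super(message);
            this.status = status;
        }
    }

    private BadRequestSketch() {}

    /** Picks the argument list; rejects requests that supply both the string and the list form. */
    static List<String> chooseArgs(List<String> programArgs, List<String> programArgsList)
            throws StatusException {
        if (programArgsList != null) {
            if (!programArgs.isEmpty()) {
                throw new StatusException(
                    "Program arguments may be given as a string or as a list, not both.",
                    BAD_REQUEST);
            }
            return programArgsList;
        }
        return programArgs;
    }

    /** Mimics a server front end: a known status passes through, anything else becomes 500. */
    static int statusFor(List<String> programArgs, List<String> programArgsList) {
        try {
            chooseArgs(programArgs, programArgsList);
            return OK;
        } catch (StatusException e) {
            return e.status;
        } catch (RuntimeException e) {
            return INTERNAL_SERVER_ERROR;
        }
    }
}
```

In this sketch a request that sets both forms maps to 400, whereas an unchecked exception thrown from the same place would fall through to the generic 500 branch.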
[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API URL: https://github.com/apache/flink/pull/6754#discussion_r221551094 ## File path: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java ## @@ -33,10 +35,11 @@ } @Override - protected JarRunRequestBody getTestRequestInstance() throws Exception { + protected JarRunRequestBody getTestRequestInstance() { return new JarRunRequestBody( "hello", "world", + Arrays.asList("foo", "bar"), Review comment: Please use two words that are not present in the existing arguments. `foo` and `bar` are already used as the savepointPath. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API URL: https://github.com/apache/flink/pull/6754#discussion_r221549739 ## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java ## @@ -29,32 +28,18 @@ /** * {@link MessageParameters} for {@link JarRunHandler}. */ -public class JarRunMessageParameters extends MessageParameters { +public class JarRunMessageParameters extends JarMessageParameters { - public final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter(); + final AllowNonRestoredStateQueryParameter allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter(); - public final ProgramArgsQueryParameter programArgsQueryParameter = new ProgramArgsQueryParameter(); - - public final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter(); - - public final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter(); - - public final AllowNonRestoredStateQueryParameter allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter(); - - public final SavepointPathQueryParameter savepointPathQueryParameter = new SavepointPathQueryParameter(); - - @Override - public Collection> getPathParameters() { - return Collections.singletonList(jarIdPathParameter); - } + final SavepointPathQueryParameter savepointPathQueryParameter = new SavepointPathQueryParameter(); @Override public Collection> getQueryParameters() { - return Collections.unmodifiableCollection(Arrays.asList( - programArgsQueryParameter, - entryClassQueryParameter, - parallelismQueryParameter, + Collection> pars = super.getQueryParameters(); + pars.addAll(Arrays.asList( Review comment: Since you're creating a new list anyway and the total size is known, add the super parameter to that one instead. This is an automated message from the Apache Git Service. 
To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
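The suggestion above, combined with the earlier remark that the base class should keep returning an unmodifiable collection, could look roughly like this. The sketch uses plain strings as placeholder parameters and hypothetical class names (`BaseParamsSketch`, `RunParamsSketch`); the real classes hold `MessageQueryParameter` instances.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Hypothetical base class; parameters are placeholder strings to keep the sketch self-contained. */
abstract class BaseParamsSketch {
    Collection<String> getQueryParameters() {
        // Unmodifiable, so subclasses cannot mutate the shared base collection by accident.
        return Collections.unmodifiableCollection(Arrays.asList(
            "programArgs", "programArgsList", "entryClass", "parallelism"));
    }
}

/** Hypothetical subclass that contributes two more parameters. */
final class RunParamsSketch extends BaseParamsSketch {
    @Override
    Collection<String> getQueryParameters() {
        Collection<String> base = super.getQueryParameters();
        // The total size is known up front, so allocate once and copy the base parameters in.
        List<String> all = new ArrayList<>(base.size() + 2);
        all.addAll(base);
        all.add("allowNonRestoredState");
        all.add("savepointPath");
        return Collections.unmodifiableCollection(all);
    }
}
```

The subclass never calls `addAll` on the collection returned by `super`, so the base class is free to return an unmodifiable view without wrapping it in a fresh `ArrayList`.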
[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API URL: https://github.com/apache/flink/pull/6754#discussion_r221548569 ## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java ## @@ -80,13 +79,13 @@ public JarPlanHandler( @Override protected CompletableFuture handleRequest( - @Nonnull final HandlerRequest request, + @Nonnull final HandlerRequest request, @Nonnull final RestfulGateway gateway) throws RestHandlerException { final String jarId = request.getPathParameter(JarIdPathParameter.class); final String entryClass = emptyToNull(HandlerRequestUtils.getQueryParameter(request, EntryClassQueryParameter.class)); Review comment: Since the `JarPlanRequestBody` also contains this field (and is documented to accept it!) we should either
a) reduce the `JarPlanRequestBody` to only contain the program arguments
b) change this handler to also use `fromRequestBodyOrQueryParameter`
c) exclude this handler from this change for now and handle it in a follow-up.
This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results
[ https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633814#comment-16633814 ] ASF GitHub Bot commented on FLINK-10295: zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API URL: https://github.com/apache/flink/pull/6754#discussion_r221551094 ## File path: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java ## @@ -33,10 +35,11 @@ } @Override - protected JarRunRequestBody getTestRequestInstance() throws Exception { + protected JarRunRequestBody getTestRequestInstance() { return new JarRunRequestBody( "hello", "world", + Arrays.asList("foo", "bar"), Review comment: Please use two words that are not present in the existing arguments. `foo` and `bar` are already used as the savepointPath. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
> Tokenisation of Program Args resulting in unexpected results
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
> Issue Type: Bug
> Components: REST, Webfrontend
> Affects Versions: 1.5.0, 1.6.0, 1.7.0
> Reporter: Gaurav Singhania
> Assignee: Andrey Zagrebin
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.7.0
> Attachments: sample_request.txt
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes
> all the details to run the job as program args against a jarid, including sql
> query and kafka details. In version 1.5 the program args are tokenised; as a
> result, single quotes (') and double quotes (") are stripped from the arguments.
> This results in malformed args.
> Attached a sample request for reference.
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results
[ https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633808#comment-16633808 ] ASF GitHub Bot commented on FLINK-10295: zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API URL: https://github.com/apache/flink/pull/6754#discussion_r221550059 ## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRequestBody.java ## @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.runtime.rest.messages.RequestBody; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +import java.util.List; + +/** + * Base class for {@link RequestBody} for running a jar or querying the plan. 
+ */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public abstract class JarRequestBody implements RequestBody { Review comment: can be package private? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Tokenisation of Program Args resulting in unexpected results > > > Key: FLINK-10295 > URL: https://issues.apache.org/jira/browse/FLINK-10295 > Project: Flink > Issue Type: Bug > Components: REST, Webfrontend >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Gaurav Singhania >Assignee: Andrey Zagrebin >Priority: Blocker > Labels: pull-request-available > Fix For: 1.7.0 > > Attachments: sample_request.txt > > > We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes > all the details to run the job as program args against a jarid, including sql > query and kafka details. In version 1.5 the program args are tokenised as a > result single quote (') and double quote(") are stripped from the arguments. > This results in malformed args. > Attached a sample request for reference. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results
[ https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633810#comment-16633810 ]

ASF GitHub Bot commented on FLINK-10295:

zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221551766

## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java ##
@@ -32,6 +32,7 @@ public ProgramArgsQueryParameter() {
 	@Override
 	public String getDescription() {
-		return "String value that specifies the arguments for the program or plan.";
+		return "Deprecated, please, use 'programArgsList' in a JSON request body as a JSON list. " +

Review comment:
Revert this please. You're baking in the assumption that the `ProgramArgsQueryParameter` is always used in conjunction with a `JarRequestBody` or similar.
[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results
[ https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633818#comment-16633818 ]

ASF GitHub Bot commented on FLINK-10295:

zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r22149

## File path: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java ##
@@ -219,15 +225,40 @@ public void testConfigurationViaJsonRequest() throws Exception {
 	}
 	@Test
-	public void testParameterPrioritization() throws Exception {
+	public void testParameterPrioritizationWithProgArgsAsString() throws Exception {
+		testParameterPrioritization(PROG_ARGS, null);
+	}
+
+	@Test
+	public void testParameterPrioritizationWithProgArgsAsList() throws Exception {
+		testParameterPrioritization(null, PROG_ARGS);
+	}
+
+	@Test
+	public void testFailIfProgArgsAreAsStringAndAsList() throws Exception {
+		try {
+			testParameterPrioritization(PROG_ARGS, PROG_ARGS);
+			fail("IllegalArgumentException is excepted");
+		} catch (IllegalArgumentException e) {
+			// expected
+		}
+	}
+
+	private void testParameterPrioritization(List programArgs, List programArgsList) throws Exception {
+		List args = programArgs == null ? programArgsList : programArgs;

Review comment:
This code is way too confusing. It strikes me as very odd that `args` is never null, and the naming scheme is weird (`args` should be `argsList`, and `argsStr` should be `args`, to be somewhat consistent with the method arguments). I would pass in a single collection as arguments and a simple enum for how the arguments should be passed.
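The suggestion in this review comment (one argument collection plus a simple enum that says how the arguments are sent) could look roughly like the sketch below. It is only an illustration: `ProgArgsRequestSketch` and `Mode` are hypothetical names that do not appear in the PR, and the real test would build a `JarRunRequestBody` rather than this stand-in holder.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the request built by the test; not a Flink class. */
final class ProgArgsRequestSketch {

    /** How the program arguments are transmitted in the request. */
    enum Mode { AS_STRING, AS_LIST, BOTH }

    /** Space-joined arguments, or null when the string form is not sent. */
    final String programArgs;

    /** Arguments as separate strings, or null when the list form is not sent. */
    final List<String> programArgsList;

    private ProgArgsRequestSketch(String programArgs, List<String> programArgsList) {
        this.programArgs = programArgs;
        this.programArgsList = programArgsList;
    }

    /** Builds the request fields from a single argument collection and a mode. */
    static ProgArgsRequestSketch of(List<String> args, Mode mode) {
        String asString = mode == Mode.AS_LIST ? null : String.join(" ", args);
        List<String> asList = mode == Mode.AS_STRING ? null : args;
        return new ProgArgsRequestSketch(asString, asList);
    }

    public static void main(String[] unused) {
        List<String> args = Arrays.asList("--host", "localhost", "--port", "1234");
        for (Mode mode : Mode.values()) {
            ProgArgsRequestSketch request = of(args, mode);
            System.out.println(mode + ": " + request.programArgs + " / " + request.programArgsList);
        }
    }
}
```

With this shape the test helper never has to guess which of two nullable parameters carries the expected values: the expected arguments are always `args`, and the enum alone decides which request fields are populated.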
[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results
[ https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633815#comment-16633815 ]

ASF GitHub Bot commented on FLINK-10295:

zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221550646

## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java ##
@@ -26,13 +36,42 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter;
+import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter;
+import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
 /**
  * Utils for jar handlers.
  *
  * @see org.apache.flink.runtime.webmonitor.handlers.JarRunHandler
  * @see org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler
  */
 public class JarHandlerUtils {
+	/** Parse program arguments in jar run or plan request. */
+	public static List getProgramArgs(
+			HandlerRequest request, Logger log) throws RestHandlerException {
+		JarRequestBody requestBody = request.getRequestBody();
+		List programArgs = tokenizeArguments(
+			fromRequestBodyOrQueryParameter(
+				emptyToNull(requestBody.getProgramArguments()),
+				() -> getQueryParameter(request, ProgramArgsQueryParameter.class),
+				null,
+				log));
+		List programArgsList = fromRequestBodyOrQueryParameter(
+			requestBody.getProgramArgumentsList(),
+			() -> request.getQueryParameter(ProgramArgsListQueryParameter.class),
+			null,
+			log);
+		if (requestBody.getProgramArgumentsList() != null) {

Review comment:
compare against `programArgsList` instead?
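A hedged sketch of the point behind this comment: the list form may arrive either in the request body or as a query parameter, so the check arguably belongs on the resolved `programArgsList` value rather than on the body field alone. The diff is cut off before the body of the `if`, so everything below is an assumption, including the class name `ProgramArgsResolutionSketch`, the method `resolve`, and the choice of `IllegalArgumentException` (taken from the test in this thread that expects it when both forms are supplied).

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Illustrative only; not the code from the PR. */
final class ProgramArgsResolutionSketch {

    /**
     * Picks the effective program arguments.
     *
     * @param tokenizedArgs arguments tokenized from the single-string form, may be null or empty
     * @param programArgsList arguments resolved from body or query parameter, may be null
     */
    static List<String> resolve(List<String> tokenizedArgs, List<String> programArgsList) {
        boolean hasTokenized = tokenizedArgs != null && !tokenizedArgs.isEmpty();
        // Check the resolved value, which covers both the body and the query parameter.
        if (programArgsList != null) {
            if (hasTokenized) {
                throw new IllegalArgumentException(
                    "Program arguments were given both as a string and as a list.");
            }
            return programArgsList;
        }
        return hasTokenized ? tokenizedArgs : Collections.emptyList();
    }

    public static void main(String[] unused) {
        System.out.println(resolve(null, Arrays.asList("--sql", "select 'a b'")));
        System.out.println(resolve(Arrays.asList("--host", "localhost"), null));
    }
}
```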
[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results
[ https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633816#comment-16633816 ]

ASF GitHub Bot commented on FLINK-10295:

zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221553160

## File path: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java ##
@@ -190,7 +195,8 @@ public void testConfigurationViaJsonRequest() throws Exception {
 			() -> {
 				final JarRunRequestBody jsonRequest = new JarRunRequestBody(
 					ParameterProgram.class.getCanonicalName(),
-					"--host localhost --port 1234",
+					null,
+					Arrays.asList("--host", "localhost", "--port", "1234"),

Review comment:
this test should be duplicated instead to cover both cases.
[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results
[ https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633807#comment-16633807 ]

ASF GitHub Bot commented on FLINK-10295:

zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221551267

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtils.java ##
@@ -63,4 +66,26 @@
 		return value;
 	}
+	/**
+	 * Returns {@code requestValue} if it is not null, otherwise returns the query parameter value
+	 * if it is not null, otherwise returns the default value.
+	 */
+	public static <T> T fromRequestBodyOrQueryParameter(
+		T requestValue,
+		SupplierWithException queryParameterExtractor,
+		T defaultValue,
+		Logger log) throws RestHandlerException {

Review comment:
indent parameters by another tab
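The fallback order described in the Javadoc of `fromRequestBodyOrQueryParameter` (request body value first, then query parameter, then default) can be sketched as follows. This is a simplified stand-in, not the Flink method: it assumes a plain `java.util.function.Supplier` in place of `SupplierWithException`, drops the `Logger` parameter and the `RestHandlerException`, and uses the hypothetical class name `FallbackSketch`.

```java
import java.util.function.Supplier;

/** Simplified illustration of the body / query parameter / default fallback chain. */
final class FallbackSketch {

    private FallbackSketch() {
    }

    /**
     * Returns requestValue if it is not null, otherwise the query parameter value
     * if it is not null, otherwise the default value.
     */
    static <T> T fromRequestBodyOrQueryParameter(
            T requestValue,
            Supplier<T> queryParameterExtractor,
            T defaultValue) {
        if (requestValue != null) {
            // The extractor is not evaluated when the body already provides a value.
            return requestValue;
        }
        T queryValue = queryParameterExtractor.get();
        return queryValue != null ? queryValue : defaultValue;
    }

    public static void main(String[] unused) {
        System.out.println(fromRequestBodyOrQueryParameter("from-body", () -> "from-query", "default"));
        System.out.println(fromRequestBodyOrQueryParameter(null, () -> "from-query", "default"));
        System.out.println(fromRequestBodyOrQueryParameter(null, () -> null, "default"));
    }
}
```

Taking the query parameter lazily through a supplier means the query string is only inspected when the body did not provide a value, which is presumably also the place where the real method can log its warning about query parameter usage.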
[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results
[ https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633813#comment-16633813 ]

ASF GitHub Bot commented on FLINK-10295:

zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221553008

## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java ##
@@ -26,13 +36,42 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter;
+import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter;
+import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
 /**
  * Utils for jar handlers.
  *
  * @see org.apache.flink.runtime.webmonitor.handlers.JarRunHandler
  * @see org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler
  */
 public class JarHandlerUtils {
+	/** Parse program arguments in jar run or plan request. */
+	public static List getProgramArgs(
+			HandlerRequest request, Logger log) throws RestHandlerException {
+		JarRequestBody requestBody = request.getRequestBody();
+		List programArgs = tokenizeArguments(
+			fromRequestBodyOrQueryParameter(
+				emptyToNull(requestBody.getProgramArguments()),
+				() -> getQueryParameter(request, ProgramArgsQueryParameter.class),
+				null,
+				log));
+		List programArgsList = fromRequestBodyOrQueryParameter(

Review comment:
truly a minor thing, but might I suggest to move `fromRequestBodyOrQueryParameter(` to the next line so the indentation of this part is identical to the above?
[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results
[ https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633806#comment-16633806 ]

ASF GitHub Bot commented on FLINK-10295:

zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221549739

## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java ##
@@ -29,32 +28,18 @@
 /**
  * {@link MessageParameters} for {@link JarRunHandler}.
  */
-public class JarRunMessageParameters extends MessageParameters {
+public class JarRunMessageParameters extends JarMessageParameters {
-	public final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
+	final AllowNonRestoredStateQueryParameter allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
-	public final ProgramArgsQueryParameter programArgsQueryParameter = new ProgramArgsQueryParameter();
-
-	public final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
-
-	public final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter();
-
-	public final AllowNonRestoredStateQueryParameter allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
-
-	public final SavepointPathQueryParameter savepointPathQueryParameter = new SavepointPathQueryParameter();
-
-	@Override
-	public Collection<MessagePathParameter<?>> getPathParameters() {
-		return Collections.singletonList(jarIdPathParameter);
-	}
+	final SavepointPathQueryParameter savepointPathQueryParameter = new SavepointPathQueryParameter();
 	@Override
 	public Collection<MessageQueryParameter<?>> getQueryParameters() {
-		return Collections.unmodifiableCollection(Arrays.asList(
-			programArgsQueryParameter,
-			entryClassQueryParameter,
-			parallelismQueryParameter,
+		Collection<MessageQueryParameter<?>> pars = super.getQueryParameters();
+		pars.addAll(Arrays.asList(

Review comment:
Since you're creating a new list anyway and the total size is known, add the super parameter to that one instead.
[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results
[ https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633805#comment-16633805 ]

ASF GitHub Bot commented on FLINK-10295:

zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221549560

## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java ##
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Base class of {@link MessageParameters} for {@link JarRunHandler} and {@link JarPlanHandler}.
+ */
+abstract class JarMessageParameters extends MessageParameters {
+
+	final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
+
+	final EntryClassQueryParameter entryClassQueryParameter = new EntryClassQueryParameter();
+
+	final ParallelismQueryParameter parallelismQueryParameter = new ParallelismQueryParameter();
+
+	final ProgramArgsQueryParameter programArgsQueryParameter = new ProgramArgsQueryParameter();
+
+	final ProgramArgsListQueryParameter programArgsListQueryParameter = new ProgramArgsListQueryParameter();
+
+	@Override
+	public Collection<MessagePathParameter<?>> getPathParameters() {
+		return Collections.singletonList(jarIdPathParameter);
+	}
+
+	@Override
+	public Collection<MessageQueryParameter<?>> getQueryParameters() {
+		return new ArrayList<>(Arrays.asList(

Review comment:
This should continue to return an unmodifiable collection, otherwise all sub-classes have to wrap it again.
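The two review comments on `JarRunMessageParameters` and `JarMessageParameters` point at the same pattern: the base class keeps returning an unmodifiable collection, and a subclass that needs extra parameters copies the base parameters into its own pre-sized list instead of mutating the collection it got from `super`. A minimal sketch of that pattern, using plain strings as placeholders for the query parameter objects and the hypothetical class names `BaseParamsSketch` and `RunParamsSketch`:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

/** Placeholder for the shared base class; strings stand in for query parameter objects. */
abstract class BaseParamsSketch {

    /** Always hands out an unmodifiable view, so callers cannot alter the base set. */
    Collection<String> getQueryParameters() {
        return Collections.unmodifiableCollection(Arrays.asList(
            "program-args",
            "program-args-list",
            "entry-class",
            "parallelism"));
    }
}

/** Placeholder for a subclass that adds its own parameters. */
final class RunParamsSketch extends BaseParamsSketch {

    @Override
    Collection<String> getQueryParameters() {
        Collection<String> base = super.getQueryParameters();
        // The total size is known up front, so the new list is sized once.
        List<String> all = new ArrayList<>(base.size() + 2);
        all.addAll(base);
        all.add("allowNonRestoredState");
        all.add("savepointPath");
        return Collections.unmodifiableCollection(all);
    }

    public static void main(String[] unused) {
        System.out.println(new RunParamsSketch().getQueryParameters());
    }
}
```

Because the subclass never calls `addAll` on the collection returned by `super`, the base class is free to return a fixed-size or unmodifiable collection without breaking its subclasses.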
[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results
[ https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633809#comment-16633809 ]

ASF GitHub Bot commented on FLINK-10295:

zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221552259

## File path: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java ##
@@ -26,13 +36,42 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter;
+import static org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter;
+import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
 /**
  * Utils for jar handlers.
  *
  * @see org.apache.flink.runtime.webmonitor.handlers.JarRunHandler
  * @see org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler
  */
 public class JarHandlerUtils {
+	/** Parse program arguments in jar run or plan request. */

Review comment:
add an empty line before the javadoc
[GitHub] NicoK closed pull request #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool
NicoK closed pull request #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index c235999db91..48b9a20e9da 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -74,6 +74,19 @@ public static MemorySegment allocateUnpooledSegment(int size, Object owner) {
 		return new HybridMemorySegment(new byte[size], owner);
 	}
+	/**
+	 * Allocates some unpooled off-heap memory and creates a new memory segment that
+	 * represents that memory.
+	 *
+	 * @param size The size of the off-heap memory segment to allocate.
+	 * @param owner The owner to associate with the off-heap memory segment.
+	 * @return A new memory segment, backed by unpooled off-heap memory.
+	 */
+	public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) {
+		ByteBuffer memory = ByteBuffer.allocateDirect(size);
+		return wrapPooledOffHeapMemory(memory, owner);
+	}
+
 	/**
 	 * Creates a memory segment that wraps the given byte array.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index a369ce5a5fb..1fddb612781 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -30,7 +30,6 @@
 import javax.annotation.Nullable;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -89,8 +88,7 @@ public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
 		try {
 			for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
-				ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
-				availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));
+				availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));
 			}
 		}
 		catch (OutOfMemoryError err) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
index 2a6a71f05d6..f941e20846e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -257,7 +257,8 @@ public String toString() {
 			synchronized (buffers) {
 				for (int i = 0; i < numberOfBuffers; i++) {
-					buffers.add(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), this));
+					buffers.add(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledOffHeapMemory(
+						memorySegmentSize, null), this));
 				}
 			}
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index c450880f98b..d1a304a4909 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -675,8 +675,7 @@ void clear() {
 		@Override
 		MemorySegment allocateNewSegment(Object owner) {
-			ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
-			return MemorySegmentFactory.wrapPooledOffHeapMemory(memory, owner);
+			return MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, owner);
 		}

 		@Override
This
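The change above swaps a heap-backed allocation (`new byte[size]`) for a direct buffer from `ByteBuffer.allocateDirect`. The difference between the two can be seen with the JDK alone. The sketch below uses only `java.nio.ByteBuffer`; the class `OffHeapSketch` and its two helper methods are hypothetical and stand in for the Flink factory methods, which additionally wrap the memory in a `MemorySegment`.

```java
import java.nio.ByteBuffer;

/** JDK-only illustration of heap versus off-heap (direct) buffers. */
final class OffHeapSketch {

    private OffHeapSketch() {
    }

    /** Heap-backed buffer: the bytes live in a Java array managed by the garbage collector. */
    static ByteBuffer allocateHeap(int size) {
        return ByteBuffer.wrap(new byte[size]);
    }

    /** Direct buffer: the bytes live outside the Java heap. */
    static ByteBuffer allocateOffHeap(int size) {
        return ByteBuffer.allocateDirect(size);
    }

    public static void main(String[] unused) {
        ByteBuffer heap = allocateHeap(32 * 1024);
        ByteBuffer offHeap = allocateOffHeap(32 * 1024);
        System.out.println("heap:     direct=" + heap.isDirect() + ", hasArray=" + heap.hasArray());
        System.out.println("off-heap: direct=" + offHeap.isDirect() + ", capacity=" + offHeap.capacity());
    }
}
```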
[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory
[ https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633823#comment-16633823 ]

ASF GitHub Bot commented on FLINK-10339:

NicoK closed pull request #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index c235999db91..48b9a20e9da 100644
--- a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -74,6 +74,19 @@ public static MemorySegment allocateUnpooledSegment(int size, Object owner) {
 		return new HybridMemorySegment(new byte[size], owner);
 	}

+	/**
+	 * Allocates some unpooled off-heap memory and creates a new memory segment that
+	 * represents that memory.
+	 *
+	 * @param size The size of the off-heap memory segment to allocate.
+	 * @param owner The owner to associate with the off-heap memory segment.
+	 * @return A new memory segment, backed by unpooled off-heap memory.
+	 */
+	public static MemorySegment allocateUnpooledOffHeapMemory(int size, Object owner) {
+		ByteBuffer memory = ByteBuffer.allocateDirect(size);
+		return wrapPooledOffHeapMemory(memory, owner);
+	}
+
 	/**
 	 * Creates a memory segment that wraps the given byte array.
 	 *
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index a369ce5a5fb..1fddb612781 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -30,7 +30,6 @@ import javax.annotation.Nullable;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -89,8 +88,7 @@ public NetworkBufferPool(int numberOfSegmentsToAllocate, int segmentSize) {
 		try {
 			for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
-				ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
-				availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory, null));
+				availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, null));
 			}
 		}
 		catch (OutOfMemoryError err) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
index 2a6a71f05d6..f941e20846e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -257,7 +257,8 @@ public String toString() {
 			synchronized (buffers) {
 				for (int i = 0; i < numberOfBuffers; i++) {
-					buffers.add(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), this));
+					buffers.add(new NetworkBuffer(MemorySegmentFactory.allocateUnpooledOffHeapMemory(
+						memorySegmentSize, null), this));
 				}
 			}
 		}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index c450880f98b..d1a304a4909 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -675,8 +675,7 @@ void clear() {
 		@Override
 		MemorySegment allocateNewSegment(Object owner) {
-			ByteBuffer memory = ByteBuffer.allocateDirect(segmentSize);
-			return MemorySegmentFactory.wrapP
[jira] [Resolved] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory
[ https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber resolved FLINK-10339. - Resolution: Fixed Fix Version/s: 1.7.0 Fixed via: - master: 2ab55df946dc35280f66c6f073a112b497acede3 > SpillReadBufferPool cannot use off-heap memory > -- > > Key: FLINK-10339 > URL: https://issues.apache.org/jira/browse/FLINK-10339 > Project: Flink > Issue Type: Improvement > Components: Network > Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0 > Reporter: zhijiang > Assignee: zhijiang > Priority: Minor > Labels: pull-request-available > Fix For: 1.7.0 > > > Currently, the {{NetworkBufferPool}} always uses off-heap memory to reduce memory copies from Flink's {{Buffer}} to Netty's internal {{ByteBuf}} during transport on the sender side. > > The {{SpillReadBufferPool}} in {{SpilledSubpartitionView}}, however, still uses heap memory for caching. We can make it off-heap by default, similar to {{NetworkBufferPool}}, or decide the type via the existing parameter {{taskmanager.memory.off-heap.}} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
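The heap versus off-heap distinction discussed in this issue can be illustrated with plain `java.nio`, independent of Flink. The following is only a minimal sketch: the class `BufferAllocationSketch` and its `allocate` method are hypothetical names for illustration and are not part of Flink; in Flink itself the direct buffer is additionally wrapped into a `MemorySegment`, as the diff in the pull request shows.

```java
import java.nio.ByteBuffer;

/** Hypothetical helper, not part of Flink: contrasts heap and direct (off-heap) buffers. */
class BufferAllocationSketch {

	/** Allocates a buffer of the given size, either off-heap (direct) or on the JVM heap. */
	static ByteBuffer allocate(int size, boolean offHeap) {
		return offHeap
			? ByteBuffer.allocateDirect(size) // memory outside the garbage-collected heap
			: ByteBuffer.allocate(size);      // backed by a byte[] on the heap
	}

	public static void main(String[] args) {
		ByteBuffer heap = allocate(32 * 1024, false);
		ByteBuffer direct = allocate(32 * 1024, true);
		// isDirect() reports which kind of memory backs the buffer.
		System.out.println("heap buffer is direct: " + heap.isDirect());
		System.out.println("direct buffer is direct: " + direct.isDirect());
	}
}
```

A direct buffer can typically be handed to native I/O without an intermediate copy, which is the motivation the issue gives for preferring off-heap memory on the network path.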
[GitHub] dawidwys commented on issue #6781: [FLINK-10470] Add method to check if pattern can produce empty matches
dawidwys commented on issue #6781: [FLINK-10470] Add method to check if pattern can produce empty matches URL: https://github.com/apache/flink/pull/6781#issuecomment-425868325 cc @kl0u @pnowojski This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10470) Add method to check if pattern can produce empty matches
[ https://issues.apache.org/jira/browse/FLINK-10470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633854#comment-16633854 ] ASF GitHub Bot commented on FLINK-10470: dawidwys commented on issue #6781: [FLINK-10470] Add method to check if pattern can produce empty matches URL: https://github.com/apache/flink/pull/6781#issuecomment-425868325 cc @kl0u @pnowojski This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add method to check if pattern can produce empty matches > > > Key: FLINK-10470 > URL: https://issues.apache.org/jira/browse/FLINK-10470 > Project: Flink > Issue Type: Sub-task > Components: CEP > Reporter: Dawid Wysakowicz > Assignee: Dawid Wysakowicz > Priority: Major > Labels: pull-request-available > > There are a couple of inconsistencies in how the CEP library handles greedy and reluctant operators at the beginning and end of a pattern. This results in subtle problems with how empty matches should be generated for patterns such as A? or A*?, where one is greedy and the other is reluctant. In order to provide a first version of the MATCH_RECOGNIZE function, we should have the possibility to disable patterns that can produce empty matches. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics
zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#discussion_r221573282 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsTest.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.AbstractReporter; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.junit.Assert.fail; + +/** + * The test class for metrics report by job manager. + */ +public class JobManagerMetricsTest { Review comment: This isn't testing anything. 
What we want to ensure is that these metrics don't disappear again, and optionally that the values are correct. For this you have to actually start runtime components, which could be done with a `MiniCluster` (that is also configured to use the TestReporter) and have the reporter search for the desired metrics. The `SystemResourcesMetricsITCase` may be a useful reference. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics
zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#discussion_r221573282 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsTest.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.AbstractReporter; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.junit.Assert.fail; + +/** + * The test class for metrics report by job manager. + */ +public class JobManagerMetricsTest { Review comment: This isn't testing anything. 
What we want to ensure is that these metrics don't disappear again, and optionally that the values are correct. For this you have to actually start runtime components, which could be done with a `MiniCluster` (that is also configured to use the TestReporter) and have the reporter search for the desired metrics. The `SystemResourcesMetricsITCase` may be a useful reference. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics
[ https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633862#comment-16633862 ] ASF GitHub Bot commented on FLINK-10135: zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#discussion_r221573282 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsTest.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.AbstractReporter; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.junit.Assert.fail; + +/** + * The test class for metrics report by job manager. + */ +public class JobManagerMetricsTest { Review comment: This isn't testing anything. What we want to ensure is that these metrics don't disappear again, and optionally that the values are correct. For this you have to actually start runtime components, which could be done with a `MiniCluster` (that is also configured to use the TestReporter) and have the reporter search for the desired metrics. The `SystemResourcesMetricsITCase` may be a useful reference This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The JobManager doesn't report the cluster-level metrics > --- > > Key: FLINK-10135 > URL: https://issues.apache.org/jira/browse/FLINK-10135 > Project: Flink > Issue Type: Bug > Components: JobManager, Metrics >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Joey Echeverria >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > > In [the documentation for > metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster] > in the Flink 1.5.0 release, it says that the following metrics are reported > by the JobManager: > {noformat} > numRegisteredTaskManagers > numRunningJobs > taskSlotsAvailable > taskSlotsTotal > {noformat} > In the job manager REST endpoint > ({{http://:8081/jobmanager/metrics}}), those metrics don't > appear. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics
[ https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633863#comment-16633863 ] ASF GitHub Bot commented on FLINK-10135: zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics URL: https://github.com/apache/flink/pull/6702#discussion_r221573282 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsTest.java ## @@ -0,0 +1,134 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.metrics; + +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.MetricConfig; +import org.apache.flink.metrics.reporter.AbstractReporter; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; + +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; + +import static org.junit.Assert.fail; + +/** + * The test class for metrics report by job manager. + */ +public class JobManagerMetricsTest { Review comment: This isn't testing anything. What we want to ensure is that these metrics don't disappear again, and optionally that the values are correct. For this you have to actually start runtime components, which could be done with a `MiniCluster` (that is also configured to use the TestReporter) and have the reporter search for the desired metrics. The `SystemResourcesMetricsITCase` may be a useful reference. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > The JobManager doesn't report the cluster-level metrics > --- > > Key: FLINK-10135 > URL: https://issues.apache.org/jira/browse/FLINK-10135 > Project: Flink > Issue Type: Bug > Components: JobManager, Metrics >Affects Versions: 1.5.0, 1.6.0, 1.7.0 >Reporter: Joey Echeverria >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > > In [the documentation for > metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster] > in the Flink 1.5.0 release, it says that the following metrics are reported > by the JobManager: > {noformat} > numRegisteredTaskManagers > numRunningJobs > taskSlotsAvailable > taskSlotsTotal > {noformat} > In the job manager REST endpoint > ({{http://:8081/jobmanager/metrics}}), those metrics don't > appear. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] pnowojski commented on a change in pull request #6781: [FLINK-10470] Add method to check if pattern can produce empty matches
pnowojski commented on a change in pull request #6781: [FLINK-10470] Add method to check if pattern can produce empty matches URL: https://github.com/apache/flink/pull/6781#discussion_r221576741 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ## @@ -74,6 +77,36 @@ } } + /** +* Verifies if the provided pattern can possibly generate empty match. Example of patterns that can possibly +* generate empty matches are: A*, A?, A* B? etc. +* +* @param pattern pattern to check +* @return true if empty match could potentially match the pattern, false otherwise +*/ + public static boolean canProduceEmptyMatches(final Pattern pattern) { + NFAFactoryCompiler compiler = new NFAFactoryCompiler<>(checkNotNull(pattern)); + compiler.compileFactory(); + State startState = compiler.getStates().stream().filter(State::isStart).findFirst().get(); Review comment: nit: `orElseThrow(IllegalStateException::new)` (or sth like this, I'm not sure what's the correct syntax) instead of `get()`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
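The nit above can be checked in isolation: `Optional.orElseThrow` takes a supplier of the exception, so `orElseThrow(IllegalStateException::new)` is valid syntax, and a lambda allows a descriptive message. The sketch below is only an illustration under stated assumptions: `StartStateLookupSketch` and its nested `State` class are hypothetical stand-ins, not Flink's `NFACompiler` or `State` types, and the exception message is invented.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical stand-in for the lookup in the PR; only the start flag matters here. */
class StartStateLookupSketch {

	/** Simplified state type (assumption): a name plus a start marker. */
	static final class State {
		final String name;
		final boolean start;

		State(String name, boolean start) {
			this.name = name;
			this.start = start;
		}

		boolean isStart() {
			return start;
		}
	}

	/** Returns the first start state, or fails with a descriptive exception instead of a bare get(). */
	static State findStartState(List<State> states) {
		return states.stream()
			.filter(State::isStart)
			.findFirst()
			.orElseThrow(() -> new IllegalStateException("Compiled NFA has no start state"));
	}

	public static void main(String[] args) {
		List<State> states = Arrays.asList(new State("end", false), new State("begin", true));
		System.out.println(findStartState(states).name);
	}
}
```

Compared with `get()`, which throws a generic `NoSuchElementException`, this variant states which invariant was violated.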
[GitHub] pnowojski commented on a change in pull request #6781: [FLINK-10470] Add method to check if pattern can produce empty matches
pnowojski commented on a change in pull request #6781: [FLINK-10470] Add method to check if pattern can produce empty matches URL: https://github.com/apache/flink/pull/6781#discussion_r221577049 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ## @@ -74,6 +77,36 @@ } } + /** +* Verifies if the provided pattern can possibly generate empty match. Example of patterns that can possibly +* generate empty matches are: A*, A?, A* B? etc. +* +* @param pattern pattern to check +* @return true if empty match could potentially match the pattern, false otherwise +*/ + public static boolean canProduceEmptyMatches(final Pattern pattern) { + NFAFactoryCompiler compiler = new NFAFactoryCompiler<>(checkNotNull(pattern)); + compiler.compileFactory(); + State startState = compiler.getStates().stream().filter(State::isStart).findFirst().get(); + + final Stack> statesToCheck = new Stack<>(); + statesToCheck.push(startState); + while (!statesToCheck.isEmpty()) { + final State currentState = statesToCheck.pop(); + for (StateTransition transition : currentState.getStateTransitions()) { + if (transition.getAction() == StateTransitionAction.PROCEED) { + if (transition.getTargetState().isFinal()) { + return true; + } else { + statesToCheck.push(transition.getTargetState()); Review comment: Can we loop here forever if `PROCEED` takes us to already visited node? Shouldn't we check for already visited states? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
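The concern raised above, that the traversal might revisit states forever, is usually addressed by tracking visited states. The following is a minimal sketch of that idea on a simplified graph model, not the code from the pull request: `EmptyMatchCheckSketch`, its `State` class and the `proceedTargets` list are hypothetical, and an `ArrayDeque` is swapped in for the `Stack` used in the PR. Whether Flink's compiled NFA can actually contain a cycle of `PROCEED` transitions is not established here.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical, simplified NFA model; not Flink's State/StateTransition classes. */
class EmptyMatchCheckSketch {

	static final class State {
		final String name;
		final boolean isFinal;
		/** Targets reachable without consuming an event (models the PROCEED action). */
		final List<State> proceedTargets = new ArrayList<>();

		State(String name, boolean isFinal) {
			this.name = name;
			this.isFinal = isFinal;
		}
	}

	/** Depth-first search over event-free transitions that never expands a state twice. */
	static boolean canReachFinalViaProceed(State start) {
		Deque<State> toCheck = new ArrayDeque<>();
		Set<State> visited = new HashSet<>();
		toCheck.push(start);
		visited.add(start);
		while (!toCheck.isEmpty()) {
			State current = toCheck.pop();
			for (State target : current.proceedTargets) {
				if (target.isFinal) {
					return true;
				}
				// Set.add returns false for already seen states, so cycles terminate.
				if (visited.add(target)) {
					toCheck.push(target);
				}
			}
		}
		return false;
	}

	public static void main(String[] args) {
		State a = new State("a", false);
		State b = new State("b", false);
		a.proceedTargets.add(b);
		b.proceedTargets.add(a); // cycle without a final state
		System.out.println(canReachFinalViaProceed(a));
	}
}
```

Because each state is pushed at most once, the loop runs in time linear in the number of states and transitions, even when the graph has cycles.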
[jira] [Commented] (FLINK-10470) Add method to check if pattern can produce empty matches
[ https://issues.apache.org/jira/browse/FLINK-10470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633874#comment-16633874 ] ASF GitHub Bot commented on FLINK-10470: pnowojski commented on a change in pull request #6781: [FLINK-10470] Add method to check if pattern can produce empty matches URL: https://github.com/apache/flink/pull/6781#discussion_r221577049 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ## @@ -74,6 +77,36 @@ } } + /** +* Verifies if the provided pattern can possibly generate empty match. Example of patterns that can possibly +* generate empty matches are: A*, A?, A* B? etc. +* +* @param pattern pattern to check +* @return true if empty match could potentially match the pattern, false otherwise +*/ + public static boolean canProduceEmptyMatches(final Pattern pattern) { + NFAFactoryCompiler compiler = new NFAFactoryCompiler<>(checkNotNull(pattern)); + compiler.compileFactory(); + State startState = compiler.getStates().stream().filter(State::isStart).findFirst().get(); + + final Stack> statesToCheck = new Stack<>(); + statesToCheck.push(startState); + while (!statesToCheck.isEmpty()) { + final State currentState = statesToCheck.pop(); + for (StateTransition transition : currentState.getStateTransitions()) { + if (transition.getAction() == StateTransitionAction.PROCEED) { + if (transition.getTargetState().isFinal()) { + return true; + } else { + statesToCheck.push(transition.getTargetState()); Review comment: Can we loop here forever if `PROCEED` takes us to already visited node? Shouldn't we check for already visited states? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add method to check if pattern can produce empty matches > > > Key: FLINK-10470 > URL: https://issues.apache.org/jira/browse/FLINK-10470 > Project: Flink > Issue Type: Sub-task > Components: CEP > Reporter: Dawid Wysakowicz > Assignee: Dawid Wysakowicz > Priority: Major > Labels: pull-request-available > > There are a couple of inconsistencies in how the CEP library handles greedy and reluctant operators at the beginning and end of a pattern. This results in subtle problems with how empty matches should be generated for patterns such as A? or A*?, where one is greedy and the other is reluctant. In order to provide a first version of the MATCH_RECOGNIZE function, we should have the possibility to disable patterns that can produce empty matches. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-10291) Generate JobGraph with fixed/configurable JobID in StandaloneJobClusterEntrypoint
[ https://issues.apache.org/jira/browse/FLINK-10291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reopened FLINK-10291: --- > Generate JobGraph with fixed/configurable JobID in > StandaloneJobClusterEntrypoint > - > > Key: FLINK-10291 > URL: https://issues.apache.org/jira/browse/FLINK-10291 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2 > > > The {{StandaloneJobClusterEntrypoint}} currently generates the {{JobGraph}} > from the user code when being started. Due to the nature of how the > {{JobGraph}} is generated, it will get a random {{JobID}} assigned. This is > problematic in case of a failover because then, the {{JobMaster}} won't be > able to detect the checkpoints. In order to solve this problem, we need to > either fix the {{JobID}} assignment or make it configurable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (FLINK-10291) Generate JobGraph with fixed/configurable JobID in StandaloneJobClusterEntrypoint
[ https://issues.apache.org/jira/browse/FLINK-10291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann resolved FLINK-10291. --- Resolution: Fixed Fixed via 1.7.0: c839b1a04c389db816fad0194eb63a5220ee5351 1.6.2: 3ee66af6b1c42eda07d0a28d71560cdd3b4dba33 > Generate JobGraph with fixed/configurable JobID in > StandaloneJobClusterEntrypoint > - > > Key: FLINK-10291 > URL: https://issues.apache.org/jira/browse/FLINK-10291 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2 > > > The {{StandaloneJobClusterEntrypoint}} currently generates the {{JobGraph}} > from the user code when being started. Due to the nature of how the > {{JobGraph}} is generated, it will get a random {{JobID}} assigned. This is > problematic in case of a failover because then, the {{JobMaster}} won't be > able to detect the checkpoints. In order to solve this problem, we need to > either fix the {{JobID}} assignment or make it configurable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10470) Add method to check if pattern can produce empty matches
[ https://issues.apache.org/jira/browse/FLINK-10470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633875#comment-16633875 ] ASF GitHub Bot commented on FLINK-10470: pnowojski commented on a change in pull request #6781: [FLINK-10470] Add method to check if pattern can produce empty matches URL: https://github.com/apache/flink/pull/6781#discussion_r221576741 ## File path: flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java ## @@ -74,6 +77,36 @@ } } + /** +* Verifies if the provided pattern can possibly generate empty match. Example of patterns that can possibly +* generate empty matches are: A*, A?, A* B? etc. +* +* @param pattern pattern to check +* @return true if empty match could potentially match the pattern, false otherwise +*/ + public static boolean canProduceEmptyMatches(final Pattern pattern) { + NFAFactoryCompiler compiler = new NFAFactoryCompiler<>(checkNotNull(pattern)); + compiler.compileFactory(); + State startState = compiler.getStates().stream().filter(State::isStart).findFirst().get(); Review comment: nit: `orElseThrow(IllegalStateException::new)` (or sth like this, I'm not sure what's the correct syntax) instead of `get()`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add method to check if pattern can produce empty matches > > > Key: FLINK-10470 > URL: https://issues.apache.org/jira/browse/FLINK-10470 > Project: Flink > Issue Type: Sub-task > Components: CEP > Reporter: Dawid Wysakowicz > Assignee: Dawid Wysakowicz > Priority: Major > Labels: pull-request-available > > There are a couple of inconsistencies in how the CEP library handles greedy and reluctant operators at the beginning and end of a pattern.
This results in subtle problems with how empty matches should be generated for patterns such as A? or A*?, where one is greedy and the other is reluctant. In order to provide a first version of the MATCH_RECOGNIZE function, we should have the possibility to disable patterns that can produce empty matches. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10291) Generate JobGraph with fixed/configurable JobID in StandaloneJobClusterEntrypoint
[ https://issues.apache.org/jira/browse/FLINK-10291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann closed FLINK-10291. - Resolution: Fixed Release Note: The {{StandaloneJobClusterEntrypoint}} now starts all jobs with a fixed {{JobID}}. Thus, in order to run a cluster in HA mode, one needs to set a different cluster id {{high-availability.cluster-id}} for each job/cluster. > Generate JobGraph with fixed/configurable JobID in > StandaloneJobClusterEntrypoint > - > > Key: FLINK-10291 > URL: https://issues.apache.org/jira/browse/FLINK-10291 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Affects Versions: 1.6.0, 1.7.0 >Reporter: Till Rohrmann >Assignee: vinoyang >Priority: Critical > Labels: pull-request-available > Fix For: 1.7.0, 1.6.2 > > > The {{StandaloneJobClusterEntrypoint}} currently generates the {{JobGraph}} > from the user code when being started. Due to the nature of how the > {{JobGraph}} is generated, it will get a random {{JobID}} assigned. This is > problematic in case of a failover because then, the {{JobMaster}} won't be > able to detect the checkpoints. In order to solve this problem, we need to > either fix the {{JobID}} assignment or make it configurable. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10462) Remove ConnectionIndex for further sharing tcp connection in credit-based mode
[ https://issues.apache.org/jira/browse/FLINK-10462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633879#comment-16633879 ]

Nico Kruber commented on FLINK-10462:
-------------------------------------

Yes, you are right about the changes that credit-based flow control allows here. I have been thinking about this as well, and have also been asked about it at Flink Forward, but I have mixed feelings: sometimes it is actually beneficial to have multiple TCP connections, for example if the network is the bottleneck. Judging from various network hardware tests I have read in the past, only through multiple connections will you be able to saturate the network connection, and this applies all the more the more throughput the interface offers. Have you made different observations?

On the other hand, opening too many connections (which happens easily for large topologies) may also become a problem. How about a more generic connection pool with a configurable size? That would not be as easy a patch as the one described in this ticket, and we would only need to look into it once we really have a problem with too many connections. Or do you see any other advantages of changing the behaviour here?

> Remove ConnectionIndex for further sharing tcp connection in credit-based mode
> ------------------------------------------------------------------------------
>
> Key: FLINK-10462
> URL: https://issues.apache.org/jira/browse/FLINK-10462
> Project: Flink
> Issue Type: Improvement
> Components: Network
> Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.6.1, 1.5.4
> Reporter: zhijiang
> Assignee: zhijiang
> Priority: Minor
>
> Every {{IntermediateResult}} generates a random {{ConnectionIndex}} which will be included in {{ConnectionID}}.
> The {{RemoteInputChannel}} requests to establish tcp connection via {{ConnectionID}}. That means one tcp connection may be shared by multiple {{RemoteInputChannel}}s which have the same {{ConnectionID}}. To do so, we can reduce the physical connections between two {{TaskManager}}s, and it brings benefits for large scale jobs.
> But this sharing is limited only for the same {{IntermediateResult}}, and I think it is mainly because we may temporarily switch off {{autoread}} for the channel during back pressure in previous network flow control. For credit-based mode, the channel is always open for transporting different intermediate data, so we can further share the tcp connection for different {{IntermediateResults}} to remove the limit.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
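Editorial note: the trade-off discussed in this thread (one connection per {{ConnectionID}} today, one shared connection per remote TaskManager if {{ConnectionIndex}} is removed, or a bounded pool in between) can be illustrated with a small counting sketch. It is only a toy model under stated assumptions: ConnectionKey, the three counting methods, and the address "tm-1:6121" are hypothetical names made up for this example and are not Flink's actual classes or API. Folding the index modulo the pool size is just one possible way to realise "a connection pool with configurable size".

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

/** Toy model of connection sharing; hypothetical, not Flink's implementation. */
public class ConnectionSharingSketch {

    /** Hypothetical stand-in for a connection key: remote address plus a per-result index. */
    static final class ConnectionKey {
        final String remoteAddress;
        final int connectionIndex;

        ConnectionKey(String remoteAddress, int connectionIndex) {
            this.remoteAddress = remoteAddress;
            this.connectionIndex = connectionIndex;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (!(o instanceof ConnectionKey)) {
                return false;
            }
            ConnectionKey other = (ConnectionKey) o;
            return connectionIndex == other.connectionIndex
                    && remoteAddress.equals(other.remoteAddress);
        }

        @Override
        public int hashCode() {
            return Objects.hash(remoteAddress, connectionIndex);
        }
    }

    /** Connections needed when channels share only if address AND index match. */
    static int connectionsWithIndex(List<ConnectionKey> channels) {
        return new HashSet<>(channels).size();
    }

    /** Connections needed if the index is ignored: one per remote address. */
    static int connectionsWithoutIndex(List<ConnectionKey> channels) {
        Set<String> addresses = new HashSet<>();
        for (ConnectionKey key : channels) {
            addresses.add(key.remoteAddress);
        }
        return addresses.size();
    }

    /** Connections needed if the index is folded into a pool of at most poolSize per address. */
    static int connectionsWithPool(List<ConnectionKey> channels, int poolSize) {
        if (poolSize <= 0) {
            throw new IllegalArgumentException("poolSize must be positive");
        }
        Set<ConnectionKey> pooled = new HashSet<>();
        for (ConnectionKey key : channels) {
            int slot = Math.floorMod(key.connectionIndex, poolSize);
            pooled.add(new ConnectionKey(key.remoteAddress, slot));
        }
        return pooled.size();
    }

    public static void main(String[] args) {
        // Four input channels to one remote TaskManager, reading three intermediate results.
        List<ConnectionKey> channels = new ArrayList<>();
        channels.add(new ConnectionKey("tm-1:6121", 11));
        channels.add(new ConnectionKey("tm-1:6121", 11));
        channels.add(new ConnectionKey("tm-1:6121", 42));
        channels.add(new ConnectionKey("tm-1:6121", 97));

        System.out.println("keyed by address + index: " + connectionsWithIndex(channels));
        System.out.println("keyed by address only:    " + connectionsWithoutIndex(channels));
        System.out.println("pool of size 2:           " + connectionsWithPool(channels, 2));
    }
}
```

In this model the pool size is the knob between the two positions in the thread: a pool size of 1 gives the single shared connection the ticket proposes, while a larger value keeps several connections per TaskManager pair for the case where one connection cannot saturate the network.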
[GitHub] zentol commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
zentol commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r221575216

## File path: flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java ##

@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.Table;
+
+/**
+ * Example of Cassandra Annotated POJO class for use with {@link CassandraPojoInputFormat}.
+ */
+@Table(name = "batches", keyspace = "flink")
+public class CustomCassandraAnnotatedPojo {
+
+

Review comment:
Double empty lines should be prohibited by checkstyle; I don't know why this isn't failing :/