[GitHub] twalthr commented on a change in pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch

2018-10-01 Thread GitBox
twalthr commented on a change in pull request #6611: [FLINK-3875] [connectors] 
Add an upsert table sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611#discussion_r221513851
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
 ##
 @@ -96,6 +96,6 @@ class StreamTableDescriptor(
 
 // this performs only basic validation
 // more validation can only happen within a factory
-new StreamTableDescriptorValidator().validate(properties)
+new StreamTableDescriptorValidator(true, true, true).validate(properties)
 
 Review comment:
   I think we can also remove it. As the comment suggests it doesn't check much.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] twalthr commented on a change in pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch

2018-10-01 Thread GitBox
twalthr commented on a change in pull request #6611: [FLINK-3875] [connectors] 
Add an upsert table sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611#discussion_r221513851
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
 ##
 @@ -96,6 +96,6 @@ class StreamTableDescriptor(
 
 // this performs only basic validation
 // more validation can only happen within a factory
-new StreamTableDescriptorValidator().validate(properties)
+new StreamTableDescriptorValidator(true, true, true).validate(properties)
 
 Review comment:
   I think we can also remove it. As the comment suggests it doesn't check 
much. Update modes are optional for custom factories anyway.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-3875) Add a TableSink for Elasticsearch

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633670#comment-16633670
 ] 

ASF GitHub Bot commented on FLINK-3875:
---

twalthr commented on a change in pull request #6611: [FLINK-3875] [connectors] 
Add an upsert table sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611#discussion_r221513851
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
 ##
 @@ -96,6 +96,6 @@ class StreamTableDescriptor(
 
 // this performs only basic validation
 // more validation can only happen within a factory
-new StreamTableDescriptorValidator().validate(properties)
+new StreamTableDescriptorValidator(true, true, true).validate(properties)
 
 Review comment:
   I think we can also remove it. As the comment suggests it doesn't check much.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a TableSink for Elasticsearch
> -
>
> Key: FLINK-3875
> URL: https://issues.apache.org/jira/browse/FLINK-3875
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors, Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Add a TableSink that writes data to Elasticsearch



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-3875) Add a TableSink for Elasticsearch

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633672#comment-16633672
 ] 

ASF GitHub Bot commented on FLINK-3875:
---

twalthr commented on a change in pull request #6611: [FLINK-3875] [connectors] 
Add an upsert table sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611#discussion_r221513851
 
 

 ##
 File path: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/StreamTableDescriptor.scala
 ##
 @@ -96,6 +96,6 @@ class StreamTableDescriptor(
 
 // this performs only basic validation
 // more validation can only happen within a factory
-new StreamTableDescriptorValidator().validate(properties)
+new StreamTableDescriptorValidator(true, true, true).validate(properties)
 
 Review comment:
   I think we can also remove it. As the comment suggests it doesn't check 
much. Update modes are optional for custom factories anyway.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a TableSink for Elasticsearch
> -
>
> Key: FLINK-3875
> URL: https://issues.apache.org/jira/browse/FLINK-3875
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors, Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Add a TableSink that writes data to Elasticsearch



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr commented on a change in pull request #6741: [FLINK-9712][table, docs] Document processing time Temporal Table Joins

2018-10-01 Thread GitBox
twalthr commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r221515643
 
 

 ##
 File path: docs/dev/table/streaming/temporal_tables.md
 ##
 @@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+
+Temporal Tables represent a concept of a table that changes over time
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+--
+
+Let's assume we have the following two tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` represents payments of a given `amount` in a given `currency`.
+For example, at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= ========== ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+10:45   Euro        116
+11:15   Euro        119
+{% endhighlight %}
+
+`RatesHistory` represents an ever-changing append-only stream of currency exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate of `Euro` to `Yen` for the period from `09:00` to `10:45` was `114`.
+From `10:45` to `11:15` it was `116`.
+
+The task is now to calculate the value of all `Orders` converted to a common currency (`Yen`).
+For example, we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without Temporal Tables, one would need to write a query such as:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM RatesHistory AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query.
+
+In order to define a Temporal Table, we must define its primary key.
+The primary key allows us to overwrite older values in the Temporal Table.
+In the above example `currency` would be the primary key for the `RatesHistory` table.
+Secondly, a [time attribute](time_attributes.html) is also required;
+it determines which row is newer and which one is older.
+
+Temporal Table Functions
+
+
+In order to access the data in a Temporal Table, one must define a time attribute for which the matching version of the table will be returned.
+Flink uses the SQL syntax of Table Functions to provide a way to express it.
+Once defined, a Temporal Table Function takes a single argument `timeAttribute` and returns a set of rows.
+This set contains the latest versions of the rows for all existing primary keys with respect to the given `timeAttribute`.
+
+Assuming that we defined a `Rates(timeAttribute)` Temporal Table Function based on the `RatesHistory` table,
+we could query such a function in the following way:
+
+{% highlight sql %}
+SELECT * FROM Rates('10:15');
+
+rowtime currency   rate
+======= ========== ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+
+SELECT * FROM Rates('11:00');
+
+rowtime currency   rate
+======= ========== ======
+09:00   US Dollar   102
+10:45   Euro        116
+09:00   Yen           1
+{% endhighlight %}
+
+Each query to `Rates(timeAttribute)` would return the state of `Rates` for the given `timeAttribute`.
+
+**Note**: Currently Flink doesn't support directly querying Temporal Table Functions with a constant `timeAttribute`.
+At the moment Temporal Table Functions can only be used in joins.
+The above example was only used to provide an intuition about what the function `Rates(timeAttribute)` returns.
+
+Processing time
+---
+
+### Defining Temporal Table Function
+
+In order to define a processing-time Temporal Table Function:
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+List<Tuple2<Long, String>> ordersData = new ArrayList<>();
+ordersData.add(Tuple2.of(2L, "Euro"));
+ordersData.add(Tuple2.of(1L, "US Dollar"));
+ordersData.add(Tuple2.of(50L, "Yen"));
+ordersData.add(Tuple2.of(3L, "Euro"));
+ordersData.add(Tuple2.of(5L, "US Dollar"));
+
+List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
+ratesHistoryData.add(Tuple2.of("US Dollar", 102L));
+ratesHistoryData.add(Tuple2.of("Euro", 114L));
+ratesHistoryData.add(Tuple2.of("Yen", 1L));
+ratesHistoryData.add(Tuple2.of("Euro", 116L));
+ratesHistoryData.add(Tuple2.of("Euro", 119L));
+
+DataStreamSource<Tuple2<Long, String>> ordersStream = env
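For context, a minimal sketch of where this example is heading, assuming the `env`/`tEnv`/stream variables from the snippet above and illustrative attribute names (`o_proctime`, `r_proctime`); the registration call mirrors the API this documentation describes:

// Sketch only, continuing the example above.
// (Assumed imports: org.apache.flink.table.api.Table, org.apache.flink.table.functions.TemporalTableFunction)
// Convert the streams into tables with processing-time attributes.
Table orders = tEnv.fromDataStream(ordersStream, "o_amount, o_currency, o_proctime.proctime");
Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, "r_currency, r_rate, r_proctime.proctime");

// Create the Temporal Table Function: time attribute "r_proctime", primary key "r_currency".
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
tEnv.registerFunction("Rates", rates);

// The function can then be used in a temporal table join, e.g. in SQL:
//   SELECT SUM(o_amount * r_rate) AS amount
//   FROM Orders AS o, LATERAL TABLE (Rates(o.o_proctime))
//   WHERE r_currency = o_currency;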

[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633674#comment-16633674
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

twalthr commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r221515643
 
 

 ##
 File path: docs/dev/table/streaming/temporal_tables.md
 ##
 @@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+
+Temporal Tables represent a concept of a table that changes over time
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+--
+
+Let's assume we have the following two tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` represents payments of a given `amount` in a given `currency`.
+For example, at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= ========== ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+10:45   Euro        116
+11:15   Euro        119
+{% endhighlight %}
+
+`RatesHistory` represents an ever-changing append-only stream of currency exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate of `Euro` to `Yen` for the period from `09:00` to `10:45` was `114`.
+From `10:45` to `11:15` it was `116`.
+
+The task is now to calculate the value of all `Orders` converted to a common currency (`Yen`).
+For example, we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without Temporal Tables, one would need to write a query such as:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM RatesHistory AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query.
+
+In order to define a Temporal Table, we must define its primary key.
+The primary key allows us to overwrite older values in the Temporal Table.
+In the above example `currency` would be the primary key for the `RatesHistory` table.
+Secondly, a [time attribute](time_attributes.html) is also required;
+it determines which row is newer and which one is older.
+
+Temporal Table Functions
+
+
+In order to access the data in a Temporal Table, one must define a time attribute for which the matching version of the table will be returned.
+Flink uses the SQL syntax of Table Functions to provide a way to express it.
+Once defined, a Temporal Table Function takes a single argument `timeAttribute` and returns a set of rows.
+This set contains the latest versions of the rows for all existing primary keys with respect to the given `timeAttribute`.
+
+Assuming that we defined a `Rates(timeAttribute)` Temporal Table Function based on the `RatesHistory` table,
+we could query such a function in the following way:
+
+{% highlight sql %}
+SELECT * FROM Rates('10:15');
+
+rowtime currency   rate
+======= ========== ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+
+SELECT * FROM Rates('11:00');
+
+rowtime currency   rate
+======= ========== ======
+09:00   US Dollar   102
+10:45   Euro        116
+09:00   Yen           1
+{% endhighlight %}
+
+Each query to `Rates(timeAttribute)` would return the state of `Rates` for the given `timeAttribute`.
+
+**Note**: Currently Flink doesn't support directly querying Temporal Table Functions with a constant `timeAttribute`.
+At the moment Temporal Table Functions can only be used in joins.
+The above example was only used to provide an intuition about what the function `Rates(timeAttribute)` returns.
+
+Processing time
+---
+
+### Defining Temporal Table Function
+
+In order to define a processing-time Temporal Table Function:
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+List<Tuple2<Long, String>> ordersData = new ArrayList<>();
+ordersData.add(Tuple2.of(2L, "Euro"));
+ordersData.add(Tuple2.of(1L, "US Dollar"));
+ordersData.add(Tuple2.of(50L, "Yen"));
+ordersData.add(Tuple2.of(3L, "Euro"));
+ordersData.add(Tuple2.of(5L, "US Dollar"));
+
+List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
+ratesHistoryData.add(Tup

[jira] [Commented] (FLINK-10464) TimeIndicatorRelDataType: digest can not describe a type completely.

2018-10-01 Thread Timo Walther (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10464?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633676#comment-16633676
 ] 

Timo Walther commented on FLINK-10464:
--

[~huangjiatian] could you add a more complete example of your table program to 
reproduce your error? This issue might be a duplicate of FLINK-8897. The file 
that you posted is not visible.

> TimeIndicatorRelDataType: digest can not describe a type completely.
> 
>
> Key: FLINK-10464
> URL: https://issues.apache.org/jira/browse/FLINK-10464
> Project: Flink
>  Issue Type: Bug
>Reporter: huangjiatian
>Priority: Minor
>
> I ran into a strange problem when using the Flink SQL API.
> The error message looks like this: 
> java.lang.AssertionError: Conversion to relational algebra failed to preserve 
> datatypes:
> validated type:
> RecordType(TIMESTAMP(3) NOT NULL rowtime) NOT NULL
> converted type:
> RecordType(TIMESTAMP(3) NOT NULL rowtime) NOT NULL
> rel:
> LogicalProject(rowtime=[$3])
>   LogicalTableScan(table=[[hjtsrc]])
>     
> I found that two different types are considered equal.
> !image-2018-09-28-13-11-43-515.png!
> That means "select deviceid, rowtime.rowtime" is treated as equal to 
> "select deviceid, rowtime.proctime".
> The "digest" of a TimeIndicatorRelDataType does not include the event-time 
> information, so it cannot describe a TimeIndicatorRelDataType completely.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dawidwys closed pull request #6758: [FLINK-10417][cep] Added option to throw exception on pattern variable mis…

2018-10-01 Thread GitBox
dawidwys closed pull request #6758: [FLINK-10417][cep] Added option to throw 
exception on pattern variable mis…
URL: https://github.com/apache/flink/pull/6758
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index ad321bf71b5..d7f915fc62e 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -1385,6 +1385,23 @@ Pattern.begin("patternName", skipStrategy)
 
 
 
+{% warn Attention %} For SKIP_TO_FIRST/LAST there are two options for handling cases in which no elements are mapped to
+the specified variable. By default, a NO_SKIP strategy is used in this case. The other option is to throw an exception in such a situation.
+One can enable this option as follows:
+
+
+
+{% highlight java %}
+AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()
+{% endhighlight %}
+
+
+{% highlight scala %}
+AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()
+{% endhighlight %}
+
+
+
 ## Detecting Patterns
 
 After specifying the pattern sequence you are looking for, it is time to apply 
it to your input stream to detect
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
index 8151a124af4..f4448a35560 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
@@ -43,8 +43,8 @@
 * @param patternName the pattern name to skip to
 * @return the created AfterMatchSkipStrategy
 */
-   public static AfterMatchSkipStrategy skipToFirst(String patternName) {
-   return new SkipToFirstStrategy(patternName);
+   public static SkipToFirstStrategy skipToFirst(String patternName) {
+   return new SkipToFirstStrategy(patternName, false);
}
 
/**
@@ -53,8 +53,8 @@ public static AfterMatchSkipStrategy skipToFirst(String 
patternName) {
 * @param patternName the pattern name to skip to
 * @return the created AfterMatchSkipStrategy
 */
-   public static AfterMatchSkipStrategy skipToLast(String patternName) {
-   return new SkipToLastStrategy(patternName);
+   public static SkipToLastStrategy skipToLast(String patternName) {
+   return new SkipToLastStrategy(patternName, false);
}
 
/**
@@ -62,7 +62,7 @@ public static AfterMatchSkipStrategy skipToLast(String 
patternName) {
 *
 * @return the created AfterMatchSkipStrategy
 */
-   public static AfterMatchSkipStrategy skipPastLastEvent() {
+   public static SkipPastLastStrategy skipPastLastEvent() {
return SkipPastLastStrategy.INSTANCE;
}
 
@@ -71,7 +71,7 @@ public static AfterMatchSkipStrategy skipPastLastEvent() {
 *
 * @return the created AfterMatchSkipStrategy
 */
-   public static AfterMatchSkipStrategy noSkip() {
+   public static NoSkipStrategy noSkip() {
return NoSkipStrategy.INSTANCE;
}
 
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java
new file mode 100644
index 000..5554151ccbd
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.nfa.aftermatch;
+
+import org.apache.flink.cep.nfa.sharedbuffer.EventId;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+
+import static o
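For context, a minimal sketch of how the option documented in the diff above would be enabled on a pattern, following the `Pattern.begin("patternName", skipStrategy)` form the CEP docs reference; the `Event` type and the condition are placeholders:

// Sketch: a skip strategy that throws instead of silently falling back to NO_SKIP
// when no element was mapped to the referenced pattern variable.
AfterMatchSkipStrategy skipStrategy =
        AfterMatchSkipStrategy.skipToFirst("patternName").throwExceptionOnMiss();

// Attach it to the pattern sequence, as described in the CEP documentation.
Pattern<Event, ?> pattern = Pattern.<Event>begin("patternName", skipStrategy)
        .where(new SimpleCondition<Event>() {
                @Override
                public boolean filter(Event event) {
                        return true; // placeholder condition
                }
        });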

[jira] [Commented] (FLINK-10417) Add option to throw exception on pattern variable miss with SKIP_TO_FIRST/LAST

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633679#comment-16633679
 ] 

ASF GitHub Bot commented on FLINK-10417:


dawidwys closed pull request #6758: [FLINK-10417][cep] Added option to throw 
exception on pattern variable mis…
URL: https://github.com/apache/flink/pull/6758
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index ad321bf71b5..d7f915fc62e 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -1385,6 +1385,23 @@ Pattern.begin("patternName", skipStrategy)
 
 
 
+{% warn Attention %} For SKIP_TO_FIRST/LAST there are two options for handling cases in which no elements are mapped to
+the specified variable. By default, a NO_SKIP strategy is used in this case. The other option is to throw an exception in such a situation.
+One can enable this option as follows:
+
+
+
+{% highlight java %}
+AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()
+{% endhighlight %}
+
+
+{% highlight scala %}
+AfterMatchSkipStrategy.skipToFirst(patternName).throwExceptionOnMiss()
+{% endhighlight %}
+
+
+
 ## Detecting Patterns
 
 After specifying the pattern sequence you are looking for, it is time to apply 
it to your input stream to detect
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
index 8151a124af4..f4448a35560 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
@@ -43,8 +43,8 @@
 * @param patternName the pattern name to skip to
 * @return the created AfterMatchSkipStrategy
 */
-   public static AfterMatchSkipStrategy skipToFirst(String patternName) {
-   return new SkipToFirstStrategy(patternName);
+   public static SkipToFirstStrategy skipToFirst(String patternName) {
+   return new SkipToFirstStrategy(patternName, false);
}
 
/**
@@ -53,8 +53,8 @@ public static AfterMatchSkipStrategy skipToFirst(String 
patternName) {
 * @param patternName the pattern name to skip to
 * @return the created AfterMatchSkipStrategy
 */
-   public static AfterMatchSkipStrategy skipToLast(String patternName) {
-   return new SkipToLastStrategy(patternName);
+   public static SkipToLastStrategy skipToLast(String patternName) {
+   return new SkipToLastStrategy(patternName, false);
}
 
/**
@@ -62,7 +62,7 @@ public static AfterMatchSkipStrategy skipToLast(String 
patternName) {
 *
 * @return the created AfterMatchSkipStrategy
 */
-   public static AfterMatchSkipStrategy skipPastLastEvent() {
+   public static SkipPastLastStrategy skipPastLastEvent() {
return SkipPastLastStrategy.INSTANCE;
}
 
@@ -71,7 +71,7 @@ public static AfterMatchSkipStrategy skipPastLastEvent() {
 *
 * @return the created AfterMatchSkipStrategy
 */
-   public static AfterMatchSkipStrategy noSkip() {
+   public static NoSkipStrategy noSkip() {
return NoSkipStrategy.INSTANCE;
}
 
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java
new file mode 100644
index 000..5554151ccbd
--- /dev/null
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/SkipToElementStrategy.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.f

[jira] [Resolved] (FLINK-10417) Add option to throw exception on pattern variable miss with SKIP_TO_FIRST/LAST

2018-10-01 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz resolved FLINK-10417.
--
Resolution: Done

> Add option to throw exception on pattern variable miss with SKIP_TO_FIRST/LAST
> --
>
> Key: FLINK-10417
> URL: https://issues.apache.org/jira/browse/FLINK-10417
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10470) Add method to check if pattern can produce empty matches

2018-10-01 Thread Dawid Wysakowicz (JIRA)
Dawid Wysakowicz created FLINK-10470:


 Summary: Add method to check if pattern can produce empty matches
 Key: FLINK-10470
 URL: https://issues.apache.org/jira/browse/FLINK-10470
 Project: Flink
  Issue Type: Sub-task
  Components: CEP
Reporter: Dawid Wysakowicz
Assignee: Dawid Wysakowicz






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10470) Add method to check if pattern can produce empty matches

2018-10-01 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz updated FLINK-10470:
-
Description: There are a couple of inconsistencies in how the CEP library handles 
greedy and reluctant operators at the beginning and end of a pattern. This results 
in subtle problems with how empty matches should be generated for patterns such as 
A? or A*?, where one is greedy and the other is reluctant. In order to 
provide a first version of the MATCH_RECOGNIZE function we should have the possibility 
to disable patterns which can produce empty matches.

> Add method to check if pattern can produce empty matches
> 
>
> Key: FLINK-10470
> URL: https://issues.apache.org/jira/browse/FLINK-10470
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> There are a couple of inconsistencies in how the CEP library handles greedy and 
> reluctant operators at the beginning and end of a pattern. This results in 
> subtle problems with how empty matches should be generated for patterns such as 
> A? or A*?, where one is greedy and the other is reluctant. In order to 
> provide a first version of the MATCH_RECOGNIZE function we should have the 
> possibility to disable patterns which can produce empty matches.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dawidwys opened a new pull request #6781: [FLINK-10470] Add method to check if pattern can produce empty matches

2018-10-01 Thread GitBox
dawidwys opened a new pull request #6781: [FLINK-10470] Add method to check if 
pattern can produce empty matches
URL: https://github.com/apache/flink/pull/6781
 
 
   ## What is the purpose of the change
   
   This change introduces a method that allows checking whether a Pattern can 
produce empty matches. It is needed to disable such patterns in 
MATCH_RECOGNIZE until we are able to provide full support for empty matches 
in CEP (with proper support for reluctant and greedy quantifiers).
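As a rough illustration of the idea only (not necessarily this PR's actual implementation): a pattern can produce an empty match when every element of the pattern sequence is optional. A sketch, assuming `Pattern` exposes `getPrevious()` and `getQuantifier()` and ignoring group patterns and the greedy/reluctant subtleties mentioned above:

// Illustrative sketch only; the real check in CEP handles more cases.
static boolean canProduceEmptyMatches(Pattern<?, ?> pattern) {
    for (Pattern<?, ?> current = pattern; current != null; current = current.getPrevious()) {
        if (!current.getQuantifier().hasProperty(Quantifier.QuantifierProperty.OPTIONAL)) {
            // At least one element of the sequence always has to match,
            // so the overall pattern cannot match the empty sequence.
            return false;
        }
    }
    return true;
}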
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10470) Add method to check if pattern can produce empty matches

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633712#comment-16633712
 ] 

ASF GitHub Bot commented on FLINK-10470:


dawidwys opened a new pull request #6781: [FLINK-10470] Add method to check if 
pattern can produce empty matches
URL: https://github.com/apache/flink/pull/6781
 
 
   ## What is the purpose of the change
   
   This change introduces a method that allows checking whether a Pattern can 
produce empty matches. It is needed to disable such patterns in 
MATCH_RECOGNIZE until we are able to provide full support for empty matches 
in CEP (with proper support for reluctant and greedy quantifiers).
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / **no**)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
 - The serializers: (yes / **no** / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
 - The S3 file system connector: (yes / **no** / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / **no**)
 - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add method to check if pattern can produce empty matches
> 
>
> Key: FLINK-10470
> URL: https://issues.apache.org/jira/browse/FLINK-10470
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
>
> There are a couple of inconsistencies in how the CEP library handles greedy and 
> reluctant operators at the beginning and end of a pattern. This results in 
> subtle problems with how empty matches should be generated for patterns such as 
> A? or A*?, where one is greedy and the other is reluctant. In order to 
> provide a first version of the MATCH_RECOGNIZE function we should have the 
> possibility to disable patterns which can produce empty matches.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10470) Add method to check if pattern can produce empty matches

2018-10-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10470?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10470:
---
Labels: pull-request-available  (was: )

> Add method to check if pattern can produce empty matches
> 
>
> Key: FLINK-10470
> URL: https://issues.apache.org/jira/browse/FLINK-10470
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
>
> There are a couple of inconsistencies in how the CEP library handles greedy and 
> reluctant operators at the beginning and end of a pattern. This results in 
> subtle problems with how empty matches should be generated for patterns such as 
> A? or A*?, where one is greedy and the other is reluctant. In order to 
> provide a first version of the MATCH_RECOGNIZE function we should have the 
> possibility to disable patterns which can produce empty matches.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined types in CsvReader

2018-10-01 Thread GitBox
ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined 
types in CsvReader
URL: https://github.com/apache/flink/pull/5862#discussion_r221242052
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
 ##
 @@ -238,6 +238,37 @@ public String toString() {
return tupleInfo;
}
 
+   /**
+* Resolves a type information for each specified income type and forms an instance of a resulting {@link TupleTypeInfo} type.
+* @param incomeTypes tuple fields' types
+* @param <T> a resulting type of a tuple, e.g. Tuple1, Tuple2...
+* @return A tuple information type, built from the specified income types.
+*/
+   @SuppressWarnings("unchecked")
+   @PublicEvolving
+   public static <T extends Tuple> TupleTypeInfo<T> getTupleTypeInfo(Class<?>... incomeTypes) {
+   if (incomeTypes == null || incomeTypes.length == 0) {
+   throw new IllegalArgumentException();
 
 Review comment:
   I think it would be a good idea to add a message to the exception here.
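A sketch of what this suggestion amounts to, i.e. the guard from the diff above with a descriptive message (the exact wording is illustrative):

if (incomeTypes == null || incomeTypes.length == 0) {
    throw new IllegalArgumentException(
        "At least one income type must be specified to build a TupleTypeInfo.");
}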


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined types in CsvReader

2018-10-01 Thread GitBox
ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined 
types in CsvReader
URL: https://github.com/apache/flink/pull/5862#discussion_r221248122
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
 ##
 @@ -262,54 +267,115 @@ public void setCharset(Charset charset) {
// 

//  Mapping from types to parsers
// 

-   
+
/**
-* Gets the parser for the type specified by the given class. Returns 
null, if no parser for that class
+* Provides an instance of {@link FieldParser} that corresponds to the 
specified type.
+* @param type a field type for which a {@link FieldParser} is needed.
+* @return if there is a custom parser for the specified field type - 
it is returned; then, if there is a default parser
+* responsible for the specified type - it is returned; otherwise, 
{@code null} is returned.
+*/
+   @SuppressWarnings("unchecked")
+   public static <T> FieldParser<T> getParserInstanceFor(Class<T> type) {
+   ParserFactory<T> parserFactory = (ParserFactory<T>) CUSTOM_PARSERS.get(type);
+   if (parserFactory == null) {
+   parserFactory = (ParserFactory<T>) DEFAULT_PARSERS.get(type);
+   }
+
+   if (parserFactory == null) {
+   return null;
+   }
+
+   return parserFactory.create();
+   }
+
+   /**
+* Gets the default parser for the type specified by the given class. 
Returns null, if no parser for that class
 * is known.
-* 
+*
 * @param type The class of the type to get the parser for.
 * @return The parser for the given type, or null, if no such parser 
exists.
 */
-   public static <T> Class<FieldParser<T>> getParserForType(Class<T> type) {
-   Class<? extends FieldParser<?>> parser = PARSERS.get(type);
-   if (parser == null) {
+   public static <T> Class<FieldParser<T>> getDefaultParserForType(Class<T> type) {
+   ParserFactory<?> parserFactory = DEFAULT_PARSERS.get(type);
+   if (parserFactory == null) {
return null;
} else {
@SuppressWarnings("unchecked")
-   Class<FieldParser<T>> typedParser = (Class<FieldParser<T>>) parser;
+   Class<FieldParser<T>> typedParser = (Class<FieldParser<T>>) parserFactory.getParserType();
return typedParser;
}
}
-   
-   private static final Map<Class<?>, Class<? extends FieldParser<?>>> PARSERS = 
-   new HashMap<Class<?>, Class<? extends FieldParser<?>>>();
-   
+
+   /**
+* Gets the custom parser for the type specified by the given class. 
Returns null, if no parser for that class
+* is known.
+*
+* @param type The class of the type to get the parser for.
+* @return The parser for the given type, or null, if no such parser 
exists.
+*/
+   public static <T> Class<FieldParser<T>> getCustomParserForType(Class<T> type) {
+   synchronized (CUSTOM_PARSERS) {
+   ParserFactory<T> parserFactory = (ParserFactory<T>) CUSTOM_PARSERS.get(type);
+   if (parserFactory == null) {
+   return null;
+   } else {
+   return parserFactory.getParserType();
+   }
+   }
+   }
+
+   private static final Map<Class<?>, ParserFactory<?>> DEFAULT_PARSERS = new HashMap<>();
+
static {
// basic types
-   PARSERS.put(Byte.class, ByteParser.class);
-   PARSERS.put(Short.class, ShortParser.class);
-   PARSERS.put(Integer.class, IntParser.class);
-   PARSERS.put(Long.class, LongParser.class);
-   PARSERS.put(String.class, StringParser.class);
-   PARSERS.put(Float.class, FloatParser.class);
-   PARSERS.put(Double.class, DoubleParser.class);
-   PARSERS.put(Boolean.class, BooleanParser.class);
-   PARSERS.put(BigDecimal.class, BigDecParser.class);
-   PARSERS.put(BigInteger.class, BigIntParser.class);
+   DEFAULT_PARSERS.put(Byte.class, new 
DefaultParserFactory<>(ByteParser.class));
+   DEFAULT_PARSERS.put(Short.class, new 
DefaultParserFactory<>(ShortParser.class));
+   DEFAULT_PARSERS.put(Integer.class, new 
DefaultParserFactory<>(IntParser.class));
+   DEFAULT_PARSERS.put(Long.class, new 
DefaultParserFactory<>(LongParser.class));
+   DEFAULT_PARSERS.put(String.class, new 
DefaultParserFactory<>(StringParser.class));
+   DEFAULT_PARSERS.put(Float.class, new 
DefaultParserFactory<>(FloatParser.class));
+   DEFAULT_PARSERS.put(Double.class, n
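For context, a short sketch of how the lookup described in the Javadoc above would be used, based on the signatures as they appear in this excerpt; custom parsers take precedence over the default ones:

// Resolve a parser instance for a field type via the new lookup;
// for built-in types this falls back to the default parser registry.
FieldParser<Integer> intParser = FieldParser.getParserInstanceFor(Integer.class);

// The class-based lookup for default parsers remains available:
Class<FieldParser<Double>> doubleParserClass = FieldParser.getDefaultParserForType(Double.class);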

[GitHub] ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined types in CsvReader

2018-10-01 Thread GitBox
ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined 
types in CsvReader
URL: https://github.com/apache/flink/pull/5862#discussion_r221250089
 
 

 ##
 File path: 
flink-java/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java
 ##
 @@ -473,7 +534,7 @@ private static void appendTupleTypeGenerics(StringBuilder 
sb, int numFields) {
}
 
private static final String HEADER =
-   "/*\n"
+   "/*\n"
 
 Review comment:
   please revert this change


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined types in CsvReader

2018-10-01 Thread GitBox
ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined 
types in CsvReader
URL: https://github.com/apache/flink/pull/5862#discussion_r221526823
 
 

 ##
 File path: flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
 ##
 @@ -20,22 +20,53 @@
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.api.java.typeutils.PojoField;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
-//CHECKSTYLE.OFF: AvoidStarImport|ImportOrder
-import org.apache.flink.api.java.tuple.*;
-//CHECKSTYLE.ON: AvoidStarImport|ImportOrder
-
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 
+//CHECKSTYLE.OFF: AvoidStarImport|ImportOrder
 
 Review comment:
   I assume these changes aren't necessary.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined types in CsvReader

2018-10-01 Thread GitBox
ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined 
types in CsvReader
URL: https://github.com/apache/flink/pull/5862#discussion_r221243381
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 ##
 @@ -129,7 +129,7 @@ protected TypeExtractor() {
//  TypeInfoFactory registry
// 

 
-   private static Map<Type, Class<? extends TypeInfoFactory>> registeredTypeInfoFactories = new HashMap<>();
+   public static Map<Type, Class<? extends TypeInfoFactory>> registeredTypeInfoFactories = new HashMap<>();
 
 Review comment:
   What is this change for?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-2435) Add support for custom CSV field parsers

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633715#comment-16633715
 ] 

ASF GitHub Bot commented on FLINK-2435:
---

ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined 
types in CsvReader
URL: https://github.com/apache/flink/pull/5862#discussion_r221248122
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/types/parser/FieldParser.java
 ##
 @@ -262,54 +267,115 @@ public void setCharset(Charset charset) {
// 

//  Mapping from types to parsers
// 

-   
+
/**
-* Gets the parser for the type specified by the given class. Returns 
null, if no parser for that class
+* Provides an instance of {@link FieldParser} that corresponds to the 
specified type.
+* @param type a field type for which a {@link FieldParser} is needed.
+* @return if there is a custom parser for the specified field type - 
it is returned; then, if there is a default parser
+* responsible for the specified type - it is returned; otherwise, 
{@code null} is returned.
+*/
+   @SuppressWarnings("unchecked")
+   public static <T> FieldParser<T> getParserInstanceFor(Class<T> type) {
+   ParserFactory<T> parserFactory = (ParserFactory<T>) CUSTOM_PARSERS.get(type);
+   if (parserFactory == null) {
+   parserFactory = (ParserFactory<T>) DEFAULT_PARSERS.get(type);
+   }
+
+   if (parserFactory == null) {
+   return null;
+   }
+
+   return parserFactory.create();
+   }
+
+   /**
+* Gets the default parser for the type specified by the given class. 
Returns null, if no parser for that class
 * is known.
-* 
+*
 * @param type The class of the type to get the parser for.
 * @return The parser for the given type, or null, if no such parser 
exists.
 */
-   public static <T> Class<FieldParser<T>> getParserForType(Class<T> type) {
-   Class<? extends FieldParser<?>> parser = PARSERS.get(type);
-   if (parser == null) {
+   public static <T> Class<FieldParser<T>> getDefaultParserForType(Class<T> type) {
+   ParserFactory<?> parserFactory = DEFAULT_PARSERS.get(type);
+   if (parserFactory == null) {
return null;
} else {
@SuppressWarnings("unchecked")
-   Class<FieldParser<T>> typedParser = (Class<FieldParser<T>>) parser;
+   Class<FieldParser<T>> typedParser = (Class<FieldParser<T>>) parserFactory.getParserType();
return typedParser;
}
}
-   
-   private static final Map<Class<?>, Class<? extends FieldParser<?>>> PARSERS = 
-   new HashMap<Class<?>, Class<? extends FieldParser<?>>>();
-   
+
+   /**
+* Gets the custom parser for the type specified by the given class. 
Returns null, if no parser for that class
+* is known.
+*
+* @param type The class of the type to get the parser for.
+* @return The parser for the given type, or null, if no such parser 
exists.
+*/
+   public static <T> Class<FieldParser<T>> getCustomParserForType(Class<T> type) {
+   synchronized (CUSTOM_PARSERS) {
+   ParserFactory<T> parserFactory = (ParserFactory<T>) CUSTOM_PARSERS.get(type);
+   if (parserFactory == null) {
+   return null;
+   } else {
+   return parserFactory.getParserType();
+   }
+   }
+   }
+
+   private static final Map<Class<?>, ParserFactory<?>> DEFAULT_PARSERS = new HashMap<>();
+
static {
// basic types
-   PARSERS.put(Byte.class, ByteParser.class);
-   PARSERS.put(Short.class, ShortParser.class);
-   PARSERS.put(Integer.class, IntParser.class);
-   PARSERS.put(Long.class, LongParser.class);
-   PARSERS.put(String.class, StringParser.class);
-   PARSERS.put(Float.class, FloatParser.class);
-   PARSERS.put(Double.class, DoubleParser.class);
-   PARSERS.put(Boolean.class, BooleanParser.class);
-   PARSERS.put(BigDecimal.class, BigDecParser.class);
-   PARSERS.put(BigInteger.class, BigIntParser.class);
+   DEFAULT_PARSERS.put(Byte.class, new 
DefaultParserFactory<>(ByteParser.class));
+   DEFAULT_PARSERS.put(Short.class, new 
DefaultParserFactory<>(ShortParser.class));
+   DEFAULT_PARSERS.put(Integer.class, new 
DefaultParserFactory<>(IntParser.class));
+   DEFAULT_PARSERS.put(Long.class, new 
DefaultParserFactory<>(LongParser.c

[jira] [Commented] (FLINK-2435) Add support for custom CSV field parsers

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633716#comment-16633716
 ] 

ASF GitHub Bot commented on FLINK-2435:
---

ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined 
types in CsvReader
URL: https://github.com/apache/flink/pull/5862#discussion_r221243381
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
 ##
 @@ -129,7 +129,7 @@ protected TypeExtractor() {
//  TypeInfoFactory registry
// 

 
-   private static Map<Type, Class<? extends TypeInfoFactory>> registeredTypeInfoFactories = new HashMap<>();
+   public static Map<Type, Class<? extends TypeInfoFactory>> registeredTypeInfoFactories = new HashMap<>();
 
 Review comment:
   What is this change for?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for custom CSV field parsers
> 
>
> Key: FLINK-2435
> URL: https://issues.apache.org/jira/browse/FLINK-2435
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 0.10.0
>Reporter: Fabian Hueske
>Assignee: Dmitrii Kober
>Priority: Minor
>  Labels: pull-request-available
>
> The {{CSVInputFormats}} have only {{FieldParsers}} for Java's primitive types 
> (byte, short, int, long, float, double, boolean, String).
> It would be good to add support for CSV field parsers for custom data types 
> which can be registered in a {{CSVReader}}. 
> We could offer two interfaces for field parsers.
> 1. The regular low-level {{FieldParser}} which operates on a byte array and 
> offsets.
> 2. A {{StringFieldParser}} which operates on a String that has been extracted 
> by a {{StringParser}} before. This interface will be easier to implement but 
> less efficient.
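For illustration only, a sketch of the two parser flavours described above; `StringFieldParser` is a hypothetical name and shape, shown next to the existing low-level `FieldParser` contract:

// Existing low-level contract (operates on a byte array and offsets), roughly as in FieldParser:
//   public abstract int parseField(byte[] bytes, int startPos, int limit, byte[] delim, T reuse);
//
// Hypothetical simpler interface suggested by the issue; easier to implement but less efficient.
public interface StringFieldParser<T> {
    /** Parses one CSV field that has already been extracted as a String. */
    T parseField(String field);
}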



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-2435) Add support for custom CSV field parsers

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633713#comment-16633713
 ] 

ASF GitHub Bot commented on FLINK-2435:
---

ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined 
types in CsvReader
URL: https://github.com/apache/flink/pull/5862#discussion_r221250089
 
 

 ##
 File path: 
flink-java/src/test/java/org/apache/flink/api/java/tuple/TupleGenerator.java
 ##
 @@ -473,7 +534,7 @@ private static void appendTupleTypeGenerics(StringBuilder 
sb, int numFields) {
}
 
private static final String HEADER =
-   "/*\n"
+   "/*\n"
 
 Review comment:
   please revert this change


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for custom CSV field parsers
> 
>
> Key: FLINK-2435
> URL: https://issues.apache.org/jira/browse/FLINK-2435
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 0.10.0
>Reporter: Fabian Hueske
>Assignee: Dmitrii Kober
>Priority: Minor
>  Labels: pull-request-available
>
> The {{CSVInputFormats}} have only {{FieldParsers}} for Java's primitive types 
> (byte, short, int, long, float, double, boolean, String).
> It would be good to add support for CSV field parsers for custom data types 
> which can be registered in a {{CSVReader}}. 
> We could offer two interfaces for field parsers.
> 1. The regular low-level {{FieldParser}} which operates on a byte array and 
> offsets.
> 2. A {{StringFieldParser}} which operates on a String that has been extracted 
> by a {{StringParser}} before. This interface will be easier to implement but 
> less efficient.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-2435) Add support for custom CSV field parsers

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633717#comment-16633717
 ] 

ASF GitHub Bot commented on FLINK-2435:
---

ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined 
types in CsvReader
URL: https://github.com/apache/flink/pull/5862#discussion_r221526823
 
 

 ##
 File path: flink-java/src/main/java/org/apache/flink/api/java/io/CsvReader.java
 ##
 @@ -20,22 +20,53 @@
 
 import org.apache.flink.annotation.Public;
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.api.java.operators.DataSource;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple10;
+import org.apache.flink.api.java.tuple.Tuple11;
+import org.apache.flink.api.java.tuple.Tuple12;
+import org.apache.flink.api.java.tuple.Tuple13;
+import org.apache.flink.api.java.tuple.Tuple14;
+import org.apache.flink.api.java.tuple.Tuple15;
+import org.apache.flink.api.java.tuple.Tuple16;
+import org.apache.flink.api.java.tuple.Tuple17;
+import org.apache.flink.api.java.tuple.Tuple18;
+import org.apache.flink.api.java.tuple.Tuple19;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple20;
+import org.apache.flink.api.java.tuple.Tuple21;
+import org.apache.flink.api.java.tuple.Tuple22;
+import org.apache.flink.api.java.tuple.Tuple23;
+import org.apache.flink.api.java.tuple.Tuple24;
+import org.apache.flink.api.java.tuple.Tuple25;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.tuple.Tuple5;
+import org.apache.flink.api.java.tuple.Tuple6;
+import org.apache.flink.api.java.tuple.Tuple7;
+import org.apache.flink.api.java.tuple.Tuple8;
+import org.apache.flink.api.java.tuple.Tuple9;
+import org.apache.flink.api.java.typeutils.PojoField;
 import org.apache.flink.api.java.typeutils.PojoTypeInfo;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.types.Row;
 import org.apache.flink.util.Preconditions;
 
-//CHECKSTYLE.OFF: AvoidStarImport|ImportOrder
-import org.apache.flink.api.java.tuple.*;
-//CHECKSTYLE.ON: AvoidStarImport|ImportOrder
-
+import java.lang.reflect.Field;
 import java.util.ArrayList;
 import java.util.Arrays;
 
+//CHECKSTYLE.OFF: AvoidStarImport|ImportOrder
 
 Review comment:
   I assume these changes aren't necessary.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for custom CSV field parsers
> 
>
> Key: FLINK-2435
> URL: https://issues.apache.org/jira/browse/FLINK-2435
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 0.10.0
>Reporter: Fabian Hueske
>Assignee: Dmitrii Kober
>Priority: Minor
>  Labels: pull-request-available
>
> The {{CSVInputFormats}} have only {{FieldParsers}} for Java's primitive types 
> (byte, short, int, long, float, double, boolean, String).
> It would be good to add support for CSV field parsers for custom data types 
> which can be registered in a {{CSVReader}}. 
> We could offer two interfaces for field parsers.
> 1. The regular low-level {{FieldParser}} which operates on a byte array and 
> offsets.
> 2. A {{StringFieldParser}} which operates on a String that has been extracted 
> by a {{StringParser}} before. This interface will be easier to implement but 
> less efficient.
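For illustration, a minimal sketch of what the proposed String-based variant could look like. The interface and the example parser below are hypothetical (they are not part of Flink's API) and only mirror the idea described above:

{code:java}
// Hypothetical sketch of the proposed String-based parser interface; not part of Flink's API.
interface StringFieldParser<T> {
	/** Parses one CSV field that has already been extracted as a String by a StringParser. */
	T parseField(String field);
}

// Example custom parser for java.time.LocalDate fields (ISO-8601 dates, e.g. "2018-10-01").
class LocalDateFieldParser implements StringFieldParser<java.time.LocalDate> {
	@Override
	public java.time.LocalDate parseField(String field) {
		return java.time.LocalDate.parse(field.trim());
	}
}
{code}

Such a parser could then, as the issue suggests, be registered with the {{CsvReader}} for the corresponding column; the registration API itself is not specified here.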



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-2435) Add support for custom CSV field parsers

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633714#comment-16633714
 ] 

ASF GitHub Bot commented on FLINK-2435:
---

ex00 commented on a change in pull request #5862: [FLINK-2435] User-defined 
types in CsvReader
URL: https://github.com/apache/flink/pull/5862#discussion_r221242052
 
 

 ##
 File path: 
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TupleTypeInfo.java
 ##
 @@ -238,6 +238,37 @@ public String toString() {
return tupleInfo;
}
 
+   /**
+* Resolves a type information for each specified income type and forms 
an instance of a resulting {@link TupleTypeInfo} type.
+* @param incomeTypes tuple fields' types
+* @param  a resulting type of a tuple, e.g. Tuple1, Tuple2...
+* @return A tuple information type, built from the specified income 
types.
+*/
+   @SuppressWarnings("unchecked")
+   @PublicEvolving
+   public static  TupleTypeInfo 
getTupleTypeInfo(Class... incomeTypes) {
+   if (incomeTypes == null || incomeTypes.length == 0) {
+   throw new IllegalArgumentException();
 
 Review comment:
   I think it would be a good idea to add a message to the exception here.
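   For illustration, a minimal sketch of the suggested change (the message text below is only an example, not a prescribed wording):

{code:java}
if (incomeTypes == null || incomeTypes.length == 0) {
	// Suggested improvement: explain what was wrong instead of throwing a bare exception.
	throw new IllegalArgumentException(
		"At least one tuple field type must be specified, but incomeTypes was null or empty.");
}
{code}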


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add support for custom CSV field parsers
> 
>
> Key: FLINK-2435
> URL: https://issues.apache.org/jira/browse/FLINK-2435
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 0.10.0
>Reporter: Fabian Hueske
>Assignee: Dmitrii Kober
>Priority: Minor
>  Labels: pull-request-available
>
> The {{CSVInputFormats}} have only {{FieldParsers}} for Java's primitive types 
> (byte, short, int, long, float, double, boolean, String).
> It would be good to add support for CSV field parsers for custom data types 
> which can be registered in a {{CSVReader}}. 
> We could offer two interfaces for field parsers.
> 1. The regular low-level {{FieldParser}} which operates on a byte array and 
> offsets.
> 2. A {{StringFieldParser}} which operates on a String that has been extracted 
> by a {{StringParser}} before. This interface will be easier to implement but 
> less efficient.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-2435) Add support for custom CSV field parsers

2018-10-01 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-2435?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-2435:
--
Labels: pull-request-available  (was: )

> Add support for custom CSV field parsers
> 
>
> Key: FLINK-2435
> URL: https://issues.apache.org/jira/browse/FLINK-2435
> Project: Flink
>  Issue Type: New Feature
>  Components: DataSet API
>Affects Versions: 0.10.0
>Reporter: Fabian Hueske
>Assignee: Dmitrii Kober
>Priority: Minor
>  Labels: pull-request-available
>
> The {{CSVInputFormats}} have only {{FieldParsers}} for Java's primitive types 
> (byte, short, int, long, float, double, boolean, String).
> It would be good to add support for CSV field parsers for custom data types 
> which can be registered in a {{CSVReader}}. 
> We could offer two interfaces for field parsers.
> 1. The regular low-level {{FieldParser}} which operates on a byte array and 
> offsets.
> 2. A {{StringFieldParser}} which operates on a String that has been extracted 
> by a {{StringParser}} before. This interface will be easier to implement but 
> less efficient.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10447) Create Bucketing Table Sink.

2018-10-01 Thread Fabian Hueske (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633722#comment-16633722
 ] 

Fabian Hueske commented on FLINK-10447:
---

Hi [~Suxing Lee], you should be aware that this is a major effort. Just porting 
your code to StreamingFileSink won't be sufficient.

As I said before, we need to thoroughly discuss the design of such a major 
feature before starting to implement it.
 We need a detailed design document that addresses all the points (and probably 
more) that I've listed before.

I would suggest closing this issue and the pull request and creating a new issue 
as a sub-issue of FLINK-8535.

Best, Fabian

> Create Bucketing Table Sink.
> 
>
> Key: FLINK-10447
> URL: https://issues.apache.org/jira/browse/FLINK-10447
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Suxing Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> It would be nice to integrate the table APIs with the HDFS connectors so that 
> the rows in the tables can be directly pushed into HDFS.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] twalthr commented on issue #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch

2018-10-01 Thread GitBox
twalthr commented on issue #6611: [FLINK-3875] [connectors] Add an upsert table 
sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611#issuecomment-425835514
 
 
   Thank you @dawidwys. Travis gave green light. I will merge this...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-3875) Add a TableSink for Elasticsearch

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633731#comment-16633731
 ] 

ASF GitHub Bot commented on FLINK-3875:
---

twalthr commented on issue #6611: [FLINK-3875] [connectors] Add an upsert table 
sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611#issuecomment-425835514
 
 
   Thank you @dawidwys. Travis gave green light. I will merge this...


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add a TableSink for Elasticsearch
> -
>
> Key: FLINK-3875
> URL: https://issues.apache.org/jira/browse/FLINK-3875
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors, Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
>
> Add a TableSink that writes data to Elasticsearch



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on a change in pull request #6700: [FLINK-10340][table] Add Cosh math function supported in Table API and SQL

2018-10-01 Thread GitBox
pnowojski commented on a change in pull request #6700: [FLINK-10340][table] Add 
Cosh math function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6700#discussion_r221537661
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -1274,6 +1274,17 @@ ATAN2(numeric1, numeric2)
   
 
 
+
+  
+{% highlight text %}
+COSH(numeric)
+{% endhighlight %}
+  
+  
+Returns the hyperbolic cosine of numeric. Return value type 
is DOUBLE.
 
 Review comment:
   @xccui if you think using `` is better I'm fine with that, I don't mind 
one way or the other.
   
   However I don't understand:
   
   > before reaching a consensus about the format and the internal 
implementation, I suggest to keep the current doc style, i.e., without 
mentioning the return type, for now.
   
   We are already following a convention that for all of the math functions we 
are dropping precision and returning `double`. What's the point of not 
documenting it?
   
   If you could guarantee me that we will document those return types, let's say within a month, I would be fine with postponing it. However, I don't believe that will happen, so I prefer to start adding this return type information incrementally right now. Later, if someone wants to, we can reformat the page. I do not see any downside to such an incremental approach until that moment.
   
   > why still indicating the return type for each numeric function explicitly? 
How about adding a single note for that?
   
   Because some of them can be implemented precisely (`abs`, `min`, `max`, `ceil`, ...), while others can be for some variants (`power(DECIMAL, INT)`). We can state the general rule that functions returning `DOUBLE` are approximations, but we still need specific information for every function: whether it returns an approximated (`DOUBLE`) or a precise (`DECIMAL`) result.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10340) Add Cosh math function supported in Table API and SQL

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10340?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633740#comment-16633740
 ] 

ASF GitHub Bot commented on FLINK-10340:


pnowojski commented on a change in pull request #6700: [FLINK-10340][table] Add 
Cosh math function supported in Table API and SQL
URL: https://github.com/apache/flink/pull/6700#discussion_r221537661
 
 

 ##
 File path: docs/dev/table/functions.md
 ##
 @@ -1274,6 +1274,17 @@ ATAN2(numeric1, numeric2)
   
 
 
+
+  
+{% highlight text %}
+COSH(numeric)
+{% endhighlight %}
+  
+  
+Returns the hyperbolic cosine of numeric. Return value type 
is DOUBLE.
 
 Review comment:
   @xccui if you think using `` is better I'm fine with that, I don't mind 
one way or the other.
   
   However I don't understand:
   
   > before reaching a consensus about the format and the internal 
implementation, I suggest to keep the current doc style, i.e., without 
mentioning the return type, for now.
   
   We are already following a convention that for all of the math functions we 
are dropping precision and returning `double`. What's the point of not 
documenting it?
   
   If you could guarantee me that we will document those return types, let's say within a month, I would be fine with postponing it. However, I don't believe that will happen, so I prefer to start adding this return type information incrementally right now. Later, if someone wants to, we can reformat the page. I do not see any downside to such an incremental approach until that moment.
   
   > why still indicating the return type for each numeric function explicitly? 
How about adding a single note for that?
   
   Because some of them can be implemented precisely (`abs`, `min`, `max`, `ceil`, ...), while others can be for some variants (`power(DECIMAL, INT)`). We can state the general rule that functions returning `DOUBLE` are approximations, but we still need specific information for every function: whether it returns an approximated (`DOUBLE`) or a precise (`DECIMAL`) result.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add Cosh math function supported in Table API and SQL
> -
>
> Key: FLINK-10340
> URL: https://issues.apache.org/jira/browse/FLINK-10340
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.6.0
>Reporter: Sergey Tsvetkov
>Assignee: vinoyang
>Priority: Minor
>  Labels: pull-request-available
>
> Implement udf of cosh, just like in oracle
> [https://docs.oracle.com/cd/B28359_01/server.111/b28286/functions031.htm#SQLRF00623]
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9715) Support versioned joins with event time

2018-10-01 Thread Piotr Nowojski (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633747#comment-16633747
 ] 

Piotr Nowojski commented on FLINK-9715:
---

[~hequn8128] Something like that would probably be possible to implement, but it isn't 
supported right now, nor is it planned. It would also need a separate discussion 
to define the semantics, i.e. what it should mean. For append-only tables it would 
make sense (return only the rows between {{rowtime_s}} and {{rowtime_e}}?), 
but what about retractions and/or updates? If a row is removed/updated in this 
period, what should be returned?

> Support versioned joins with event time
> ---
>
> Key: FLINK-9715
> URL: https://issues.apache.org/jira/browse/FLINK-9715
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Piotr Nowojski
>Assignee: Piotr Nowojski
>Priority: Major
>  Labels: pull-request-available
>
> Queries like:
> {code:java}
> SELECT 
>   o.amount * r.rate 
> FROM 
>   Orders AS o, 
>   LATERAL TABLE (Rates(o.rowtime)) AS r 
> WHERE o.currency = r.currency{code}
> should work with event time



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10466) flink-yarn-tests should depend flink-dist

2018-10-01 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10466?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633748#comment-16633748
 ] 

Chesnay Schepler commented on FLINK-10466:
--

if flink-yarn-tests does indeed rely on flink-dist being built beforehand then 
we should definitely add it as a dependency.

> flink-yarn-tests should depend flink-dist
> -
>
> Key: FLINK-10466
> URL: https://issues.apache.org/jira/browse/FLINK-10466
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.7.0
>Reporter: tison
>Priority: Major
> Fix For: 1.7.0
>
>
> Maybe adding
> {code:java}
> <dependency>
>   <groupId>org.apache.flink</groupId>
>   <artifactId>flink-dist_${scala.binary.version}</artifactId>
>   <version>${project.version}</version>
>   <scope>test</scope>
>   <type>pom</type>
> </dependency>
> {code}
> I am not really sure, but it causes a failure in my automated testing process, and by 
> adding this dependency the error disappears. I even wonder how it currently works on Travis.
> flink-yarn-tests obviously depends on flink-dist since some tests try to find the 
> flink uber jar.
> Please take a look at this. cc [~Zentol]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on a change in pull request #6741: [FLINK-9712][table, docs] Document processing time Temporal Table Joins

2018-10-01 Thread GitBox
pnowojski commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r221540451
 
 

 ##
 File path: docs/dev/table/streaming/temporal_tables.md
 ##
 @@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+
+Temporal Tables represent a concept of a table that changes over time
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+--
+
+Let's assume that we have the two following tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` represents payments for given `amount` and given `currency`.
+For example at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+10:45   Euro        116
+11:15   Euro        119
+{% endhighlight %}
+
+`RatesHistory` represents an ever-changing append-only stream of currency exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate of `Euro` to `Yen` for the period from `09:00` to `10:45` was `114`.
+From `10:45` to `11:15` it was `116`.
+
+The task is now to calculate the value of all `Orders` converted to a common currency (`Yen`).
+For example we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without Temporal Tables, one would need to write a query such as:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM Rates AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query.
+
+In order to define a Temporal Table, we must define its primary key.
+The primary key allows us to overwrite older values in the Temporal Table.
+In the above example `currency` would be the primary key for the `RatesHistory` table.
+Secondly, a [time attribute](time_attributes.html) is also required that determines which row is newer and which one is older.
+
+Temporal Table Functions
+
+
+In order to access the data in the Temporal Table, one must define a time attribute for which the matching version of the table will be returned.
+Flink uses the SQL syntax of Table Functions to provide a way to express it.
+Once defined, a Temporal Table Function takes a single argument `timeAttribute` and returns a set of rows.
+This set contains the latest versions of the rows for all existing primary keys with respect to the given `timeAttribute`.
+
+Assume that we defined a `Rates(timeAttribute)` Temporal Table Function based on the `RatesHistory` table.
+We could query such a function in the following way:
+
+{% highlight sql %}
+SELECT * FROM Rates('10:15');
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+
+SELECT * FROM Rates('11:00');
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+10:45   Euro        116
+09:00   Yen           1
+{% endhighlight %}
+
+Each query to `Rates(timeAttribute)` would return the state of the `Rates` for the given `timeAttribute`.
+
+**Note**: Currently Flink doesn't support directly querying Temporal Table Functions with a constant `timeAttribute`.
+At the moment Temporal Table Functions can only be used in joins.
+The above example was used to provide an intuition about what the function `Rates(timeAttribute)` returns.
+
+Processing time
+---
+
+### Defining Temporal Table Function
+
+In order to define a processing-time Temporal Table:
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+List<Tuple2<Long, String>> ordersData = new ArrayList<>();
+ordersData.add(Tuple2.of(2L, "Euro"));
+ordersData.add(Tuple2.of(1L, "US Dollar"));
+ordersData.add(Tuple2.of(50L, "Yen"));
+ordersData.add(Tuple2.of(3L, "Euro"));
+ordersData.add(Tuple2.of(5L, "US Dollar"));
+
+List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
+ratesHistoryData.add(Tuple2.of("US Dollar", 102L));
+ratesHistoryData.add(Tuple2.of("Euro", 114L));
+ratesHistoryData.add(Tuple2.of("Yen", 1L));
+ratesHistoryData.add(Tuple2.of("Euro", 116L));
+ratesHistoryData.add(Tuple2.of("Euro", 119L));
+
+DataStreamSource<Tuple2<Long, String>> ordersStream = e
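For context, a minimal sketch (not the PR's exact code) of how such a processing-time temporal table function is typically created and registered with the Table API, assuming a {{ratesHistoryStream}} built from {{ratesHistoryData}} above:

{code:java}
// Minimal sketch, assuming ratesHistoryStream was created via env.fromCollection(ratesHistoryData).
Table ratesHistory = tEnv.fromDataStream(ratesHistoryStream, "r_currency, r_rate, r_proctime.proctime");
tEnv.registerTable("RatesHistory", ratesHistory);

// "r_proctime" is the processing-time attribute, "r_currency" the primary key.
TemporalTableFunction rates = ratesHistory.createTemporalTableFunction("r_proctime", "r_currency");
tEnv.registerFunction("Rates", rates);

// The function can then be used in a temporal table join, e.g.:
// SELECT o.o_amount * r.r_rate
// FROM Orders AS o, LATERAL TABLE (Rates(o.o_proctime)) AS r
// WHERE r.r_currency = o.o_currency
{code}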

[GitHub] dawidwys commented on issue #6769: [FLINK-10447][HDFS Connector] Create Bucketing Table Sink

2018-10-01 Thread GitBox
dawidwys commented on issue #6769: [FLINK-10447][HDFS Connector] Create 
Bucketing Table Sink
URL: https://github.com/apache/flink/pull/6769#issuecomment-425840104
 
 
   Hi @SuXingLee, thank you very much for your contribution. I think having a 
table sink for file system would be a great feature. However I think we should 
use the new `StreamingFileSink`, on which the community is working for some 
time already, for this purpose. This new connector is fixing a few problems 
with the old one, that's why it will probably replace the `BucketingSink` at 
some point. 
   
   To sum up I think we should use the `StreamingFileSink` rather than 
`BucketingSink` here.
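   For reference, a minimal sketch of the newer sink mentioned here (row format with a simple string encoder; the path, the stream name, and the element type are placeholders):

{code:java}
// Minimal sketch, assuming a DataStream<String> called `rows` and a placeholder output path.
// Uses org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink,
// org.apache.flink.api.common.serialization.SimpleStringEncoder and org.apache.flink.core.fs.Path.
StreamingFileSink<String> sink = StreamingFileSink
	.forRowFormat(new Path("hdfs:///tmp/table-output"), new SimpleStringEncoder<String>("UTF-8"))
	.build();

rows.addSink(sink);
{code}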


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Assigned] (FLINK-10469) FileChannel may not write the whole buffer in a single call to FileChannel.write(Buffer buffer)

2018-10-01 Thread Yun Gao (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10469?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yun Gao reassigned FLINK-10469:
---

Assignee: Yun Gao

> FileChannel may not write the whole buffer in a single call to 
> FileChannel.write(Buffer buffer)
> ---
>
> Key: FLINK-10469
> URL: https://issues.apache.org/jira/browse/FLINK-10469
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.4.1, 1.4.2, 1.5.3, 1.6.0, 1.6.1, 1.7.0, 1.5.4, 1.6.2
>Reporter: Yun Gao
>Assignee: Yun Gao
>Priority: Major
>
> Currently all the calls to _FileChannel.write(ByteBuffer src)_ assume that 
> this method will not return before the whole buffer is written, like the one 
> in _AsynchronousFileIOChannel.write()._
>  
> However, this assumption may not hold in all environments. We have 
> encountered the case that only part of a buffer was written on a cluster with 
> a high IO load, and the target file got messy.
>  
> To fix this issue, I think we should add a utility method to 
> org.apache.flink.util.IOUtils that ensures the whole buffer is written with a 
> loop, and replace all the calls to _FileChannel.write(ByteBuffer)_ with this 
> new method.
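A minimal sketch of the kind of utility loop described above (the method name and its placement are only the reporter's proposal, not existing Flink API):

{code:java}
// Sketch of the proposed helper: FileChannel.write(ByteBuffer) may write fewer bytes
// than requested, so keep writing until the buffer has no remaining bytes.
public static void writeCompletely(java.nio.channels.WritableByteChannel channel, java.nio.ByteBuffer src)
		throws java.io.IOException {
	while (src.hasRemaining()) {
		channel.write(src);
	}
}
{code}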



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9712) Support enrichment joins in Flink SQL/Table API

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633752#comment-16633752
 ] 

ASF GitHub Bot commented on FLINK-9712:
---

pnowojski commented on a change in pull request #6741: [FLINK-9712][table,docs] 
Document processing time Temporal Table Joins
URL: https://github.com/apache/flink/pull/6741#discussion_r221540451
 
 

 ##
 File path: docs/dev/table/streaming/temporal_tables.md
 ##
 @@ -0,0 +1,263 @@
+---
+title: "Temporal Tables"
+nav-parent_id: streaming_tableapi
+nav-pos: 4
+---
+
+
+Temporal Tables represent a concept of a table that changes over time
+and for which Flink keeps track of those changes.
+
+* This will be replaced by the TOC
+{:toc}
+
+Motivation
+--
+
+Let's assume that we have the two following tables.
+
+{% highlight sql %}
+SELECT * FROM Orders;
+
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+10:30        1 US Dollar
+10:32       50 Yen
+10:52        3 Euro
+11:04        5 US Dollar
+{% endhighlight %}
+
+`Orders` represents payments for given `amount` and given `currency`.
+For example at `10:15` there was an order for an amount of `2 Euro`.
+
+{% highlight sql %}
+SELECT * FROM RatesHistory;
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+10:45   Euro        116
+11:15   Euro        119
+{% endhighlight %}
+
+`RatesHistory` represents an ever-changing append-only stream of currency exchange rates with respect to `Yen` (which has a rate of `1`).
+For example, the exchange rate of `Euro` to `Yen` for the period from `09:00` to `10:45` was `114`.
+From `10:45` to `11:15` it was `116`.
+
+The task is now to calculate the value of all `Orders` converted to a common currency (`Yen`).
+For example we would like to convert the order
+{% highlight sql %}
+rowtime amount currency
+======= ====== =========
+10:15        2 Euro
+{% endhighlight %}
+using the appropriate conversion rate for the given `rowtime` (`114`).
+Without Temporal Tables, one would need to write a query such as:
+{% highlight sql %}
+SELECT
+  SUM(o.amount * r.rate) AS amount
+FROM Orders AS o,
+  RatesHistory AS r
+WHERE r.currency = o.currency
+AND r.rowtime = (
+  SELECT MAX(rowtime)
+  FROM Rates AS r2
+  WHERE r2.currency = o.currency
+  AND r2.rowtime <= o.rowtime);
+{% endhighlight %}
+Temporal Tables are a concept that aims to simplify this query.
+
+In order to define a Temporal Table, we must define its primary key.
+The primary key allows us to overwrite older values in the Temporal Table.
+In the above example `currency` would be the primary key for the `RatesHistory` table.
+Secondly, a [time attribute](time_attributes.html) is also required that determines which row is newer and which one is older.
+
+Temporal Table Functions
+
+
+In order to access the data in the Temporal Table, one must define a time attribute for which the matching version of the table will be returned.
+Flink uses the SQL syntax of Table Functions to provide a way to express it.
+Once defined, a Temporal Table Function takes a single argument `timeAttribute` and returns a set of rows.
+This set contains the latest versions of the rows for all existing primary keys with respect to the given `timeAttribute`.
+
+Assume that we defined a `Rates(timeAttribute)` Temporal Table Function based on the `RatesHistory` table.
+We could query such a function in the following way:
+
+{% highlight sql %}
+SELECT * FROM Rates('10:15');
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+09:00   Euro        114
+09:00   Yen           1
+
+SELECT * FROM Rates('11:00');
+
+rowtime currency   rate
+======= ========= ======
+09:00   US Dollar   102
+10:45   Euro        116
+09:00   Yen           1
+{% endhighlight %}
+
+Each query to `Rates(timeAttribute)` would return the state of the `Rates` for the given `timeAttribute`.
+
+**Note**: Currently Flink doesn't support directly querying Temporal Table Functions with a constant `timeAttribute`.
+At the moment Temporal Table Functions can only be used in joins.
+The above example was used to provide an intuition about what the function `Rates(timeAttribute)` returns.
+
+Processing time
+---
+
+### Defining Temporal Table Function
+
+In order to define a processing-time Temporal Table:
+
+
+
+{% highlight java %}
+StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);
+
+List<Tuple2<Long, String>> ordersData = new ArrayList<>();
+ordersData.add(Tuple2.of(2L, "Euro"));
+ordersData.add(Tuple2.of(1L, "US Dollar"));
+ordersData.add(Tuple2.of(50L, "Yen"));
+ordersData.add(Tuple2.of(3L, "Euro"));
+ordersData.add(Tuple2.of(5L, "US Dollar"));
+
+List<Tuple2<String, Long>> ratesHistoryData = new ArrayList<>();
+ratesHistoryData.add(T

[jira] [Commented] (FLINK-10447) Create Bucketing Table Sink.

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633751#comment-16633751
 ] 

ASF GitHub Bot commented on FLINK-10447:


dawidwys commented on issue #6769: [FLINK-10447][HDFS Connector] Create 
Bucketing Table Sink
URL: https://github.com/apache/flink/pull/6769#issuecomment-425840104
 
 
   Hi @SuXingLee, thank you very much for your contribution. I think having a 
table sink for file system would be a great feature. However I think we should 
use the new `StreamingFileSink`, on which the community is working for some 
time already, for this purpose. This new connector is fixing a few problems 
with the old one, that's why it will probably replace the `BucketingSink` at 
some point. 
   
   To sum up I think we should use the `StreamingFileSink` rather than 
`BucketingSink` here.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Create Bucketing Table Sink.
> 
>
> Key: FLINK-10447
> URL: https://issues.apache.org/jira/browse/FLINK-10447
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Suxing Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> It would be nice to integrate the table APIs with the HDFS connectors so that 
> the rows in the tables can be directly pushed into HDFS.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #6611: [FLINK-3875] [connectors] Add an upsert table sink factory for Elasticsearch

2018-10-01 Thread GitBox
asfgit closed pull request #6611: [FLINK-3875] [connectors] Add an upsert table 
sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index f79dd93486e..0b7d8948f79 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -43,6 +43,7 @@ The following table list all available connectors and 
formats. Their mutual comp
 | Name  | Version   | Maven dependency | SQL 
Client JAR |
 | : | : | :--- | 
:--|
 | Filesystem|   | Built-in | Built-in  
 |
+| Elasticsearch | 6 | `flink-connector-elasticsearch6` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
 | Apache Kafka  | 0.8   | `flink-connector-kafka-0.8`  | Not 
available  |
 | Apache Kafka  | 0.9   | `flink-connector-kafka-0.9`  | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
 | Apache Kafka  | 0.10  | `flink-connector-kafka-0.10` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
@@ -588,6 +589,111 @@ Make sure to add the version-specific Kafka dependency. 
In addition, a correspon
 
 {% top %}
 
+### Elasticsearch Connector
+
+Sink: Streaming Append Mode
+Sink: Streaming Upsert Mode
+Format: JSON-only
+
+The Elasticsearch connector allows for writing into an index of the 
Elasticsearch search engine.
+
+The connector can operate in [upsert mode](#update-modes) for exchanging 
UPSERT/DELETE messages with the external system using a [key defined by the 
query](streaming.html#table-to-stream-conversion).
+
+For append-only queries, the connector can also operate in [append 
mode](#update-modes) for exchanging only INSERT messages with the external 
system. If no key is defined by the query, a key is automatically generated by 
Elasticsearch.
+
+The connector can be defined as follows:
+
+
+
+{% highlight java %}
+.connect(
+  new Elasticsearch()
+.version("6")  // required: valid connector versions 
are "6"
+.host("localhost", 9200, "http")   // required: one or more Elasticsearch 
hosts to connect to
+.index("MyUsers")  // required: Elasticsearch index
+.documentType("user")  // required: Elasticsearch document type
+
+.keyDelimiter("$")// optional: delimiter for composite keys ("_" 
by default)
+  //   e.g., "$" would result in IDs 
"KEY1$KEY2$KEY3"
+.keyNullLiteral("n/a")// optional: representation for null fields in 
keys ("null" by default)
+
+// optional: failure handling strategy in case a request to Elasticsearch 
fails (fail by default)
+.failureHandlerFail()  // optional: throws an exception if a 
request fails and causes a job failure
+.failureHandlerIgnore()//   or ignores failures and drops the 
request
+.failureHandlerRetryRejected() //   or re-adds requests that have failed 
due to queue capacity saturation
+.failureHandlerCustom(...) //   or custom failure handling with a 
ActionRequestFailureHandler subclass
+
+// optional: configure how to buffer elements before sending them in bulk 
to the cluster for efficiency
+.disableFlushOnCheckpoint()// optional: disables flushing on 
checkpoint (see notes below!)
+.bulkFlushMaxActions(42)   // optional: maximum number of actions to 
buffer for each bulk request
+.bulkFlushMaxSize("42 mb") // optional: maximum size of buffered 
actions in bytes per bulk request
+   //   (only MB granularity is supported)
+.bulkFlushInterval(6L) // optional: bulk flush interval (in 
milliseconds)
+
+.bulkFlushBackoffConstant()// optional: use a constant backoff type
+.bulkFlushBackoffExponential() //   or use an exponential backoff type
+.bulkFlushBackoffMaxRetries(3) // optional: maximum number of retries
+.bulkFlushBackoffDelay(3L) // optional: delay between each backoff 
attempt (in milliseconds)
+
+// optional: connection properties to be used during RE

[jira] [Resolved] (FLINK-3875) Add a TableSink for Elasticsearch

2018-10-01 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-3875.
-
   Resolution: Fixed
Fix Version/s: 1.7.0

Fixed in 1.7.0: 10f9f1d431dbf047ea14130d9fb7fdd4f78178fc

> Add a TableSink for Elasticsearch
> -
>
> Key: FLINK-3875
> URL: https://issues.apache.org/jira/browse/FLINK-3875
> Project: Flink
>  Issue Type: New Feature
>  Components: Streaming Connectors, Table API & SQL
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Add a TableSink that writes data to Elasticsearch



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-3875) Add a TableSink for Elasticsearch

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-3875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633765#comment-16633765
 ] 

ASF GitHub Bot commented on FLINK-3875:
---

asfgit closed pull request #6611: [FLINK-3875] [connectors] Add an upsert table 
sink factory for Elasticsearch
URL: https://github.com/apache/flink/pull/6611
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
index f79dd93486e..0b7d8948f79 100644
--- a/docs/dev/table/connect.md
+++ b/docs/dev/table/connect.md
@@ -43,6 +43,7 @@ The following table list all available connectors and 
formats. Their mutual comp
 | Name  | Version   | Maven dependency | SQL 
Client JAR |
 | : | : | :--- | 
:--|
 | Filesystem|   | Built-in | Built-in  
 |
+| Elasticsearch | 6 | `flink-connector-elasticsearch6` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-elasticsearch6{{site.scala_version_suffix}}/{{site.version}}/flink-connector-elasticsearch6{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
 | Apache Kafka  | 0.8   | `flink-connector-kafka-0.8`  | Not 
available  |
 | Apache Kafka  | 0.9   | `flink-connector-kafka-0.9`  | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
 | Apache Kafka  | 0.10  | `flink-connector-kafka-0.10` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
@@ -588,6 +589,111 @@ Make sure to add the version-specific Kafka dependency. 
In addition, a correspon
 
 {% top %}
 
+### Elasticsearch Connector
+
+Sink: Streaming Append Mode
+Sink: Streaming Upsert Mode
+Format: JSON-only
+
+The Elasticsearch connector allows for writing into an index of the 
Elasticsearch search engine.
+
+The connector can operate in [upsert mode](#update-modes) for exchanging 
UPSERT/DELETE messages with the external system using a [key defined by the 
query](streaming.html#table-to-stream-conversion).
+
+For append-only queries, the connector can also operate in [append 
mode](#update-modes) for exchanging only INSERT messages with the external 
system. If no key is defined by the query, a key is automatically generated by 
Elasticsearch.
+
+The connector can be defined as follows:
+
+
+
+{% highlight java %}
+.connect(
+  new Elasticsearch()
+.version("6")  // required: valid connector versions 
are "6"
+.host("localhost", 9200, "http")   // required: one or more Elasticsearch 
hosts to connect to
+.index("MyUsers")  // required: Elasticsearch index
+.documentType("user")  // required: Elasticsearch document type
+
+.keyDelimiter("$")// optional: delimiter for composite keys ("_" 
by default)
+  //   e.g., "$" would result in IDs 
"KEY1$KEY2$KEY3"
+.keyNullLiteral("n/a")// optional: representation for null fields in 
keys ("null" by default)
+
+// optional: failure handling strategy in case a request to Elasticsearch 
fails (fail by default)
+.failureHandlerFail()  // optional: throws an exception if a 
request fails and causes a job failure
+.failureHandlerIgnore()//   or ignores failures and drops the 
request
+.failureHandlerRetryRejected() //   or re-adds requests that have failed 
due to queue capacity saturation
+.failureHandlerCustom(...) //   or custom failure handling with a 
ActionRequestFailureHandler subclass
+
+// optional: configure how to buffer elements before sending them in bulk 
to the cluster for efficiency
+.disableFlushOnCheckpoint()// optional: disables flushing on 
checkpoint (see notes below!)
+.bulkFlushMaxActions(42)   // optional: maximum number of actions to 
buffer for each bulk request
+.bulkFlushMaxSize("42 mb") // optional: maximum size of buffered 
actions in bytes per bulk request
+   //   (only MB granularity is supported)
+.bulkFlushInterval(6L) // optional: bulk flush interval (in 
milliseconds)
+
+.bulkFlushBackoffConstant()// optional: use a constant backoff type
+.bulkFlushBackoffExponential() //   or use an exponent

[GitHub] zentol commented on a change in pull request #6731: [FLINK-10312] Propagate exception from server to client in REST API

2018-10-01 Thread GitBox
zentol commented on a change in pull request #6731: [FLINK-10312] Propagate 
exception from server to client in REST API
URL: https://github.com/apache/flink/pull/6731#discussion_r221546026
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ##
 @@ -207,12 +210,10 @@

resultFuture.whenComplete(

(innerT, innerThrowable) -> scheduledFuture.cancel(false));
} else {
-   final String errorMsg = retries == 0 ?
-   "Number of retries has been exhausted." :
-   "Exception is not retryable.";
-   resultFuture.completeExceptionally(new RetryException(
-   "Could not complete the operation. " + errorMsg,
-   throwable));
+   RetryException retryException = new RetryException(
+   "Could not complete the operation: number of retries has been exhausted.",
 
 Review comment:
   I would stick with the original formatting: `Could not complete the 
operation. Number of retries has been exhausted.`
   
   When seeing a `:` I would expect a reason as to _why_ it couldn't be 
completed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10312) Wrong / missing exception when submitting job

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10312?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633771#comment-16633771
 ] 

ASF GitHub Bot commented on FLINK-10312:


zentol commented on a change in pull request #6731: [FLINK-10312] Propagate 
exception from server to client in REST API
URL: https://github.com/apache/flink/pull/6731#discussion_r221546026
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/concurrent/FutureUtils.java
 ##
 @@ -207,12 +210,10 @@

resultFuture.whenComplete(

(innerT, innerThrowable) -> scheduledFuture.cancel(false));
} else {
-   final String errorMsg = retries == 0 ?
-   "Number of retries has been exhausted." :
-   "Exception is not retryable.";
-   resultFuture.completeExceptionally(new RetryException(
-   "Could not complete the operation. " + errorMsg,
-   throwable));
+   RetryException retryException = new RetryException(
+   "Could not complete the operation: number of retries has been exhausted.",
 
 Review comment:
   I would stick with the original formatting: `Could not complete the 
operation. Number of retries has been exhausted.`
   
   When seeing a `:` I would expect a reason as to _why_ it couldn't be 
completed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Wrong / missing exception when submitting job
> -
>
> Key: FLINK-10312
> URL: https://issues.apache.org/jira/browse/FLINK-10312
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.5.2, 1.6.0
>Reporter: Stephan Ewen
>Assignee: Andrey Zagrebin
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2, 1.5.5
>
> Attachments: lmerge-TR.pdf
>
>
> h3. Problem
> When submitting a job that cannot be created / initialized on the JobManager, 
> there is no proper error message. The exception says *"Could not retrieve the 
> execution result. (JobID: 5a7165e1260c6316fa11d2760bd3d49f)"*
> h3. Steps to Reproduce
> Create a streaming job, set a state backend with a non existing file system 
> scheme.
> h3. Full Stack Trace
> {code}
> Submitting a job where instantiation on the JM fails yields this, which seems 
> like a major regression from seeing the actual exception:
> org.apache.flink.client.program.ProgramInvocationException: Could not 
> retrieve the execution result. (JobID: 5a7165e1260c6316fa11d2760bd3d49f)
>   at 
> org.apache.flink.client.program.rest.RestClusterClient.submitJob(RestClusterClient.java:260)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:486)
>   at 
> org.apache.flink.streaming.api.environment.StreamContextEnvironment.execute(StreamContextEnvironment.java:66)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1511)
>   at 
> com.dataartisans.streamledger.examples.simpletrade.SimpleTradeExample.main(SimpleTradeExample.java:98)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:497)
>   at 
> org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:529)
>   at 
> org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:421)
>   at 
> org.apache.flink.client.program.ClusterClient.run(ClusterClient.java:426)
>   at 
> org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:804)
>   at 
> org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:280)
>   at org.apache.fl

[GitHub] dawidwys closed pull request #6756: [FLINK-10414][cep] Added skip to next strategy

2018-10-01 Thread GitBox
dawidwys closed pull request #6756: [FLINK-10414][cep] Added skip to next 
strategy
URL: https://github.com/apache/flink/pull/6756
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index d7f915fc62e..c8e836349ef 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -1288,6 +1288,15 @@ For example, for a given pattern `b+ c` and a data 
stream `b1 b2 b3 c`, the diff
 
 After found matching b1 b2 b3 c, the match process 
will not discard any result.
 
+
+SKIP_TO_NEXT
+
+b1 b2 b3 c
+b2 b3 c
+b3 c
+
+After found matching b1 b2 b3 c, the match process 
will not discard any result, because no other match could start at b1.
+
 
 SKIP_PAST_LAST_EVENT
 
@@ -1296,7 +1305,7 @@ For example, for a given pattern `b+ c` and a data stream 
`b1 b2 b3 c`, the diff
 After found matching b1 b2 b3 c, the match process 
will discard all started partial matches.
 
 
-SKIP_TO_FIRST[b*]
+SKIP_TO_FIRST[b]
 
 b1 b2 b3 c
 b2 b3 c
@@ -1340,7 +1349,35 @@ Pattern: `(a | c) (b | c) c+.greedy d` and sequence: `a 
b c1 c2 c3 d` Then the r
 a b c1 c2 c3 d
 c1 c2 c3 d
 
-After found matching a b c1 c2 c3 d, the match 
process will try to discard all partial matches started before c1. 
There is one such match b c1 c2 c3 d.
+After found matching a b c1 c2 c3 d, the match 
process will discard all partial matches started before c1. There 
is one such match b c1 c2 c3 d.
+
+
+
+To better understand the difference between NO_SKIP and SKIP_TO_NEXT take a 
look at following example:
+Pattern: `a b+` and sequence: `a b1 b2 b3` Then the results will be:
+
+
+
+
+Skip Strategy
+Result
+ Description
+
+
+NO_SKIP
+
+a b1
+a b1 b2
+a b1 b2 b3
+
+After found matching a b1, the match process will not 
discard any result.
+
+
+SKIP_TO_NEXT[b*]
+
+a b1
+
+After found matching a b1, the match process will 
discard all partial matches started at a. This means neither 
a b1 b2 nor a b1 b2 b3 could be generated.
 
 
 
@@ -1354,6 +1391,10 @@ To specify which skip strategy to use, just create an 
`AfterMatchSkipStrategy` b
 AfterMatchSkipStrategy.noSkip()
 Create a NO_SKIP skip strategy 
 
+
+AfterMatchSkipStrategy.skipToNext()
+Create a SKIP_TO_NEXT skip strategy 
+
 
 AfterMatchSkipStrategy.skipPastLastEvent()
 Create a SKIP_PAST_LAST_EVENT skip strategy 
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
index f4448a35560..7578e29bbfa 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
@@ -38,7 +38,7 @@
private static final long serialVersionUID = -4048930333619068531L;
 
/**
-* Discards every partial match that contains event of the match 
preceding the first of *PatternName*.
+* Discards every partial match that started before the first event of 
emitted match mapped to *PatternName*.
 *
 * @param patternName the pattern name to skip to
 * @return the created AfterMatchSkipStrategy
@@ -48,7 +48,7 @@ public static SkipToFirstStrategy skipToFirst(String 
patternName) {
}
 
/**
-* Discards every partial match that contains event of the match 
preceding the last of *PatternName*.
+* Discards every partial match that started before the last event of 
emitted match mapped to *PatternName*.
 *
 * @param patternName the pattern name to skip to
 * @return the created AfterMatchSkipStrategy
@@ -58,7 +58,7 @@ public static SkipToLastStrategy skipToLast(String 
patternName) {
}
 
/**
-* Discards every partial match that contains event of the match.
+* Discards every partial match that started before emitted match ended.
 *
 * @return the created AfterMatchSkipStrategy
 */
@@ -66,6 +66,15 @@ public static SkipPastLastStrategy skipPastLastEvent() {
return SkipPastLastStrategy.INSTANCE;
}
 
+   /**
+* Discards every partial match that started with the same event, 
emitted match was started.
+ 
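For illustration, a minimal sketch of how the new strategy is applied when defining a pattern (the {{Event}} type, its {{getName()}} accessor, and the conditions are placeholders):

{code:java}
// Minimal sketch: pass the new SKIP_TO_NEXT strategy when the pattern is created.
AfterMatchSkipStrategy skipStrategy = AfterMatchSkipStrategy.skipToNext();

Pattern<Event, ?> pattern = Pattern.<Event>begin("a", skipStrategy)
	.where(new SimpleCondition<Event>() {
		@Override
		public boolean filter(Event value) {
			return value.getName().startsWith("a");
		}
	})
	.followedBy("b").oneOrMore();
{code}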

[jira] [Commented] (FLINK-10414) Add skip to next strategy

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633773#comment-16633773
 ] 

ASF GitHub Bot commented on FLINK-10414:


dawidwys closed pull request #6756: [FLINK-10414][cep] Added skip to next 
strategy
URL: https://github.com/apache/flink/pull/6756
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index d7f915fc62e..c8e836349ef 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -1288,6 +1288,15 @@ For example, for a given pattern `b+ c` and a data 
stream `b1 b2 b3 c`, the diff
 
 After found matching b1 b2 b3 c, the match process 
will not discard any result.
 
+
+SKIP_TO_NEXT
+
+b1 b2 b3 c
+b2 b3 c
+b3 c
+
+After found matching b1 b2 b3 c, the match process 
will not discard any result, because no other match could start at b1.
+
 
 SKIP_PAST_LAST_EVENT
 
@@ -1296,7 +1305,7 @@ For example, for a given pattern `b+ c` and a data stream 
`b1 b2 b3 c`, the diff
 After found matching b1 b2 b3 c, the match process 
will discard all started partial matches.
 
 
-SKIP_TO_FIRST[b*]
+SKIP_TO_FIRST[b]
 
 b1 b2 b3 c
 b2 b3 c
@@ -1340,7 +1349,35 @@ Pattern: `(a | c) (b | c) c+.greedy d` and sequence: `a 
b c1 c2 c3 d` Then the r
 a b c1 c2 c3 d
 c1 c2 c3 d
 
-After found matching a b c1 c2 c3 d, the match 
process will try to discard all partial matches started before c1. 
There is one such match b c1 c2 c3 d.
+After found matching a b c1 c2 c3 d, the match 
process will discard all partial matches started before c1. There 
is one such match b c1 c2 c3 d.
+
+
+
+To better understand the difference between NO_SKIP and SKIP_TO_NEXT take a 
look at following example:
+Pattern: `a b+` and sequence: `a b1 b2 b3` Then the results will be:
+
+
+
+
+Skip Strategy
+Result
+ Description
+
+
+NO_SKIP
+
+a b1
+a b1 b2
+a b1 b2 b3
+
+After found matching a b1, the match process will not 
discard any result.
+
+
+SKIP_TO_NEXT[b*]
+
+a b1
+
+After found matching a b1, the match process will 
discard all partial matches started at a. This means neither 
a b1 b2 nor a b1 b2 b3 could be generated.
 
 
 
@@ -1354,6 +1391,10 @@ To specify which skip strategy to use, just create an 
`AfterMatchSkipStrategy` b
 AfterMatchSkipStrategy.noSkip()
 Create a NO_SKIP skip strategy 
 
+
+AfterMatchSkipStrategy.skipToNext()
+Create a SKIP_TO_NEXT skip strategy 
+
 
 AfterMatchSkipStrategy.skipPastLastEvent()
 Create a SKIP_PAST_LAST_EVENT skip strategy 
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
index f4448a35560..7578e29bbfa 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/aftermatch/AfterMatchSkipStrategy.java
@@ -38,7 +38,7 @@
private static final long serialVersionUID = -4048930333619068531L;
 
/**
-* Discards every partial match that contains event of the match 
preceding the first of *PatternName*.
+* Discards every partial match that started before the first event of 
emitted match mapped to *PatternName*.
 *
 * @param patternName the pattern name to skip to
 * @return the created AfterMatchSkipStrategy
@@ -48,7 +48,7 @@ public static SkipToFirstStrategy skipToFirst(String 
patternName) {
}
 
/**
-* Discards every partial match that contains event of the match 
preceding the last of *PatternName*.
+* Discards every partial match that started before the last event of 
emitted match mapped to *PatternName*.
 *
 * @param patternName the pattern name to skip to
 * @return the created AfterMatchSkipStrategy
@@ -58,7 +58,7 @@ public static SkipToLastStrategy skipToLast(String 
patternName) {
}
 
/**
-* Discards every partial match that contains event of the match.
+* Discards every partial match that started before emitted match ended.
 *
 * @return the created AfterMatchSkipStrategy
 *

[jira] [Resolved] (FLINK-10414) Add skip to next strategy

2018-10-01 Thread Dawid Wysakowicz (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10414?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dawid Wysakowicz resolved FLINK-10414.
--
Resolution: Done

> Add skip to next strategy
> -
>
> Key: FLINK-10414
> URL: https://issues.apache.org/jira/browse/FLINK-10414
> Project: Flink
>  Issue Type: Improvement
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Add skip to next strategy, that should discard all partial matches that 
> started with the same element as found match.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #6733: [FLINK-10291] Generate JobGraph with fixed/configurable JobID in StandaloneJobClusterEntrypoint

2018-10-01 Thread GitBox
asfgit closed pull request #6733: [FLINK-10291] Generate JobGraph with 
fixed/configurable JobID in StandaloneJobClusterEntrypoint
URL: https://github.com/apache/flink/pull/6733
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
index 94fc109c47b..59ab4065804 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.program;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -39,18 +40,21 @@
 public class PackagedProgramUtils {
 
/**
-* Creates a {@link JobGraph} from the given {@link PackagedProgram}.
+* Creates a {@link JobGraph} with a specified {@link JobID}
+* from the given {@link PackagedProgram}.
 *
 * @param packagedProgram to extract the JobGraph from
 * @param configuration to use for the optimizer and job graph generator
 * @param defaultParallelism for the JobGraph
+* @param jobID the pre-generated job id
 * @return JobGraph extracted from the PackagedProgram
 * @throws ProgramInvocationException if the JobGraph generation failed
 */
public static JobGraph createJobGraph(
PackagedProgram packagedProgram,
Configuration configuration,
-   int defaultParallelism) throws 
ProgramInvocationException {
+   int defaultParallelism,
+   JobID jobID) throws ProgramInvocationException {

Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
final Optimizer optimizer = new Optimizer(new DataStatistics(), 
new DefaultCostEstimator(), configuration);
final FlinkPlan flinkPlan;
@@ -79,11 +83,11 @@ public static JobGraph createJobGraph(
final JobGraph jobGraph;
 
if (flinkPlan instanceof StreamingPlan) {
-   jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
+   jobGraph = ((StreamingPlan) 
flinkPlan).getJobGraph(jobID);

jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
} else {
final JobGraphGenerator jobGraphGenerator = new 
JobGraphGenerator(configuration);
-   jobGraph = 
jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan);
+   jobGraph = 
jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan, jobID);
}
 
for (URL url : packagedProgram.getAllLibraries()) {
@@ -99,5 +103,22 @@ public static JobGraph createJobGraph(
return jobGraph;
}
 
+   /**
+* Creates a {@link JobGraph} with a random {@link JobID}
+* from the given {@link PackagedProgram}.
+*
+* @param packagedProgram to extract the JobGraph from
+* @param configuration to use for the optimizer and job graph generator
+* @param defaultParallelism for the JobGraph
+* @return JobGraph extracted from the PackagedProgram
+* @throws ProgramInvocationException if the JobGraph generation failed
+*/
+   public static JobGraph createJobGraph(
+   PackagedProgram packagedProgram,
+   Configuration configuration,
+   int defaultParallelism) throws ProgramInvocationException {
+   return createJobGraph(packagedProgram, configuration, 
defaultParallelism, null);
+   }
+
private PackagedProgramUtils() {}
 }
diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
index 3e0645d6859..d769b6848e7 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.container.entrypoint;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.client.program.PackagedProgramUtils;
 import org.apache.flink.client.program.ProgramInvo

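As a usage sketch of the new overload above: only the
`createJobGraph(program, configuration, parallelism, jobID)` signature is taken
from the diff; the jar path and the fixed id are hypothetical placeholders.

import org.apache.flink.api.common.JobID;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;

import java.io.File;

public class FixedJobIdSketch {

    public static void main(String[] args) throws Exception {
        // Hypothetical user jar; the StandaloneJobClusterEntrypoint builds the
        // program from the container class path instead.
        PackagedProgram program = new PackagedProgram(new File("/path/to/user-job.jar"));

        Configuration configuration = new Configuration();

        // Pre-generated, fixed JobID so repeated container restarts resolve to the same job.
        JobID fixedJobId = JobID.fromHexString("00000000000000000000000000000042");

        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, 1, fixedJobId);

        System.out.println("JobGraph generated with id " + jobGraph.getJobID());
    }
}
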
[jira] [Commented] (FLINK-10291) Generate JobGraph with fixed/configurable JobID in StandaloneJobClusterEntrypoint

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10291?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633777#comment-16633777
 ] 

ASF GitHub Bot commented on FLINK-10291:


asfgit closed pull request #6733: [FLINK-10291] Generate JobGraph with 
fixed/configurable JobID in StandaloneJobClusterEntrypoint
URL: https://github.com/apache/flink/pull/6733
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
index 94fc109c47b..59ab4065804 100644
--- 
a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
+++ 
b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgramUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.client.program;
 
+import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.Plan;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
@@ -39,18 +40,21 @@
 public class PackagedProgramUtils {
 
/**
-* Creates a {@link JobGraph} from the given {@link PackagedProgram}.
+* Creates a {@link JobGraph} with a specified {@link JobID}
+* from the given {@link PackagedProgram}.
 *
 * @param packagedProgram to extract the JobGraph from
 * @param configuration to use for the optimizer and job graph generator
 * @param defaultParallelism for the JobGraph
+* @param jobID the pre-generated job id
 * @return JobGraph extracted from the PackagedProgram
 * @throws ProgramInvocationException if the JobGraph generation failed
 */
public static JobGraph createJobGraph(
PackagedProgram packagedProgram,
Configuration configuration,
-   int defaultParallelism) throws 
ProgramInvocationException {
+   int defaultParallelism,
+   JobID jobID) throws ProgramInvocationException {

Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
final Optimizer optimizer = new Optimizer(new DataStatistics(), 
new DefaultCostEstimator(), configuration);
final FlinkPlan flinkPlan;
@@ -79,11 +83,11 @@ public static JobGraph createJobGraph(
final JobGraph jobGraph;
 
if (flinkPlan instanceof StreamingPlan) {
-   jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
+   jobGraph = ((StreamingPlan) 
flinkPlan).getJobGraph(jobID);

jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
} else {
final JobGraphGenerator jobGraphGenerator = new 
JobGraphGenerator(configuration);
-   jobGraph = 
jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan);
+   jobGraph = 
jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan, jobID);
}
 
for (URL url : packagedProgram.getAllLibraries()) {
@@ -99,5 +103,22 @@ public static JobGraph createJobGraph(
return jobGraph;
}
 
+   /**
+* Creates a {@link JobGraph} with a random {@link JobID}
+* from the given {@link PackagedProgram}.
+*
+* @param packagedProgram to extract the JobGraph from
+* @param configuration to use for the optimizer and job graph generator
+* @param defaultParallelism for the JobGraph
+* @return JobGraph extracted from the PackagedProgram
+* @throws ProgramInvocationException if the JobGraph generation failed
+*/
+   public static JobGraph createJobGraph(
+   PackagedProgram packagedProgram,
+   Configuration configuration,
+   int defaultParallelism) throws ProgramInvocationException {
+   return createJobGraph(packagedProgram, configuration, 
defaultParallelism, null);
+   }
+
private PackagedProgramUtils() {}
 }
diff --git 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
index 3e0645d6859..d769b6848e7 100644
--- 
a/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
+++ 
b/flink-container/src/main/java/org/apache/flink/container/entrypoint/ClassPathJobGraphRetriever.java
@@ -18,6 +18,7 @@
 
 pa

[GitHub] twalthr commented on issue #6769: [FLINK-10447][HDFS Connector] Create Bucketing Table Sink

2018-10-01 Thread GitBox
twalthr commented on issue #6769: [FLINK-10447][HDFS Connector] Create 
Bucketing Table Sink
URL: https://github.com/apache/flink/pull/6769#issuecomment-425850646
 
 
   It requires much more than just switching to `StreamingFileSink`. Every new sink 
should integrate with the new unified API. See also 
https://github.com/apache/flink/pull/6611 for an example. The formats 
should be pluggable and integrated with SQL.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10447) Create Bucketing Table Sink.

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633782#comment-16633782
 ] 

ASF GitHub Bot commented on FLINK-10447:


twalthr commented on issue #6769: [FLINK-10447][HDFS Connector] Create 
Bucketing Table Sink
URL: https://github.com/apache/flink/pull/6769#issuecomment-425850646
 
 
   It requires much more than just switching to `StreamingFileSink`. Every new sink 
should integrate with the new unified API. See also 
https://github.com/apache/flink/pull/6611 for an example. The formats 
should be pluggable and integrated with SQL.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Create Bucketing Table Sink.
> 
>
> Key: FLINK-10447
> URL: https://issues.apache.org/jira/browse/FLINK-10447
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Suxing Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> It would be nice to integrate the table APIs with the HDFS connectors so that 
> the rows in the tables can be directly pushed into HDFS.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
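
For orientation on the comments above: a minimal DataStream-level sketch of the
`StreamingFileSink` that a reworked bucketing table sink would presumably wrap
(row format with a simple string encoder; the output path is a placeholder and
bucket assignment and rolling policies are left at their defaults).

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;

public class StreamingFileSinkSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // The sink finalizes files on checkpoints, so checkpointing must be enabled.
        env.enableCheckpointing(10_000L);

        StreamingFileSink<String> sink = StreamingFileSink
            .forRowFormat(new Path("/tmp/table-output"), new SimpleStringEncoder<String>("UTF-8"))
            .build();

        env.fromElements("a,1", "b,2", "c,3").addSink(sink);

        env.execute("streaming file sink sketch");
    }
}
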


[GitHub] StefanRRichter commented on issue #6766: [docs] Improve documentation of savepoints

2018-10-01 Thread GitBox
StefanRRichter commented on issue #6766: [docs] Improve documentation of 
savepoints
URL: https://github.com/apache/flink/pull/6766#issuecomment-425851398
 
 
   Thanks for the comments! Will merge.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] fhueske commented on issue #6769: [FLINK-10447][HDFS Connector] Create Bucketing Table Sink

2018-10-01 Thread GitBox
fhueske commented on issue #6769: [FLINK-10447][HDFS Connector] Create 
Bucketing Table Sink
URL: https://github.com/apache/flink/pull/6769#issuecomment-425851537
 
 
   Please see also my comment on the Jira issue. Before adapting the PR, we 
should agree on the feature scope and design. Please provide a design document 
and continue the discussion on the Jira issue. 
   I think this PR should be closed because it needs to be completely rewritten.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10447) Create Bucketing Table Sink.

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10447?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633786#comment-16633786
 ] 

ASF GitHub Bot commented on FLINK-10447:


fhueske commented on issue #6769: [FLINK-10447][HDFS Connector] Create 
Bucketing Table Sink
URL: https://github.com/apache/flink/pull/6769#issuecomment-425851537
 
 
   Please see also my comment on the Jira issue. Before adapting the PR, we 
should agree on the feature scope and design. Please provide a design document 
and continue the discussion on the Jira issue. 
   I think this PR should be closed because it needs to be completely rewritten.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Create Bucketing Table Sink.
> 
>
> Key: FLINK-10447
> URL: https://issues.apache.org/jira/browse/FLINK-10447
> Project: Flink
>  Issue Type: New Feature
>  Components: Table API & SQL
>Reporter: Suxing Lee
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> It would be nice to integrate the table APIs with the HDFS connectors so that 
> the rows in the tables can be directly pushed into HDFS.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] asfgit closed pull request #6766: [docs] Improve documentation of savepoints

2018-10-01 Thread GitBox
asfgit closed pull request #6766: [docs] Improve documentation of savepoints
URL: https://github.com/apache/flink/pull/6766
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/ops/state/savepoints.md b/docs/ops/state/savepoints.md
index 6dd5154c5e6..f21415ff73e 100644
--- a/docs/ops/state/savepoints.md
+++ b/docs/ops/state/savepoints.md
@@ -25,17 +25,29 @@ under the License.
 * toc
 {:toc}
 
-## Overview
+## What is a Savepoint? How is a Savepoint different from a Checkpoint?
 
-Savepoints are externally stored self-contained checkpoints that you can use 
to stop-and-resume or update your Flink programs. They use Flink's 
[checkpointing mechanism]({{ site.baseurl 
}}/internals/stream_checkpointing.html) to create a (non-incremental) snapshot 
of the state of your streaming program and write the checkpoint data and meta 
data out to an external file system.
-
-This page covers all steps involved in triggering, restoring, and disposing 
savepoints.
-For more details on how Flink handles state and failures in general, check out 
the [State in Streaming Programs]({{ site.baseurl 
}}/dev/stream/state/index.html) page.
+A Savepoint is a consistent image of the execution state of a streaming job, 
created via Flink's [checkpointing mechanism]({{ site.baseurl 
}}/internals/stream_checkpointing.html). You can use Savepoints to 
stop-and-resume, fork,
+or update your Flink jobs. Savepoints consist of two parts: a directory with 
(typically large) binary files on stable storage (e.g. HDFS, S3, ...) and a 
(relatively small) meta data file. The files on stable storage represent the 
net data of the job's execution state
+image. The meta data file of a Savepoint contains (primarily) pointers to all 
files on stable storage that are part of the Savepoint, in the form of absolute 
paths.
 
 
 Attention: In order to allow upgrades between programs and 
Flink versions, it is important to check out the following section about assigning IDs to your operators.
 
 
+Flink's Savepoints are different from Checkpoints in a similar way that 
backups are different from recovery logs in traditional database systems. The 
primary purpose of Checkpoints is to provide a recovery mechanism in case of
+unexpected job failures. A Checkpoint's lifecycle is managed by Flink, i.e. a 
Checkpoint is created, owned, and released by Flink - without user interaction. 
As a method of recovery and being periodically triggered, two main
+design goals for the Checkpoint implementation are i) being as lightweight to 
create and ii) being as fast to restore from as possible. Optimizations towards 
those goals can exploit certain properties, e.g. that the job code
+doesn't change between the execution attempts. Checkpoints are usually 
dropped after the job was terminated by the user (except if explicitly 
configured as retained Checkpoints).
+
+In contrast to all this, Savepoints are created, owned, and deleted by the 
user. Their use-case is for planned, manual backup and resume. For example, 
this could be an update of your Flink version, changing your job graph,
+changing parallelism, forking a second job like for a red/blue deployment, and 
so on. Of course, Savepoints must survive job termination. Conceptually, 
Savepoints can be a bit more expensive to produce and restore and focus
+more on portability and support for the previously mentioned changes to the 
job.
+
+Those conceptual differences aside, the current implementations of Checkpoints 
and Savepoints basically use the same code and produce the same "format". 
However, there is currently one exception from this, and we might
+introduce more differences in the future. The exception is incremental 
checkpoints with the RocksDB state backend. They use some RocksDB 
internal format instead of Flink’s native savepoint format. This makes them the
+first instance of a more lightweight checkpointing mechanism, compared to 
Savepoints.
+
 ## Assigning Operator IDs
 
 It is **highly recommended** that you adjust your programs as described in 
this section in order to be able to upgrade your programs in the future. The 
main required change is to manually specify operator IDs via the 
**`uid(String)`** method. These IDs are used to scope the state of each 
operator.
@@ -211,4 +223,10 @@ If the savepoint was triggered with Flink >= 1.2.0 and 
using no deprecated state
 
 If you are resuming from a savepoint triggered with Flink < 1.2.0 or using now 
deprecated APIs you first have to migrate your job and savepoint to Flink >= 
1.2.0 before being able to change the parallelism. See the [upgrading jobs and 
Flink versions guide]({{ site.baseurl }}/ops/upgrading.html).
 
+### Can I move the Savepoint files on stable storage?
+
+The quick answer to this que

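Tying this back to the operator-ID section above, a small sketch of assigning
explicit IDs via `uid(String)` so that savepoint state can be matched to
operators after upgrades (the pipeline and the uid strings are invented for
illustration).

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidAssignmentSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c")
            // Stable IDs scope the operator state inside a savepoint, so the job can be
            // modified or upgraded and still restore from the savepoint.
            .uid("word-source")
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase();
                }
            })
            .uid("uppercase-map")
            .print();

        env.execute("uid assignment sketch");
    }
}
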
[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-01 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221550059
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRequestBody.java
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Base class for {@link RequestBody} for running a jar or querying the plan.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public abstract class JarRequestBody implements RequestBody {
 
 Review comment:
   can be package private?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-01 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221552131
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsListQueryParameter.java
 ##
 @@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.MessageParameter;
+
+import java.io.File;
+
+/**
+ * Query parameter specifying the arguments list for the program.
+ * @see org.apache.flink.client.program.PackagedProgram#PackagedProgram(File, 
String, String...)
+ */
+public class ProgramArgsListQueryParameter extends StringQueryParameter {
+   public ProgramArgsListQueryParameter() {
+   super("program-args-list", 
MessageParameter.MessageParameterRequisiteness.OPTIONAL);
+   }
+
+   @Override
+   public String getDescription() {
+   return "List of string values that specify the arguments for 
the program or plan. " +
+   "It is recommended to specify it in a JSON request body 
as a JSON list parameter 'programArgsList'.";
 
 Review comment:
   Please remove this recommendation. You're baking in the assumption that the 
`ProgramArgsListQueryParameter` is always used in conjunction with a 
`JarRequestBody` or similar.
   
   Deprecation notices for parameters belong in the respective 
`MessageHeader` description.
   Users already receive a warning when using query parameters during the job 
submission.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-01 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221553160
 
 

 ##
 File path: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
 ##
 @@ -190,7 +195,8 @@ public void testConfigurationViaJsonRequest() throws 
Exception {
() -> {
final JarRunRequestBody jsonRequest = new 
JarRunRequestBody(

ParameterProgram.class.getCanonicalName(),
-   "--host localhost --port 1234",
+   null,
+   Arrays.asList("--host", "localhost", 
"--port", "1234"),
 
 Review comment:
   this test should be duplicated instead to cover both cases.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
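
For reference, a sketch of what a jar run request using the list-style
arguments could look like on the wire. The `/jars/:jarid/run` endpoint, the
default REST port 8081, and the `entryClass` field are standard; the
`programArgsList` field name follows this PR, and the jar id and argument
values are placeholders.

curl -X POST "http://localhost:8081/jars/<jar-id>/run" \
  -H "Content-Type: application/json" \
  -d '{
        "entryClass": "org.example.ParameterProgram",
        "programArgsList": ["--host", "localhost", "--port", "1234"]
      }'
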


[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-01 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221551267
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtils.java
 ##
 @@ -63,4 +66,26 @@
return value;
}
 
+   /**
+* Returns {@code requestValue} if it is not null, otherwise returns 
the query parameter value
+* if it is not null, otherwise returns the default value.
+*/
+   public static  T fromRequestBodyOrQueryParameter(
+   T requestValue,
+   SupplierWithException 
queryParameterExtractor,
+   T defaultValue,
+   Logger log) throws RestHandlerException {
 
 Review comment:
   indent parameters by another tab


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
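
To spell out the precedence that the javadoc above describes, a sketch (not the
actual Flink implementation; the warning text is made up) of how a request body
value wins over a query parameter, which in turn wins over the default:

import org.apache.flink.runtime.rest.handler.RestHandlerException;
import org.apache.flink.util.function.SupplierWithException;

import org.slf4j.Logger;

final class PrecedenceSketch {

    // Request body value first, then the (deprecated) query parameter, then the default.
    static <T> T fromRequestBodyOrQueryParameter(
            T requestValue,
            SupplierWithException<T, RestHandlerException> queryParameterExtractor,
            T defaultValue,
            Logger log) throws RestHandlerException {
        if (requestValue != null) {
            return requestValue;
        }
        final T queryValue = queryParameterExtractor.get();
        if (queryValue != null) {
            log.warn("Configuring the job via query parameters is deprecated; please use the JSON request body.");
            return queryValue;
        }
        return defaultValue;
    }
}
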


[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-01 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221552259
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
 ##
 @@ -26,13 +36,42 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter;
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter;
+import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
 /**
  * Utils for jar handlers.
  *
  * @see org.apache.flink.runtime.webmonitor.handlers.JarRunHandler
  * @see org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler
  */
 public class JarHandlerUtils {
+   /** Parse program arguments in jar run or plan request. */
 
 Review comment:
   add an empty line before the javadoc


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-01 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221549560
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Base class of {@link MessageParameters} for {@link JarRunHandler} and 
{@link JarPlanHandler}.
+ */
+abstract class JarMessageParameters extends MessageParameters {
+
+   final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
+
+   final EntryClassQueryParameter entryClassQueryParameter = new 
EntryClassQueryParameter();
+
+   final ParallelismQueryParameter parallelismQueryParameter = new 
ParallelismQueryParameter();
+
+   final ProgramArgsQueryParameter programArgsQueryParameter = new 
ProgramArgsQueryParameter();
+
+   final ProgramArgsListQueryParameter programArgsListQueryParameter = new 
ProgramArgsListQueryParameter();
+
+   @Override
+   public Collection> getPathParameters() {
+   return Collections.singletonList(jarIdPathParameter);
+   }
+
+   @Override
+   public Collection> getQueryParameters() {
+   return new ArrayList<>(Arrays.asList(
 
 Review comment:
   This should continue to return an unmodifiable collection, otherwise all 
sub-classes have to wrap it again.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-01 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r22149
 
 

 ##
 File path: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
 ##
 @@ -219,15 +225,40 @@ public void testConfigurationViaJsonRequest() throws 
Exception {
}
 
@Test
-   public void testParameterPrioritization() throws Exception {
+   public void testParameterPrioritizationWithProgArgsAsString() throws 
Exception {
+   testParameterPrioritization(PROG_ARGS, null);
+   }
+
+   @Test
+   public void testParameterPrioritizationWithProgArgsAsList() throws 
Exception {
+   testParameterPrioritization(null, PROG_ARGS);
+   }
+
+   @Test
+   public void testFailIfProgArgsAreAsStringAndAsList() throws Exception {
+   try {
+   testParameterPrioritization(PROG_ARGS, PROG_ARGS);
+   fail("IllegalArgumentException is excepted");
+   } catch (IllegalArgumentException e) {
+   // expected
+   }
+   }
+
+   private void testParameterPrioritization(List programArgs, 
List programArgsList) throws Exception {
+   List args = programArgs == null ? programArgsList : 
programArgs;
 
 Review comment:
   This code is way too confusing. It strikes me as very odd that `args` is 
never null, and the naming scheme is weird (args should be "argsList", 
"argsStr" should be "args" to be somewhat consistent with the method arguments).
   
   I would pass in a single collection as arguments and a simple enum for how 
arguments should be passed.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-01 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221550646
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
 ##
 @@ -26,13 +36,42 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter;
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter;
+import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
 /**
  * Utils for jar handlers.
  *
  * @see org.apache.flink.runtime.webmonitor.handlers.JarRunHandler
  * @see org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler
  */
 public class JarHandlerUtils {
+   /** Parse program arguments in jar run or plan request. */
+   public static  
List getProgramArgs(
+   HandlerRequest request, Logger log) throws 
RestHandlerException {
+   JarRequestBody requestBody = request.getRequestBody();
+   List programArgs = tokenizeArguments(
+   fromRequestBodyOrQueryParameter(
+   emptyToNull(requestBody.getProgramArguments()),
+   () -> getQueryParameter(request, 
ProgramArgsQueryParameter.class),
+   null,
+   log));
+   List programArgsList = fromRequestBodyOrQueryParameter(
+   requestBody.getProgramArgumentsList(),
+   () -> 
request.getQueryParameter(ProgramArgsListQueryParameter.class),
+   null,
+   log);
+   if (requestBody.getProgramArgumentsList() != null) {
 
 Review comment:
   compare against `programArgsList` instead?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-01 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221551766
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
 ##
 @@ -32,6 +32,7 @@ public ProgramArgsQueryParameter() {
 
@Override
public String getDescription() {
-   return "String value that specifies the arguments for the 
program or plan.";
+   return "Deprecated, please, use 'programArgsList' in a JSON 
request body as a JSON list. " +
 
 Review comment:
   Revert this please. You're baking in the assumption that the 
`ProgramArgsQueryParameter` is always used in conjunction with a 
`JarRequestBody` or similar.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-01 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221553008
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
 ##
 @@ -26,13 +36,42 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter;
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter;
+import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
 /**
  * Utils for jar handlers.
  *
  * @see org.apache.flink.runtime.webmonitor.handlers.JarRunHandler
  * @see org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler
  */
 public class JarHandlerUtils {
+   /** Parse program arguments in jar run or plan request. */
+   public static  
List getProgramArgs(
+   HandlerRequest request, Logger log) throws 
RestHandlerException {
+   JarRequestBody requestBody = request.getRequestBody();
+   List programArgs = tokenizeArguments(
+   fromRequestBodyOrQueryParameter(
+   emptyToNull(requestBody.getProgramArguments()),
+   () -> getQueryParameter(request, 
ProgramArgsQueryParameter.class),
+   null,
+   log));
+   List programArgsList = fromRequestBodyOrQueryParameter(
 
 Review comment:
   truly a minor thing, but might I suggest to move 
`fromRequestBodyOrQueryParameter(` to the next line so the indentation of this 
part is identical to the above?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-01 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221555894
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
 ##
 @@ -26,13 +36,42 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter;
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter;
+import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
 /**
  * Utils for jar handlers.
  *
  * @see org.apache.flink.runtime.webmonitor.handlers.JarRunHandler
  * @see org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler
  */
 public class JarHandlerUtils {
+   /** Parse program arguments in jar run or plan request. */
+   public static  
List getProgramArgs(
+   HandlerRequest request, Logger log) throws 
RestHandlerException {
+   JarRequestBody requestBody = request.getRequestBody();
+   List programArgs = tokenizeArguments(
+   fromRequestBodyOrQueryParameter(
+   emptyToNull(requestBody.getProgramArguments()),
+   () -> getQueryParameter(request, 
ProgramArgsQueryParameter.class),
+   null,
+   log));
+   List programArgsList = fromRequestBodyOrQueryParameter(
+   requestBody.getProgramArgumentsList(),
+   () -> 
request.getQueryParameter(ProgramArgsListQueryParameter.class),
+   null,
+   log);
+   if (requestBody.getProgramArgumentsList() != null) {
+   if (!programArgs.isEmpty()) {
+   throw new IllegalArgumentException(
 
 Review comment:
   neither handler is handling this exception, so the user will get an 
INTERNAL_SERVER_ERROR instead of BAD_REQUEST.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
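
To make the point concrete, a sketch of rejecting the conflicting argument
styles with a client error instead; the exception type and the shaded Netty
status constant are the ones used elsewhere in the REST handlers, while the
helper name and the message text are made up.

import org.apache.flink.runtime.rest.handler.RestHandlerException;

import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;

import java.util.List;

final class ArgumentConflictSketch {

    // Surface the conflict as a 400 BAD_REQUEST instead of letting an
    // IllegalArgumentException bubble up as a 500 INTERNAL_SERVER_ERROR.
    static void checkOnlyOneArgumentStyle(List<String> stringStyleArgs, List<String> listStyleArgs)
            throws RestHandlerException {
        if (stringStyleArgs != null && !stringStyleArgs.isEmpty()
                && listStyleArgs != null && !listStyleArgs.isEmpty()) {
            throw new RestHandlerException(
                "Please specify the program arguments either as a string or as a list, but not both.",
                HttpResponseStatus.BAD_REQUEST);
        }
    }
}
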


[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-01 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221551094
 
 

 ##
 File path: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
 ##
 @@ -33,10 +35,11 @@
}
 
@Override
-   protected JarRunRequestBody getTestRequestInstance() throws Exception {
+   protected JarRunRequestBody getTestRequestInstance() {
return new JarRunRequestBody(
"hello",
"world",
+   Arrays.asList("foo", "bar"),
 
 Review comment:
   please use 2 words that are not present in the existing argument. `foo` and 
`bar` are already used as the savepointPath.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-01 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221549739
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
 ##
 @@ -29,32 +28,18 @@
 /**
  * {@link MessageParameters} for {@link JarRunHandler}.
  */
-public class JarRunMessageParameters extends MessageParameters {
+public class JarRunMessageParameters extends JarMessageParameters {
 
-   public final JarIdPathParameter jarIdPathParameter = new 
JarIdPathParameter();
+   final AllowNonRestoredStateQueryParameter 
allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
 
-   public final ProgramArgsQueryParameter programArgsQueryParameter = new 
ProgramArgsQueryParameter();
-
-   public final EntryClassQueryParameter entryClassQueryParameter = new 
EntryClassQueryParameter();
-
-   public final ParallelismQueryParameter parallelismQueryParameter = new 
ParallelismQueryParameter();
-
-   public final AllowNonRestoredStateQueryParameter 
allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
-
-   public final SavepointPathQueryParameter savepointPathQueryParameter = 
new SavepointPathQueryParameter();
-
-   @Override
-   public Collection> getPathParameters() {
-   return Collections.singletonList(jarIdPathParameter);
-   }
+   final SavepointPathQueryParameter savepointPathQueryParameter = new 
SavepointPathQueryParameter();
 
@Override
public Collection> getQueryParameters() {
-   return Collections.unmodifiableCollection(Arrays.asList(
-   programArgsQueryParameter,
-   entryClassQueryParameter,
-   parallelismQueryParameter,
+   Collection> pars = 
super.getQueryParameters();
+   pars.addAll(Arrays.asList(
 
 Review comment:
   Since you're creating a new list anyway and the total size is known, add the 
super parameter to that one instead.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
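
A sketch of one way the subclass above could keep returning an unmodifiable
view while still including the parameters from the base class (field and type
names follow the snippet; whether this matches the final change is not implied):

import org.apache.flink.runtime.rest.messages.MessageQueryParameter;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

// Inside JarRunMessageParameters: collect the base-class parameters plus the
// run-specific ones into a single list and wrap it exactly once.
@Override
public Collection<MessageQueryParameter<?>> getQueryParameters() {
    final List<MessageQueryParameter<?>> params = new ArrayList<>(super.getQueryParameters());
    params.add(allowNonRestoredStateQueryParameter);
    params.add(savepointPathQueryParameter);
    return Collections.unmodifiableCollection(params);
}
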


[GitHub] zentol commented on a change in pull request #6754: [FLINK-10295] Add support of passing jar arguments as list of separate strings in REST API

2018-10-01 Thread GitBox
zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221548569
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
 ##
 @@ -80,13 +79,13 @@ public JarPlanHandler(
 
@Override
protected CompletableFuture handleRequest(
-   @Nonnull final HandlerRequest request,
+   @Nonnull final HandlerRequest request,
@Nonnull final RestfulGateway gateway) throws 
RestHandlerException {
 
final String jarId = 
request.getPathParameter(JarIdPathParameter.class);
final String entryClass = 
emptyToNull(HandlerRequestUtils.getQueryParameter(request, 
EntryClassQueryParameter.class));
 
 Review comment:
   Since the `JarPlanRequestBody` also contains this field (and is documented 
to accept it!) we should either
   a) reduce the `JarPlanRequestBody` to only contain the program arguments
   b) change this handler to also use `fromRequestBodyOrQueryParameter`
   c) exclude this handler from this change for now and handle it in a 
follow-up.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633814#comment-16633814
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221551094
 
 

 ##
 File path: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBodyTest.java
 ##
 @@ -33,10 +35,11 @@
}
 
@Override
-   protected JarRunRequestBody getTestRequestInstance() throws Exception {
+   protected JarRunRequestBody getTestRequestInstance() {
return new JarRunRequestBody(
"hello",
"world",
+   Arrays.asList("foo", "bar"),
 
 Review comment:
   please use 2 words that are not present in the existing argument. `foo` and 
`bar` are already used as the savepointPath.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including sql 
> query and kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633808#comment-16633808
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221550059
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRequestBody.java
 ##
 @@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Base class for {@link RequestBody} for running a jar or querying the plan.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public abstract class JarRequestBody implements RequestBody {
 
 Review comment:
   can be package private?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including sql 
> query and kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633817#comment-16633817
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221555894
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
 ##
 @@ -26,13 +36,42 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter;
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter;
+import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
 /**
  * Utils for jar handlers.
  *
  * @see org.apache.flink.runtime.webmonitor.handlers.JarRunHandler
  * @see org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler
  */
 public class JarHandlerUtils {
+   /** Parse program arguments in jar run or plan request. */
+   public static  
List getProgramArgs(
+   HandlerRequest request, Logger log) throws 
RestHandlerException {
+   JarRequestBody requestBody = request.getRequestBody();
+   List programArgs = tokenizeArguments(
+   fromRequestBodyOrQueryParameter(
+   emptyToNull(requestBody.getProgramArguments()),
+   () -> getQueryParameter(request, 
ProgramArgsQueryParameter.class),
+   null,
+   log));
+   List programArgsList = fromRequestBodyOrQueryParameter(
+   requestBody.getProgramArgumentsList(),
+   () -> 
request.getQueryParameter(ProgramArgsListQueryParameter.class),
+   null,
+   log);
+   if (requestBody.getProgramArgumentsList() != null) {
+   if (!programArgs.isEmpty()) {
+   throw new IllegalArgumentException(
 
 Review comment:
   neither handler is handling this exception, so the user will get an 
INTERNAL_SERVER_ERROR instead of BAD_REQUEST.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including sql 
> query and kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633811#comment-16633811
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221548569
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java
 ##
 @@ -80,13 +79,13 @@ public JarPlanHandler(
 
@Override
protected CompletableFuture handleRequest(
-   @Nonnull final HandlerRequest request,
+   @Nonnull final HandlerRequest request,
@Nonnull final RestfulGateway gateway) throws 
RestHandlerException {
 
final String jarId = 
request.getPathParameter(JarIdPathParameter.class);
final String entryClass = 
emptyToNull(HandlerRequestUtils.getQueryParameter(request, 
EntryClassQueryParameter.class));
 
 Review comment:
   Since the `JarPlanRequestBody` also contains this field (and is documented 
to accept it!) we should either
   a) reduce the `JarPlanRequestBody` to only contain the program arguments
   b) change this handler to also use `fromRequestBodyOrQueryParameter`
   c) exclude this handler from this change for now and handle it in a 
follow-up.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including sql 
> query and kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633812#comment-16633812
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221552131
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsListQueryParameter.java
 ##
 @@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.MessageParameter;
+
+import java.io.File;
+
+/**
+ * Query parameter specifying the arguments list for the program.
+ * @see org.apache.flink.client.program.PackagedProgram#PackagedProgram(File, 
String, String...)
+ */
+public class ProgramArgsListQueryParameter extends StringQueryParameter {
+   public ProgramArgsListQueryParameter() {
+   super("program-args-list", 
MessageParameter.MessageParameterRequisiteness.OPTIONAL);
+   }
+
+   @Override
+   public String getDescription() {
+   return "List of string values that specify the arguments for 
the program or plan. " +
+   "It is recommended to specify it in a JSON request body 
as a JSON list parameter 'programArgsList'.";
 
 Review comment:
   Please remove this recommendation. You're baking in the assumption that the 
`ProgramArgsListQueryParameter ` is always used in conjunction with a 
`JarRequestBody` or similar.
   
   Deprecation notices for parameters belong in the respective 
`MessageHeader` description.
   Users already receive a warning when using query parameters during the job 
submission.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including sql 
> query and kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633810#comment-16633810
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221551766
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/ProgramArgsQueryParameter.java
 ##
 @@ -32,6 +32,7 @@ public ProgramArgsQueryParameter() {
 
@Override
public String getDescription() {
-   return "String value that specifies the arguments for the 
program or plan.";
+   return "Deprecated, please, use 'programArgsList' in a JSON 
request body as a JSON list. " +
 
 Review comment:
   Revert this please. You're baking in the assumption that the 
`ProgramArgsQueryParameter` is always used in conjunction with a 
`JarRequestBody` or similar.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including sql 
> query and kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633818#comment-16633818
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r22149
 
 

 ##
 File path: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
 ##
 @@ -219,15 +225,40 @@ public void testConfigurationViaJsonRequest() throws 
Exception {
}
 
@Test
-   public void testParameterPrioritization() throws Exception {
+   public void testParameterPrioritizationWithProgArgsAsString() throws 
Exception {
+   testParameterPrioritization(PROG_ARGS, null);
+   }
+
+   @Test
+   public void testParameterPrioritizationWithProgArgsAsList() throws 
Exception {
+   testParameterPrioritization(null, PROG_ARGS);
+   }
+
+   @Test
+   public void testFailIfProgArgsAreAsStringAndAsList() throws Exception {
+   try {
+   testParameterPrioritization(PROG_ARGS, PROG_ARGS);
+   fail("IllegalArgumentException is excepted");
+   } catch (IllegalArgumentException e) {
+   // expected
+   }
+   }
+
+   private void testParameterPrioritization(List programArgs, 
List programArgsList) throws Exception {
+   List args = programArgs == null ? programArgsList : 
programArgs;
 
 Review comment:
   This code is way too confusing. It strikes me as very odd that `args` is 
never null, and the naming scheme is weird (args should be "argsList", 
"argsStr" should be "args" to be somewhat consistent with the method arguments).
   
   I would pass in a single collection as arguments and a simple enum for how 
arguments should be passed.
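   Something along these lines, as a hypothetical, self-contained sketch (all
names invented for illustration):
   
       import java.util.Arrays;
       import java.util.List;
   
       class ProgramArgsSketch {
   
           // how the test should put the arguments into the request body
           enum ProgramArgsParType { STRING, LIST }
   
           // single-string form, or null when the args travel as a list
           static String asArgsString(List<String> args, ProgramArgsParType type) {
               return type == ProgramArgsParType.STRING ? String.join(" ", args) : null;
           }
   
           // list form, or null when the args travel as one string
           static List<String> asArgsList(List<String> args, ProgramArgsParType type) {
               return type == ProgramArgsParType.LIST ? args : null;
           }
   
           public static void main(String[] args) {
               List<String> progArgs = Arrays.asList("--host", "localhost", "--port", "1234");
               System.out.println(asArgsString(progArgs, ProgramArgsParType.STRING)); // --host localhost --port 1234
               System.out.println(asArgsList(progArgs, ProgramArgsParType.LIST));     // [--host, localhost, --port, 1234]
           }
       }
   
   The test itself would then only take `(List<String> args, ProgramArgsParType parType)` 
and derive both request-body fields from that single collection.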


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including sql 
> query and kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633815#comment-16633815
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221550646
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
 ##
 @@ -26,13 +36,42 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter;
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter;
+import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
 /**
  * Utils for jar handlers.
  *
  * @see org.apache.flink.runtime.webmonitor.handlers.JarRunHandler
  * @see org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler
  */
 public class JarHandlerUtils {
+   /** Parse program arguments in jar run or plan request. */
+   public static  
List getProgramArgs(
+   HandlerRequest request, Logger log) throws 
RestHandlerException {
+   JarRequestBody requestBody = request.getRequestBody();
+   List programArgs = tokenizeArguments(
+   fromRequestBodyOrQueryParameter(
+   emptyToNull(requestBody.getProgramArguments()),
+   () -> getQueryParameter(request, 
ProgramArgsQueryParameter.class),
+   null,
+   log));
+   List programArgsList = fromRequestBodyOrQueryParameter(
+   requestBody.getProgramArgumentsList(),
+   () -> 
request.getQueryParameter(ProgramArgsListQueryParameter.class),
+   null,
+   log);
+   if (requestBody.getProgramArgumentsList() != null) {
 
 Review comment:
   compare against `programArgsList` instead?
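   i.e. a possible shape of that check (sketch only, untested):
   
       if (programArgsList != null) {
           if (!programArgs.isEmpty()) {
               // matches the IllegalArgumentException the new test expects
               throw new IllegalArgumentException(
                   "Program arguments were passed both as a single string and as a list of strings.");
           }
           return programArgsList;
       }
       return programArgs;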


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including sql 
> query and kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633816#comment-16633816
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221553160
 
 

 ##
 File path: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
 ##
 @@ -190,7 +195,8 @@ public void testConfigurationViaJsonRequest() throws 
Exception {
() -> {
final JarRunRequestBody jsonRequest = new 
JarRunRequestBody(

ParameterProgram.class.getCanonicalName(),
-   "--host localhost --port 1234",
+   null,
+   Arrays.asList("--host", "localhost", 
"--port", "1234"),
 
 Review comment:
   This test should be duplicated instead to cover both cases.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including sql 
> query and kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633807#comment-16633807
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221551267
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/util/HandlerRequestUtils.java
 ##
 @@ -63,4 +66,26 @@
return value;
}
 
+   /**
+* Returns {@code requestValue} if it is not null, otherwise returns 
the query parameter value
+* if it is not null, otherwise returns the default value.
+*/
+   public static  T fromRequestBodyOrQueryParameter(
+   T requestValue,
+   SupplierWithException 
queryParameterExtractor,
+   T defaultValue,
+   Logger log) throws RestHandlerException {
 
 Review comment:
   indent parameters by another tab


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including sql 
> query and kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633813#comment-16633813
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221553008
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
 ##
 @@ -26,13 +36,42 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter;
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter;
+import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
 /**
  * Utils for jar handlers.
  *
  * @see org.apache.flink.runtime.webmonitor.handlers.JarRunHandler
  * @see org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler
  */
 public class JarHandlerUtils {
+   /** Parse program arguments in jar run or plan request. */
+   public static  
List getProgramArgs(
+   HandlerRequest request, Logger log) throws 
RestHandlerException {
+   JarRequestBody requestBody = request.getRequestBody();
+   List programArgs = tokenizeArguments(
+   fromRequestBodyOrQueryParameter(
+   emptyToNull(requestBody.getProgramArguments()),
+   () -> getQueryParameter(request, 
ProgramArgsQueryParameter.class),
+   null,
+   log));
+   List programArgsList = fromRequestBodyOrQueryParameter(
 
 Review comment:
   truly a minor thing, but might I suggest to move 
`fromRequestBodyOrQueryParameter(` to the next line so the indentation of this 
part is identical to the above?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including sql 
> query and kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633806#comment-16633806
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221549739
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunMessageParameters.java
 ##
 @@ -29,32 +28,18 @@
 /**
  * {@link MessageParameters} for {@link JarRunHandler}.
  */
-public class JarRunMessageParameters extends MessageParameters {
+public class JarRunMessageParameters extends JarMessageParameters {
 
-   public final JarIdPathParameter jarIdPathParameter = new 
JarIdPathParameter();
+   final AllowNonRestoredStateQueryParameter 
allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
 
-   public final ProgramArgsQueryParameter programArgsQueryParameter = new 
ProgramArgsQueryParameter();
-
-   public final EntryClassQueryParameter entryClassQueryParameter = new 
EntryClassQueryParameter();
-
-   public final ParallelismQueryParameter parallelismQueryParameter = new 
ParallelismQueryParameter();
-
-   public final AllowNonRestoredStateQueryParameter 
allowNonRestoredStateQueryParameter = new AllowNonRestoredStateQueryParameter();
-
-   public final SavepointPathQueryParameter savepointPathQueryParameter = 
new SavepointPathQueryParameter();
-
-   @Override
-   public Collection> getPathParameters() {
-   return Collections.singletonList(jarIdPathParameter);
-   }
+   final SavepointPathQueryParameter savepointPathQueryParameter = new 
SavepointPathQueryParameter();
 
@Override
public Collection> getQueryParameters() {
-   return Collections.unmodifiableCollection(Arrays.asList(
-   programArgsQueryParameter,
-   entryClassQueryParameter,
-   parallelismQueryParameter,
+   Collection> pars = 
super.getQueryParameters();
+   pars.addAll(Arrays.asList(
 
 Review comment:
   Since you're creating a new list anyway and the total size is known, add the 
super parameter to that one instead.
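   For example (sketch, assuming the fields from the diff above and the usual 
`java.util` imports):
   
       @Override
       public Collection<MessageQueryParameter<?>> getQueryParameters() {
           // one list with the known total size; super's parameters are added to it directly
           final List<MessageQueryParameter<?>> params = new ArrayList<>(6);
           params.addAll(super.getQueryParameters());
           params.add(allowNonRestoredStateQueryParameter);
           params.add(savepointPathQueryParameter);
           return Collections.unmodifiableCollection(params);
       }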


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including sql 
> query and kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633805#comment-16633805
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221549560
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarMessageParameters.java
 ##
 @@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.MessageParameters;
+import org.apache.flink.runtime.rest.messages.MessagePathParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * Base class of {@link MessageParameters} for {@link JarRunHandler} and 
{@link JarPlanHandler}.
+ */
+abstract class JarMessageParameters extends MessageParameters {
+
+   final JarIdPathParameter jarIdPathParameter = new JarIdPathParameter();
+
+   final EntryClassQueryParameter entryClassQueryParameter = new 
EntryClassQueryParameter();
+
+   final ParallelismQueryParameter parallelismQueryParameter = new 
ParallelismQueryParameter();
+
+   final ProgramArgsQueryParameter programArgsQueryParameter = new 
ProgramArgsQueryParameter();
+
+   final ProgramArgsListQueryParameter programArgsListQueryParameter = new 
ProgramArgsListQueryParameter();
+
+   @Override
+   public Collection> getPathParameters() {
+   return Collections.singletonList(jarIdPathParameter);
+   }
+
+   @Override
+   public Collection> getQueryParameters() {
+   return new ArrayList<>(Arrays.asList(
 
 Review comment:
   This should continue to return an unmodifiable collection, otherwise all 
sub-classes have to wrap it again.
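   For example (sketch, using the fields declared in the diff above):
   
       @Override
       public Collection<MessageQueryParameter<?>> getQueryParameters() {
           // read-only view, so callers and sub-classes cannot mutate the shared list by accident
           return Collections.unmodifiableCollection(Arrays.asList(
               programArgsQueryParameter,
               programArgsListQueryParameter,
               entryClassQueryParameter,
               parallelismQueryParameter));
       }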


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including sql 
> query and kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10295) Tokenisation of Program Args resulting in unexpected results

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633809#comment-16633809
 ] 

ASF GitHub Bot commented on FLINK-10295:


zentol commented on a change in pull request #6754: [FLINK-10295] Add support 
of passing jar arguments as list of separate strings in REST API
URL: https://github.com/apache/flink/pull/6754#discussion_r221552259
 
 

 ##
 File path: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java
 ##
 @@ -26,13 +36,42 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.fromRequestBodyOrQueryParameter;
+import static 
org.apache.flink.runtime.rest.handler.util.HandlerRequestUtils.getQueryParameter;
+import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
 /**
  * Utils for jar handlers.
  *
  * @see org.apache.flink.runtime.webmonitor.handlers.JarRunHandler
  * @see org.apache.flink.runtime.webmonitor.handlers.JarPlanHandler
  */
 public class JarHandlerUtils {
+   /** Parse program arguments in jar run or plan request. */
 
 Review comment:
   add an empty line before the javadoc


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Tokenisation of Program Args resulting in unexpected results
> 
>
> Key: FLINK-10295
> URL: https://issues.apache.org/jira/browse/FLINK-10295
> Project: Flink
>  Issue Type: Bug
>  Components: REST, Webfrontend
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Gaurav Singhania
>Assignee: Andrey Zagrebin
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.7.0
>
> Attachments: sample_request.txt
>
>
> We were upgrading from Flink 1.4 to 1.6. At present we have a jar which takes 
> all the details to run the job as program args against a jarid, including sql 
> query and kafka details. In version 1.5 the program args are tokenised; as a 
> result, single quotes (') and double quotes (") are stripped from the arguments. 
> This results in malformed args.
> Attached a sample request for reference.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] NicoK closed pull request #6762: [FLINK-10339][network] Use off-heap memory for SpillReadBufferPool

2018-10-01 Thread GitBox
NicoK closed pull request #6762: [FLINK-10339][network] Use off-heap memory for 
SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index c235999db91..48b9a20e9da 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -74,6 +74,19 @@ public static MemorySegment allocateUnpooledSegment(int 
size, Object owner) {
return new HybridMemorySegment(new byte[size], owner);
}
 
+   /**
+* Allocates some unpooled off-heap memory and creates a new memory 
segment that
+* represents that memory.
+*
+* @param size The size of the off-heap memory segment to allocate.
+* @param owner The owner to associate with the off-heap memory segment.
+* @return A new memory segment, backed by unpooled off-heap memory.
+*/
+   public static MemorySegment allocateUnpooledOffHeapMemory(int size, 
Object owner) {
+   ByteBuffer memory = ByteBuffer.allocateDirect(size);
+   return wrapPooledOffHeapMemory(memory, owner);
+   }
+
/**
 * Creates a memory segment that wraps the given byte array.
 *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index a369ce5a5fb..1fddb612781 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -30,7 +30,6 @@
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -89,8 +88,7 @@ public NetworkBufferPool(int numberOfSegmentsToAllocate, int 
segmentSize) {
 
try {
for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
-   ByteBuffer memory = 
ByteBuffer.allocateDirect(segmentSize);
-   
availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory,
 null));
+   
availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize,
 null));
}
}
catch (OutOfMemoryError err) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
index 2a6a71f05d6..f941e20846e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -257,7 +257,8 @@ public String toString() {
 
synchronized (buffers) {
for (int i = 0; i < numberOfBuffers; i++) {
-   buffers.add(new 
NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), 
this));
+   buffers.add(new 
NetworkBuffer(MemorySegmentFactory.allocateUnpooledOffHeapMemory(
+   memorySegmentSize, null), 
this));
}
}
}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index c450880f98b..d1a304a4909 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -675,8 +675,7 @@ void clear() {
 
@Override
MemorySegment allocateNewSegment(Object owner) {
-   ByteBuffer memory = 
ByteBuffer.allocateDirect(segmentSize);
-   return 
MemorySegmentFactory.wrapPooledOffHeapMemory(memory, owner);
+   return 
MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize, owner);
}
 
@Override


 


This 

[jira] [Commented] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633823#comment-16633823
 ] 

ASF GitHub Bot commented on FLINK-10339:


NicoK closed pull request #6762: [FLINK-10339][network] Use off-heap memory for 
SpillReadBufferPool
URL: https://github.com/apache/flink/pull/6762
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
 
b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
index c235999db91..48b9a20e9da 100644
--- 
a/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
+++ 
b/flink-core/src/main/java/org/apache/flink/core/memory/MemorySegmentFactory.java
@@ -74,6 +74,19 @@ public static MemorySegment allocateUnpooledSegment(int 
size, Object owner) {
return new HybridMemorySegment(new byte[size], owner);
}
 
+   /**
+* Allocates some unpooled off-heap memory and creates a new memory 
segment that
+* represents that memory.
+*
+* @param size The size of the off-heap memory segment to allocate.
+* @param owner The owner to associate with the off-heap memory segment.
+* @return A new memory segment, backed by unpooled off-heap memory.
+*/
+   public static MemorySegment allocateUnpooledOffHeapMemory(int size, 
Object owner) {
+   ByteBuffer memory = ByteBuffer.allocateDirect(size);
+   return wrapPooledOffHeapMemory(memory, owner);
+   }
+
/**
 * Creates a memory segment that wraps the given byte array.
 *
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
index a369ce5a5fb..1fddb612781 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java
@@ -30,7 +30,6 @@
 import javax.annotation.Nullable;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -89,8 +88,7 @@ public NetworkBufferPool(int numberOfSegmentsToAllocate, int 
segmentSize) {
 
try {
for (int i = 0; i < numberOfSegmentsToAllocate; i++) {
-   ByteBuffer memory = 
ByteBuffer.allocateDirect(segmentSize);
-   
availableMemorySegments.add(MemorySegmentFactory.wrapPooledOffHeapMemory(memory,
 null));
+   
availableMemorySegments.add(MemorySegmentFactory.allocateUnpooledOffHeapMemory(segmentSize,
 null));
}
}
catch (OutOfMemoryError err) {
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
index 2a6a71f05d6..f941e20846e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/SpilledSubpartitionView.java
@@ -257,7 +257,8 @@ public String toString() {
 
synchronized (buffers) {
for (int i = 0; i < numberOfBuffers; i++) {
-   buffers.add(new 
NetworkBuffer(MemorySegmentFactory.allocateUnpooledSegment(memorySegmentSize), 
this));
+   buffers.add(new 
NetworkBuffer(MemorySegmentFactory.allocateUnpooledOffHeapMemory(
+   memorySegmentSize, null), 
this));
}
}
}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
index c450880f98b..d1a304a4909 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/memory/MemoryManager.java
@@ -675,8 +675,7 @@ void clear() {
 
@Override
MemorySegment allocateNewSegment(Object owner) {
-   ByteBuffer memory = 
ByteBuffer.allocateDirect(segmentSize);
-   return 
MemorySegmentFactory.wrapP

[jira] [Resolved] (FLINK-10339) SpillReadBufferPool cannot use off-heap memory

2018-10-01 Thread Nico Kruber (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10339?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber resolved FLINK-10339.
-
   Resolution: Fixed
Fix Version/s: 1.7.0

Fixed via:
- master: 2ab55df946dc35280f66c6f073a112b497acede3

> SpillReadBufferPool cannot use off-heap memory
> --
>
> Key: FLINK-10339
> URL: https://issues.apache.org/jira/browse/FLINK-10339
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.7.0
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>  Labels: pull-request-available
> Fix For: 1.7.0
>
>
> Currently, the {{NetworkBufferPool}} always uses off-heap memory to reduce 
> memory copy from flink {{Buffer}} to netty internal {{ByteBuf}} during 
> transporting on sender side.
>  
> But for {{SpillReadBufferPool}} in {{SpilledSubpartitionView}}, it still uses 
> heap memory for caching. We can make it off-heap by default, similar to 
> {{NetworkBufferPool}}, or decide the type by the current parameter 
> {{taskmanager.memory.off-heap}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] dawidwys commented on issue #6781: [FLINK-10470] Add method to check if pattern can produce empty matches

2018-10-01 Thread GitBox
dawidwys commented on issue #6781: [FLINK-10470] Add method to check if pattern 
can produce empty matches
URL: https://github.com/apache/flink/pull/6781#issuecomment-425868325
 
 
   cc @kl0u @pnowojski 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10470) Add method to check if pattern can produce empty matches

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633854#comment-16633854
 ] 

ASF GitHub Bot commented on FLINK-10470:


dawidwys commented on issue #6781: [FLINK-10470] Add method to check if pattern 
can produce empty matches
URL: https://github.com/apache/flink/pull/6781#issuecomment-425868325
 
 
   cc @kl0u @pnowojski 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add method to check if pattern can produce empty matches
> 
>
> Key: FLINK-10470
> URL: https://issues.apache.org/jira/browse/FLINK-10470
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
>
> There is couple of inconsistencies how CEP library handles greedy and 
> reluctant operators at the beginning at end of pattern. This results in 
> subtle problems how empty matches should be generated for patterns like e.g. 
> A? or A*?, where one is greedy and the other one is reluctant. In order to 
> provide first version of MATCH_RECOGNIZE function we should have a 
> possibility to disable patterns which can produce empty matches.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics

2018-10-01 Thread GitBox
zentol commented on a change in pull request #6702: [FLINK-10135] The 
JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#discussion_r221573282
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsTest.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.fail;
+
+/**
+ * The test class for metrics report by job manager.
+ */
+public class JobManagerMetricsTest {
 
 Review comment:
   This isn't testing anything.
   
   What we want to ensure is that these metrics don't disappear again, and 
optionally that the values are correct. For this you have to actually start 
runtime components, which could be done with a `MiniCluster` (that is also 
configured to use the TestReporter) and have the reporter search for the 
desired metrics.
   The `SystemResourcesMetricsITCase` may be a useful reference.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] zentol commented on a change in pull request #6702: [FLINK-10135] The JobManager does not report the cluster-level metrics

2018-10-01 Thread GitBox
zentol commented on a change in pull request #6702: [FLINK-10135] The 
JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#discussion_r221573282
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsTest.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.fail;
+
+/**
+ * The test class for metrics report by job manager.
+ */
+public class JobManagerMetricsTest {
 
 Review comment:
   This isn't testing anything.
   
   What we want to ensure is that these metrics don't disappear again, and 
optionally that the values are correct. For this you have to actually start 
runtime components, which could be done with a `MiniCluster` (that is also 
configured to use the TestReporter) and have the reporter search for the 
desired metrics.
   The `SystemResourcesMetricsITCase` may be a useful reference.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633862#comment-16633862
 ] 

ASF GitHub Bot commented on FLINK-10135:


zentol commented on a change in pull request #6702: [FLINK-10135] The 
JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#discussion_r221573282
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsTest.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.fail;
+
+/**
+ * The test class for metrics report by job manager.
+ */
+public class JobManagerMetricsTest {
 
 Review comment:
   This isn't testing anything.
   
   What we want to ensure is that these metrics don't disappear again, and 
optionally that the values are correct. For this you have to actually start 
runtime components, which could be done with a `MiniCluster` (that is also 
configured to use the TestReporter) and have the reporter search for the 
desired metrics.
   The `SystemResourcesMetricsITCase` may be a useful reference.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The JobManager doesn't report the cluster-level metrics
> ---
>
> Key: FLINK-10135
> URL: https://issues.apache.org/jira/browse/FLINK-10135
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Metrics
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Joey Echeverria
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
>
> In [the documentation for 
> metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster]
>  in the Flink 1.5.0 release, it says that the following metrics are reported 
> by the JobManager:
> {noformat}
> numRegisteredTaskManagers
> numRunningJobs
> taskSlotsAvailable
> taskSlotsTotal
> {noformat}
> In the job manager REST endpoint 
> ({{http://:8081/jobmanager/metrics}}), those metrics don't 
> appear.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10135) The JobManager doesn't report the cluster-level metrics

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10135?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633863#comment-16633863
 ] 

ASF GitHub Bot commented on FLINK-10135:


zentol commented on a change in pull request #6702: [FLINK-10135] The 
JobManager does not report the cluster-level metrics
URL: https://github.com/apache/flink/pull/6702#discussion_r221573282
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/metrics/JobManagerMetricsTest.java
 ##
 @@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.metrics;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.Gauge;
+import org.apache.flink.metrics.MetricConfig;
+import org.apache.flink.metrics.reporter.AbstractReporter;
+import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static org.junit.Assert.fail;
+
+/**
+ * The test class for metrics report by job manager.
+ */
+public class JobManagerMetricsTest {
 
 Review comment:
   This isn't testing anything.
   
   What we want to ensure is that these metrics don't disappear again, and 
optionally that the values are correct. For this you have to actually start 
runtime components, which could be done with a `MiniCluster` (that is also 
configured to use the TestReporter) and have the reporter search for the 
desired metrics.
   The `SystemResourcesMetricsITCase` may be a useful reference.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> The JobManager doesn't report the cluster-level metrics
> ---
>
> Key: FLINK-10135
> URL: https://issues.apache.org/jira/browse/FLINK-10135
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Metrics
>Affects Versions: 1.5.0, 1.6.0, 1.7.0
>Reporter: Joey Echeverria
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
>
> In [the documentation for 
> metrics|https://ci.apache.org/projects/flink/flink-docs-release-1.5/monitoring/metrics.html#cluster]
>  in the Flink 1.5.0 release, it says that the following metrics are reported 
> by the JobManager:
> {noformat}
> numRegisteredTaskManagers
> numRunningJobs
> taskSlotsAvailable
> taskSlotsTotal
> {noformat}
> In the job manager REST endpoint 
> ({{http://:8081/jobmanager/metrics}}), those metrics don't 
> appear.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] pnowojski commented on a change in pull request #6781: [FLINK-10470] Add method to check if pattern can produce empty matches

2018-10-01 Thread GitBox
pnowojski commented on a change in pull request #6781: [FLINK-10470] Add method 
to check if pattern can produce empty matches
URL: https://github.com/apache/flink/pull/6781#discussion_r221576741
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ##
 @@ -74,6 +77,36 @@
}
}
 
+   /**
+* Verifies if the provided pattern can possibly generate empty match. 
Example of patterns that can possibly
+* generate empty matches are: A*, A?, A* B? etc.
+*
+* @param pattern pattern to check
+* @return true if empty match could potentially match the pattern, 
false otherwise
+*/
+   public static boolean canProduceEmptyMatches(final Pattern 
pattern) {
+   NFAFactoryCompiler compiler = new 
NFAFactoryCompiler<>(checkNotNull(pattern));
+   compiler.compileFactory();
+   State startState = 
compiler.getStates().stream().filter(State::isStart).findFirst().get();
 
 Review comment:
   nit: `orElseThrow(IllegalStateException::new)` (or sth like this, I'm not 
sure what's the correct syntax) instead of `get()`?
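   For illustration, the suggestion amounts to something like this (the exact 
exception and message are just examples):

State<?> startState = compiler.getStates().stream()
   .filter(State::isStart)
   .findFirst()
   // fail with a meaningful exception instead of a bare NoSuchElementException from get()
   .orElseThrow(() -> new IllegalStateException("Compiler produced no start state"));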


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] pnowojski commented on a change in pull request #6781: [FLINK-10470] Add method to check if pattern can produce empty matches

2018-10-01 Thread GitBox
pnowojski commented on a change in pull request #6781: [FLINK-10470] Add method 
to check if pattern can produce empty matches
URL: https://github.com/apache/flink/pull/6781#discussion_r221577049
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ##
 @@ -74,6 +77,36 @@
}
}
 
+   /**
+    * Verifies if the provided pattern can possibly generate empty match. Example of patterns that can possibly
+    * generate empty matches are: A*, A?, A* B? etc.
+    *
+    * @param pattern pattern to check
+    * @return true if empty match could potentially match the pattern, false otherwise
+    */
+   public static boolean canProduceEmptyMatches(final Pattern<?, ?> pattern) {
+      NFAFactoryCompiler<?> compiler = new NFAFactoryCompiler<>(checkNotNull(pattern));
+      compiler.compileFactory();
+      State<?> startState = compiler.getStates().stream().filter(State::isStart).findFirst().get();
+
+      final Stack<State<?>> statesToCheck = new Stack<>();
+      statesToCheck.push(startState);
+      while (!statesToCheck.isEmpty()) {
+         final State<?> currentState = statesToCheck.pop();
+         for (StateTransition<?> transition : currentState.getStateTransitions()) {
+            if (transition.getAction() == StateTransitionAction.PROCEED) {
+               if (transition.getTargetState().isFinal()) {
+                  return true;
+               } else {
+                  statesToCheck.push(transition.getTargetState());
 
 Review comment:
   Can we loop here forever if `PROCEED` takes us to an already visited node? 
Shouldn't we check for already visited states?
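   For what it's worth, one way to add such a guard, sketched with the same 
names as above (assuming `State` has a usable `equals`/`hashCode`; otherwise the 
set could track state names instead):

final Set<State<?>> visitedStates = new HashSet<>();
final Stack<State<?>> statesToCheck = new Stack<>();
statesToCheck.push(startState);
while (!statesToCheck.isEmpty()) {
   final State<?> currentState = statesToCheck.pop();
   if (!visitedStates.add(currentState)) {
      // already expanded this state; skipping it breaks PROCEED cycles
      continue;
   }
   for (StateTransition<?> transition : currentState.getStateTransitions()) {
      if (transition.getAction() == StateTransitionAction.PROCEED) {
         if (transition.getTargetState().isFinal()) {
            return true;
         } else {
            statesToCheck.push(transition.getTargetState());
         }
      }
   }
}
return false;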


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10470) Add method to check if pattern can produce empty matches

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633874#comment-16633874
 ] 

ASF GitHub Bot commented on FLINK-10470:


pnowojski commented on a change in pull request #6781: [FLINK-10470] Add method 
to check if pattern can produce empty matches
URL: https://github.com/apache/flink/pull/6781#discussion_r221577049
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ##
 @@ -74,6 +77,36 @@
}
}
 
+   /**
+    * Verifies if the provided pattern can possibly generate empty match. Example of patterns that can possibly
+    * generate empty matches are: A*, A?, A* B? etc.
+    *
+    * @param pattern pattern to check
+    * @return true if empty match could potentially match the pattern, false otherwise
+    */
+   public static boolean canProduceEmptyMatches(final Pattern<?, ?> pattern) {
+      NFAFactoryCompiler<?> compiler = new NFAFactoryCompiler<>(checkNotNull(pattern));
+      compiler.compileFactory();
+      State<?> startState = compiler.getStates().stream().filter(State::isStart).findFirst().get();
+
+      final Stack<State<?>> statesToCheck = new Stack<>();
+      statesToCheck.push(startState);
+      while (!statesToCheck.isEmpty()) {
+         final State<?> currentState = statesToCheck.pop();
+         for (StateTransition<?> transition : currentState.getStateTransitions()) {
+            if (transition.getAction() == StateTransitionAction.PROCEED) {
+               if (transition.getTargetState().isFinal()) {
+                  return true;
+               } else {
+                  statesToCheck.push(transition.getTargetState());
 
 Review comment:
   Can we loop here forever if `PROCEED` takes us to an already visited node? 
Shouldn't we check for already visited states?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add method to check if pattern can produce empty matches
> 
>
> Key: FLINK-10470
> URL: https://issues.apache.org/jira/browse/FLINK-10470
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
>
> There are a couple of inconsistencies in how the CEP library handles greedy 
> and reluctant operators at the beginning and end of a pattern. This results in 
> subtle problems with how empty matches should be generated for patterns like 
> A? or A*?, where one is greedy and the other one is reluctant. In order to 
> provide a first version of the MATCH_RECOGNIZE function we should have a 
> way to disable patterns which can produce empty matches.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-10291) Generate JobGraph with fixed/configurable JobID in StandaloneJobClusterEntrypoint

2018-10-01 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reopened FLINK-10291:
---

> Generate JobGraph with fixed/configurable JobID in 
> StandaloneJobClusterEntrypoint
> -
>
> Key: FLINK-10291
> URL: https://issues.apache.org/jira/browse/FLINK-10291
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2
>
>
> The {{StandaloneJobClusterEntrypoint}} currently generates the {{JobGraph}} 
> from the user code when being started. Due to the nature of how the 
> {{JobGraph}} is generated, it will get a random {{JobID}} assigned. This is 
> problematic in case of a failover because then, the {{JobMaster}} won't be 
> able to detect the checkpoints. In order to solve this problem, we need to 
> either fix the {{JobID}} assignment or make it configurable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-10291) Generate JobGraph with fixed/configurable JobID in StandaloneJobClusterEntrypoint

2018-10-01 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-10291.
---
Resolution: Fixed

Fixed via
1.7.0: c839b1a04c389db816fad0194eb63a5220ee5351
1.6.2: 3ee66af6b1c42eda07d0a28d71560cdd3b4dba33

> Generate JobGraph with fixed/configurable JobID in 
> StandaloneJobClusterEntrypoint
> -
>
> Key: FLINK-10291
> URL: https://issues.apache.org/jira/browse/FLINK-10291
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2
>
>
> The {{StandaloneJobClusterEntrypoint}} currently generates the {{JobGraph}} 
> from the user code when being started. Due to the nature of how the 
> {{JobGraph}} is generated, it will get a random {{JobID}} assigned. This is 
> problematic in case of a failover because then, the {{JobMaster}} won't be 
> able to detect the checkpoints. In order to solve this problem, we need to 
> either fix the {{JobID}} assignment or make it configurable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10470) Add method to check if pattern can produce empty matches

2018-10-01 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10470?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633875#comment-16633875
 ] 

ASF GitHub Bot commented on FLINK-10470:


pnowojski commented on a change in pull request #6781: [FLINK-10470] Add method 
to check if pattern can produce empty matches
URL: https://github.com/apache/flink/pull/6781#discussion_r221576741
 
 

 ##
 File path: 
flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/compiler/NFACompiler.java
 ##
 @@ -74,6 +77,36 @@
}
}
 
+   /**
+    * Verifies if the provided pattern can possibly generate empty match. Example of patterns that can possibly
+    * generate empty matches are: A*, A?, A* B? etc.
+    *
+    * @param pattern pattern to check
+    * @return true if empty match could potentially match the pattern, false otherwise
+    */
+   public static boolean canProduceEmptyMatches(final Pattern<?, ?> pattern) {
+      NFAFactoryCompiler<?> compiler = new NFAFactoryCompiler<>(checkNotNull(pattern));
+      compiler.compileFactory();
+      State<?> startState = compiler.getStates().stream().filter(State::isStart).findFirst().get();
 
 Review comment:
   nit: `orElseThrow(IllegalStateException::new)` (or sth like this, I'm not 
sure what's the correct syntax) instead of `get()`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add method to check if pattern can produce empty matches
> 
>
> Key: FLINK-10470
> URL: https://issues.apache.org/jira/browse/FLINK-10470
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>  Labels: pull-request-available
>
> There are a couple of inconsistencies in how the CEP library handles greedy 
> and reluctant operators at the beginning and end of a pattern. This results in 
> subtle problems with how empty matches should be generated for patterns like 
> A? or A*?, where one is greedy and the other one is reluctant. In order to 
> provide a first version of the MATCH_RECOGNIZE function we should have a 
> way to disable patterns which can produce empty matches.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10291) Generate JobGraph with fixed/configurable JobID in StandaloneJobClusterEntrypoint

2018-10-01 Thread Till Rohrmann (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10291?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-10291.
-
  Resolution: Fixed
Release Note: The {{StandaloneJobClusterEntrypoint}} now starts all jobs 
with a fixed {{JobID}}. Thus, in order to run a cluster in HA mode, one needs 
to set a different cluster id {{high-availability.cluster-id}} for each 
job/cluster.
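For example, one job cluster could set {{high-availability.cluster-id: payments-job}} 
in its {{flink-conf.yaml}} while another uses {{high-availability.cluster-id: inventory-job}}; 
the values here are arbitrary examples, they just have to differ per job/cluster.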

> Generate JobGraph with fixed/configurable JobID in 
> StandaloneJobClusterEntrypoint
> -
>
> Key: FLINK-10291
> URL: https://issues.apache.org/jira/browse/FLINK-10291
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.6.0, 1.7.0
>Reporter: Till Rohrmann
>Assignee: vinoyang
>Priority: Critical
>  Labels: pull-request-available
> Fix For: 1.7.0, 1.6.2
>
>
> The {{StandaloneJobClusterEntrypoint}} currently generates the {{JobGraph}} 
> from the user code when being started. Due to the nature of how the 
> {{JobGraph}} is generated, it will get a random {{JobID}} assigned. This is 
> problematic in case of a failover because then, the {{JobMaster}} won't be 
> able to detect the checkpoints. In order to solve this problem, we need to 
> either fix the {{JobID}} assignment or make it configurable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10462) Remove ConnectionIndex for further sharing tcp connection in credit-based mode

2018-10-01 Thread Nico Kruber (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10462?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16633879#comment-16633879
 ] 

Nico Kruber commented on FLINK-10462:
-

Yes, you are right about the changes that credit-based flow control allows 
here. I have been thinking about this as well and have also been asked about it 
at Flink Forward, but I have mixed feelings: sometimes it is actually beneficial 
to have multiple TCP connections, for example when the network is the bottleneck. 
Judging from various network hardware tests I have read in the past, you can only 
saturate the link through multiple connections, and this becomes more pronounced 
the more throughput the interface offers.

Have you made different observations?

On the other hand, opening too many connections (which happens easily for large 
topologies) may also become a problem. How about a more generic connection pool 
with a configurable size? That wouldn't be as easy a patch as the one described 
in this ticket, and we would also only need to look into it once we really hit 
the problem of too many connections. Or do you see any other advantages of 
changing the behaviour here?
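To make the pooling idea a bit more concrete, here is a very rough, purely 
hypothetical sketch (none of these names exist in Flink; `C` stands in for 
whatever physical channel type the network stack uses):

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

/** Caps the number of physical connections per remote TaskManager and hands them out round-robin. */
public class BoundedConnectionPool<C> {

   private final int maxConnectionsPerRemote;               // would come from a config option
   private final Function<InetSocketAddress, C> connector;  // opens a new physical connection
   private final Map<InetSocketAddress, List<C>> pools = new ConcurrentHashMap<>();
   private final AtomicInteger nextIndex = new AtomicInteger();

   public BoundedConnectionPool(int maxConnectionsPerRemote, Function<InetSocketAddress, C> connector) {
      this.maxConnectionsPerRemote = maxConnectionsPerRemote;
      this.connector = connector;
   }

   public C get(InetSocketAddress remote) {
      List<C> pool = pools.computeIfAbsent(remote, ignored -> new ArrayList<>());
      synchronized (pool) {
         // open new connections lazily until the configured cap is reached
         if (pool.size() < maxConnectionsPerRemote) {
            pool.add(connector.apply(remote));
         }
         return pool.get(Math.floorMod(nextIndex.getAndIncrement(), pool.size()));
      }
   }
}

With a pool size of 1 this degenerates into fully sharing a single connection; 
with a larger value it keeps the ability to saturate fat links while still 
bounding the total number of connections for large topologies.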

> Remove ConnectionIndex for further sharing tcp connection in credit-based 
> mode 
> ---
>
> Key: FLINK-10462
> URL: https://issues.apache.org/jira/browse/FLINK-10462
> Project: Flink
>  Issue Type: Improvement
>  Components: Network
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.5.3, 1.6.0, 1.6.1, 1.5.4
>Reporter: zhijiang
>Assignee: zhijiang
>Priority: Minor
>
> Every {{IntermediateResult}} generates a random {{ConnectionIndex}} which 
> will be included in the {{ConnectionID}}.
> The {{RemoteInputChannel}} requests a TCP connection via the {{ConnectionID}}. 
> That means one TCP connection may be shared by multiple {{RemoteInputChannel}}s 
> which have the same {{ConnectionID}}. This way we reduce the number of physical 
> connections between two {{TaskManager}}s, which is beneficial for large-scale jobs.
> But this sharing is limited to the same {{IntermediateResult}}, and I think that 
> is mainly because we may temporarily switch off {{autoread}} for the channel 
> during back pressure in the previous network flow control. In credit-based mode 
> the channel is always open for transporting different intermediate data, so we 
> can further share the TCP connection across different {{IntermediateResult}}s 
> and remove this limit.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] zentol commented on a change in pull request #6735: [FLINK-9126] New CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo

2018-10-01 Thread GitBox
zentol commented on a change in pull request #6735: [FLINK-9126] New 
CassandraPojoInputFormat to output data as a custom annotated Cassandra Pojo
URL: https://github.com/apache/flink/pull/6735#discussion_r221575216
 
 

 ##
 File path: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/batch/connectors/cassandra/CustomCassandraAnnotatedPojo.java
 ##
 @@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import com.datastax.driver.mapping.annotations.Column;
+import com.datastax.driver.mapping.annotations.Table;
+
+/**
+ * Example of Cassandra Annotated POJO class for use with {@link 
CassandraPojoInputFormat}.
+ */
+@Table(name = "batches", keyspace = "flink")
+public class CustomCassandraAnnotatedPojo {
+
+
 
 Review comment:
   double empty-lines should be prohibited by checkstyle, don't know why this 
isn't failing :/


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

