[GitHub] flink issue #3124: [FLINK-5281] Extend KafkaJsonTableSources to support nest...

2018-07-24 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3124
  
This PR can be closed. PR #5491 added a TableSource that supports nested 
JSON data.
Thanks, Fabian


---


[GitHub] flink issue #3609: [FLINK-6073] - Support for SQL inner queries for proctime

2018-07-24 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3609
  
This PR can be closed as it will be addressed by FLINK-9714.
Thanks, Fabian


---


[GitHub] flink issue #6341: [FLINK-5750] Incorrect translation of n-ary Union

2018-07-24 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6341
  
Thanks for the fix @AlexanderKoltsov! 
I'll merge this.


---


[GitHub] flink pull request #6393: [FLINK-9296] [table] Add support for non-windowed ...

2018-07-23 Thread fhueske
GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/6393

[FLINK-9296] [table] Add support for non-windowed DISTINCT aggregates.

## What is the purpose of the change

- Fix a regression from 1.5 that was caused by rearranging optimization 
rules.
- Add tests to ensure the feature won't be broken again.

## Brief change log

- Remove DISTINCT limitation on `DataStreamAggregateRule`.
- Add plan test to ensure queries are correctly translated
- Add ITCase to ensure queries are correctly executed
- Remove outdated limitation from documentation

## Verifying this change

- Run the added tests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **n/a**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink table-distinctGroupBy

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6393.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6393






---


[GitHub] flink pull request #6267: [FLINK-5750] Incorrect parse of brackets inside VA...

2018-07-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6267#discussion_r200478425
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
 ---
@@ -36,22 +39,21 @@ import scala.collection.JavaConverters._
 class DataSetUnion(
--- End diff --

We need the same fix for `DataStreamUnion`


---


[GitHub] flink pull request #6267: [FLINK-5750] Incorrect parse of brackets inside VA...

2018-07-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6267#discussion_r200478336
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
 ---
@@ -36,22 +39,21 @@ import scala.collection.JavaConverters._
 class DataSetUnion(
 cluster: RelOptCluster,
 traitSet: RelTraitSet,
-leftNode: RelNode,
-rightNode: RelNode,
-rowRelDataType: RelDataType)
-  extends BiRel(cluster, traitSet, leftNode, rightNode)
+inputs: JList[RelNode],
+rowRelDataType: RelDataType,
+all: Boolean)
+  extends Union(cluster, traitSet, inputs, all)
--- End diff --

Change to `Union(cluster, traitSet, inputs, true)`
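
For illustration, a rough sketch of how the class header could look with this change (it also anticipates the comment below about dropping the `all` parameter; the real class additionally mixes in `DataSetRel` and implements the DataSet translation, which is omitted here):

```
import java.util.{List => JList}

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.RelNode
import org.apache.calcite.rel.`type`.RelDataType
import org.apache.calcite.rel.core.{SetOp, Union}

class DataSetUnion(
    cluster: RelOptCluster,
    traitSet: RelTraitSet,
    inputs: JList[RelNode],
    rowRelDataType: RelDataType)
  extends Union(cluster, traitSet, inputs, true) { // only UNION ALL semantics

  override def copy(traitSet: RelTraitSet, inputs: JList[RelNode], all: Boolean): SetOp =
    new DataSetUnion(cluster, traitSet, inputs, rowRelDataType)
}
```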


---


[GitHub] flink pull request #6267: [FLINK-5750] Incorrect parse of brackets inside VA...

2018-07-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6267#discussion_r200480534
  
--- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java
 ---
@@ -73,6 +73,30 @@ public void testValues() throws Exception {
compareResultAsText(results, expected);
}
 
+   @Test
+   public void testValuesWithCast() throws Exception {
--- End diff --

Can you move this test to 
`org.apache.flink.table.runtime.batch.sql.SetOperatorsITCase` and also add one 
to `org.apache.flink.table.runtime.stream.sql.SetOperatorsITCase`?

In addition, it would be good to have plan tests for this query in 
`org.apache.flink.table.api.batch.sql.SetOperatorsTest` and 
`org.apache.flink.table.api.stream.sql.SetOperatorsTest`.
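
As a rough skeleton of such a plan test (hypothetical query and placeholder expected plan; in practice the method would go into the existing `SetOperatorsTest` classes and the expected plan would be built with the `TableTestUtil` helpers):

```
import org.apache.flink.table.utils.TableTestBase
import org.junit.Test

class SetOperatorsTest extends TableTestBase {

  private val util = streamTestUtil()

  @Test
  def testValuesWithCast(): Unit = {
    // hypothetical query, the real test should use the query added by this PR
    val sql = "SELECT * FROM (VALUES (1, CAST(1 AS BIGINT)), (2, CAST(2 AS BIGINT))) AS T(a, b)"
    // placeholder, to be replaced by a plan built with unaryNode(...) / term(...)
    val expected = ""
    util.verifySql(sql, expected)
  }
}
```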


---


[GitHub] flink pull request #6267: [FLINK-5750] Incorrect parse of brackets inside VA...

2018-07-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6267#discussion_r200478263
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala
 ---
@@ -36,22 +39,21 @@ import scala.collection.JavaConverters._
 class DataSetUnion(
 cluster: RelOptCluster,
 traitSet: RelTraitSet,
-leftNode: RelNode,
-rightNode: RelNode,
-rowRelDataType: RelDataType)
-  extends BiRel(cluster, traitSet, leftNode, rightNode)
+inputs: JList[RelNode],
+rowRelDataType: RelDataType,
+all: Boolean)
--- End diff --

we don't need the `all` parameter because `DataStreamUnion` only supports 
`UNION ALL` semantics.


---


[GitHub] flink issue #6161: [hotfix] [docs][FLINK-9581] Typo: extra spaces removed to...

2018-07-05 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6161
  
Documentation fixes are usually not critical to include in a release because 
the docs are always built from the most recent release branch. So documentation 
changes that are not part of a release will still be published shortly after 
being committed.

I'll merge this PR.

Btw. it is OK to create a hotfix (i.e., a PR without creating a JIRA issue) 
for minor fixes like this.

Thanks, Fabian


---


[GitHub] flink issue #6255: [FLINK-9681] [table] Make sure difference between minRete...

2018-07-05 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6255
  
Thanks for the update @hequn8128.

I'll merge this


---


[GitHub] flink issue #6252: [FLINK-9742][Table API & SQL] Expose Expression.resultTyp...

2018-07-05 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6252
  
Thanks for the update @HeartSaVioR.

I'll merge this


---


[GitHub] flink issue #6253: [FLINK-8094][Table API & SQL] Support other types for Exi...

2018-07-05 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6253
  
Thanks for the update @HeartSaVioR.

I'll merge this


---


[GitHub] flink issue #6252: [FLINK-9742][Table API & SQL] Expose Expression.resultTyp...

2018-07-05 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6252
  
Looks good. Just one last comment.


---


[GitHub] flink pull request #6252: [FLINK-9742][Table API & SQL] Expose Expression.re...

2018-07-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6252#discussion_r200345063
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala
 ---
@@ -22,13 +22,16 @@ import java.lang.{Boolean => JBoolean, Byte => JByte, 
Double => JDouble, Float =
 import java.math.{BigDecimal => JBigDecimal}
 import java.sql.{Date, Time, Timestamp}
 
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo
+import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, 
TypeInformation}
 import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime}
 import org.apache.flink.table.api.ValidationException
 import org.apache.flink.table.calcite.FlinkTypeFactory
 import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, 
TimeIntervalTypeInfo}
 
 object ExpressionUtils {
+  def getReturnType(expr: Expression): TypeInformation[_] = {
--- End diff --

Please add Scala docs since this is a public method.

Rename to `getResultType` for consistency?
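
A sketch of what the renamed and documented method could look like, assuming it simply exposes `Expression.resultType`:

```
/**
  * Returns the [[TypeInformation]] of the result that is produced when the given
  * [[Expression]] is evaluated.
  *
  * @param expr the expression to inspect
  * @return the result type of the expression
  */
def getResultType(expr: Expression): TypeInformation[_] = expr.resultType
```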




---


[GitHub] flink pull request #6255: [FLINK-9681] [table] Make sure difference between ...

2018-07-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6255#discussion_r200333630
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala
 ---
@@ -384,4 +386,12 @@ object HarnessTestBase {
   value.row.getField(selectorField).asInstanceOf[T]
 }
   }
+
+  /**
+* Test class used to test min and max retention time.
+*/
+  class StreamQueryConfigTest(min: Time, max: Time) extends 
StreamQueryConfig {
--- End diff --

I would rename the class to `TestStreamQueryConfig` because the `Test` at 
the end suggests that this class is testing something instead of being a util 
for a test.


---


[GitHub] flink pull request #6255: [FLINK-9681] [table] Make sure difference between ...

2018-07-05 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6255#discussion_r200330683
  
--- Diff: docs/dev/table/streaming.md ---
@@ -591,16 +589,14 @@ qConfig.withIdleStateRetentionTime(Time.hours(12);
 
 val qConfig: StreamQueryConfig = ???
 
-// set idle state retention time: min = 12 hour, max = 16 hours
-qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(16))
-// set idle state retention time. min = max = 12 hours
-qConfig.withIdleStateRetentionTime(Time.hours(12)
+// set idle state retention time: min = 12 hour, max = 24 hours
+qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))
 
 {% endhighlight %}
 
 
 
-Configuring different minimum and maximum idle state retention times is 
more efficient because it reduces the internal book-keeping of a query for when 
to remove state.
+Configuring different minimum and maximum idle state retention times is 
more efficient because it reduces the internal book-keeping of a query for when 
to remove state. Difference between minTime and maxTime shoud be at least 5 
minutes.
--- End diff --

The "... more efficient ..." does not apply anymore. Maybe rephrase to 

> Cleaning up state requires additional bookkeeping which becomes less 
expensive for larger differences of `minTime` and `maxTime`. The difference 
between `minTime` and `maxTime` must be at least 5 minutes.




---


[GitHub] flink issue #6253: [WIP][FLINK-8094][Table API & SQL] Support other types fo...

2018-07-05 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6253
  
It's a good question whether to add a new class or not. Right now, 
extending the implementation of `ExistingField` seems like a better approach to 
me. Once we add a `ParsingExistingField` extractor that can be configured with 
a timestamp format, the ISO date String support in `ExistingField` would be 
obsolete, but IMO that's not a big problem. 

+1 to change the implementation to extend `ExistingField`.

Regarding the tests, I think `ExistingField` is used in a few ITCases, but 
there are no unit tests yet.
Adding unit tests is a bit tricky, because we would need to integrate it 
with the code generator, etc.
So, a big +1 if you would like to look into that, but I'd also be fine with 
adding another ITCase.

For the documentation, we might want to extend the bullet point about 
`timestampExtractor` in the [Defining a Rowtime 
Attribute](https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html#defining-a-rowtime-attribute)
 section.

Thanks, Fabian


---


[GitHub] flink issue #6252: [FLINK-9742][Table API & SQL] Widen scope of Expression.r...

2018-07-05 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6252
  
I think it can also be a new public method in 
`org.apache.flink.table.expressions.ExpressionUtils`.

Thanks, Fabian


---


[GitHub] flink issue #6252: [FLINK-9742][Table API & SQL] Widen scope of Expression.r...

2018-07-05 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6252
  
Thanks for opening this PR @HeartSaVioR! (cc @twalthr)

I thought about this again and I don't think we should make the method 
public. The problem is that `Expression` is one of the core classes of the 
Table API. By making the method public it becomes visible to all users of the 
API.

A better approach is to provide a publicly accessible util object (in 
org.apache.flink.table.api...) that provides access to the result type (and 
possibly other properties) of an `Expression`.

What do you think? 

Best, Fabian


---


[GitHub] flink issue #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-07-04 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6188
  
Hmm, I think there is still a case for `timestampAdd`. The problem is time 
intervals with variable length such as `MONTH`, `QUARTER`, and `YEAR`. None of 
these can be defined in milliseconds because their length depends on the context. 
So `timestampAdd(ts, 1.month)` returns a different result than `ts + 1.month`.
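
A small, self-contained illustration of why month-based intervals cannot be expressed as a fixed number of milliseconds (using `java.time` here only for illustration, not the Table API):

```
import java.time.LocalDate

object MonthIntervalExample extends App {
  // "one month later" depends on the start date:
  println(LocalDate.of(2003, 1, 31).plusMonths(1))  // 2003-02-28 (28 days later)
  println(LocalDate.of(2003, 3, 31).plusMonths(1))  // 2003-04-30 (30 days later)
  // a fixed 30-day interval behaves differently:
  println(LocalDate.of(2003, 1, 31).plusDays(30))   // 2003-03-02
}
```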


---


[GitHub] flink issue #6201: [FLINK-8866][Table API & SQL] Add support for unified tab...

2018-07-03 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6201
  
Hi @suez1224, that sounds good overall. :-)

A few comments:

- I would not add a user-facing property `connector.support-timestamp` 
because a user chooses that by choosing the connector type. Whether the 
connector supports writing a system timestamp can be an internal 
field/annotation/interface of the `TableSink` that is generated from the 
properties.
- Copying the timestamp to the StreamRecord timestamp field can be done 
with a process function. Actually, we do that already when converting a Table 
into a DataStream. Setting the flag in the Kafka TableSink should be easy.
- Not sure if `from-source` needs to be supported by the initial version. 
We could just implement `from-field` for now, and handle `from-source` as a 
follow up issue. Since we are approaching feature freeze, I think this might be 
a good idea at this point.

What do you think?
Fabian


---


[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support

2018-07-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199743600
  
--- Diff: docs/dev/table/tableApi.md ---
@@ -2184,6 +2184,17 @@ NUMERIC.atan()
   
 
 
+
+  
+{% highlight java %}
+NUMERIC.atan2(NUMERIC)
--- End diff --

given that both parameters are equally important, we might want to change 
the syntax to `atan2(Numeric, Numeric)`. IMO, that would be more intuitive. 
What do you think?


---


[GitHub] flink issue #6161: [hotfix] [docs][FLINK-9581] Typo: extra spaces removed to...

2018-07-02 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6161
  
Thanks @snuyanzin!
+1 to merge


---


[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support

2018-07-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199500643
  
--- Diff: docs/dev/table/sql.md ---
@@ -1510,6 +1510,17 @@ ATAN(numeric)
   
 
 
+
+  
+{% highlight text %}
+ATAN2(numeric)
--- End diff --

should be `ATAN2(numeric, numeric)`


---


[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support

2018-07-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199504092
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala
 ---
@@ -243,6 +243,16 @@ case class Atan(child: Expression) extends 
UnaryExpression {
   }
 }
 
+case class Atan2(left: Expression, right: Expression) extends 
BinaryExpression {
--- End diff --

No input validation? override `expectedTypes` or `validateInput()`
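
A rough sketch only, assuming the `InputTypeSpec` trait and the `BasicTypeInfo` constants used by the other math expressions in this file, and omitting the RexNode translation that the real implementation also needs:

```
case class Atan2(left: Expression, right: Expression)
  extends BinaryExpression with InputTypeSpec {

  // both operands are expected to be (castable to) DOUBLE
  override private[flink] def expectedTypes: Seq[TypeInformation[_]] =
    Seq(BasicTypeInfo.DOUBLE_TYPE_INFO, BasicTypeInfo.DOUBLE_TYPE_INFO)

  override private[flink] def resultType: TypeInformation[_] =
    BasicTypeInfo.DOUBLE_TYPE_INFO

  override def toString: String = s"atan2($left, $right)"
}
```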


---


[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support

2018-07-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199501131
  
--- Diff: docs/dev/table/tableApi.md ---
@@ -3748,6 +3759,17 @@ NUMERIC.atan()
   
 
 
+
+  
+{% highlight scala %}
+NUMERIC.atan2()
+{% endhighlight %}
+  
+  
+Calculates the tangent of a given coordinates.
--- End diff --

see above


---


[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support

2018-07-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199500973
  
--- Diff: docs/dev/table/tableApi.md ---
@@ -2184,6 +2184,17 @@ NUMERIC.atan()
   
 
 
+
+  
+{% highlight java %}
+NUMERIC.atan2()
--- End diff --

should be `NUMERIC.atan2(NUMERIC)`


---


[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support

2018-07-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199501083
  
--- Diff: docs/dev/table/tableApi.md ---
@@ -3748,6 +3759,17 @@ NUMERIC.atan()
   
 
 
+
+  
+{% highlight scala %}
+NUMERIC.atan2()
--- End diff --

should be `NUMERIC.atan2(NUMERIC)`.


---


[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support

2018-07-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199500813
  
--- Diff: docs/dev/table/sql.md ---
@@ -1510,6 +1510,17 @@ ATAN(numeric)
   
 
 
+
+  
+{% highlight text %}
+ATAN2(numeric)
+{% endhighlight %}
+  
+  
+Calculates the arc tangent of a given coordinates.
--- End diff --

should be `of a given coordinate.` (-s)?


---


[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support

2018-07-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6223#discussion_r199501001
  
--- Diff: docs/dev/table/tableApi.md ---
@@ -2184,6 +2184,17 @@ NUMERIC.atan()
   
 
 
+
+  
+{% highlight java %}
+NUMERIC.atan2()
+{% endhighlight %}
+  
+  
+Calculates the arc tangent of a given coordinates.
--- End diff --

see above


---


[GitHub] flink issue #6226: [FLINK-8650] Tests for WINDOW clause and documentation up...

2018-07-02 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6226
  
Thanks for the update @snuyanzin.

Looks good. Will merge it.


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-07-02 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r199485393
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1029,6 +1029,29 @@ object temporalOverlaps {
 TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, 
rightTemporal)
   }
 }
+/**
+  * Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+  * by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+  * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+  *
+  * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+object timestampAdd {
+
+  /**
+* Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+* by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+*
+* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+  def apply(
+  unit: Expression,
--- End diff --

I agree with @walterddr but let's wait for @twalthr who added the `quarter` 
function


---


[GitHub] flink issue #6123: [FLINK-9521] Add shade plugin executions to package table...

2018-07-02 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6123
  
I'm with Till here. The example JARs add very little value. I would be OK 
with adding them if this was lightweight, but each example will include the 
Table API and its dependencies (incl. Calcite and Janino) and hence be 16MB in 
size. 

So adding these examples to the distribution adds 80MB in total for very 
little value. 
Hence, I would not add them to the distribution.


---


[GitHub] flink pull request #6226: [FLINK-8650] Tests for WINDOW clause and documenta...

2018-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6226#discussion_r199204037
  
--- Diff: docs/dev/table/sql.md ---
@@ -176,9 +181,20 @@ groupItem:
   | ROLLUP '(' expression [, expression ]* ')'
   | GROUPING SETS '(' groupItem [, groupItem ]* ')'
 
-insert:
-  INSERT INTO tableReference
-  query
+windowRef:
+  windowName
+  |   windowSpec
+
+windowSpec:
+  [ windowName ]
+  '('
+  [ ORDER BY orderItem [, orderItem ]* ]
+  [ PARTITION BY expression [, expression ]* ]
+  [
+  RANGE numericOrIntervalExpression { PRECEDING | FOLLOWING }
--- End diff --

Flink does not support `FOLLOWING` yet


---


[GitHub] flink pull request #6226: [FLINK-8650] Tests for WINDOW clause and documenta...

2018-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6226#discussion_r199204120
  
--- Diff: docs/dev/table/sql.md ---
@@ -176,9 +181,20 @@ groupItem:
   | ROLLUP '(' expression [, expression ]* ')'
   | GROUPING SETS '(' groupItem [, groupItem ]* ')'
 
-insert:
-  INSERT INTO tableReference
-  query
+windowRef:
+  windowName
+  |   windowSpec
+
+windowSpec:
+  [ windowName ]
+  '('
+  [ ORDER BY orderItem [, orderItem ]* ]
+  [ PARTITION BY expression [, expression ]* ]
+  [
+  RANGE numericOrIntervalExpression { PRECEDING | FOLLOWING }
+  |   ROWS numericExpression { PRECEDING | FOLLOWING }
--- End diff --

Flink does not support `FOLLOWING` yet


---


[GitHub] flink pull request #6226: [FLINK-8650] Tests for WINDOW clause and documenta...

2018-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6226#discussion_r199205419
  
--- Diff: docs/dev/table/sql.md ---
@@ -176,9 +181,20 @@ groupItem:
   | ROLLUP '(' expression [, expression ]* ')'
   | GROUPING SETS '(' groupItem [, groupItem ]* ')'
 
-insert:
-  INSERT INTO tableReference
-  query
+windowRef:
+  windowName
+  |   windowSpec
+
+windowSpec:
--- End diff --

Can you also add a `WINDOW` example to the **Over Window aggregation** 
section in 
https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sql.html#aggregations
 ?
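
For example, an over aggregation with a named `WINDOW` clause could look like this (hypothetical table and column names, mirroring the existing over-window example in that section):

```
val sql =
  """
    |SELECT SUM(amount) OVER w, COUNT(amount) OVER w
    |FROM Orders
    |WINDOW w AS (
    |  PARTITION BY user
    |  ORDER BY proctime
    |  ROWS BETWEEN 2 PRECEDING AND CURRENT ROW)
  """.stripMargin
```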


---


[GitHub] flink pull request #6226: [FLINK-8650] Tests for WINDOW clause and documenta...

2018-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6226#discussion_r199209179
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala
 ---
@@ -44,7 +44,26 @@ class OverWindowTest extends TableTestBase {
   "sum(DISTINCT c) OVER (PARTITION BY b ORDER BY proctime ROWS BETWEEN 
2 preceding AND " +
   "CURRENT ROW) as sum2 " +
   "from MyTable"
-
+val sql2 = "SELECT " +
--- End diff --

I think we can reduce the number of tests a bit. 
We are basically testing Calcite's parser / validator multiple times with 
very similar queries. For example, Calcite does not distinguish between 
proctime and rowtime. One query with a `WINDOW` clause per test case should be 
sufficient.


---


[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi

2018-06-29 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6188#discussion_r199190279
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala
 ---
@@ -1029,6 +1029,29 @@ object temporalOverlaps {
 TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, 
rightTemporal)
   }
 }
+/**
+  * Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+  * by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+  * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+  *
+  * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+object timestampAdd {
+
+  /**
+* Adds a (signed) integer interval to a timestamp. The unit for the 
interval is given
+* by the unit argument, which should be one of the following values: 
"SECOND", "MINUTE",
+* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR".
+*
+* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to 
"2003-01-09".
+  */
+  def apply(
+  unit: Expression,
--- End diff --

I think we can and should add `week`. 

`quarter` is a bit more tricky... IMO, the current method is not well 
aligned with the other methods (`hour`, `day`, etc.) and it would make sense to 
change it for better consistency. However, such a change would break the API. 
What do you think @twalthr?


---


[GitHub] flink issue #6201: [FLINK-8866][Table API & SQL] Add support for unified tab...

2018-06-29 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6201
  
Hi, I think timestamp fields of source-sink tables should be handled as 
follows when emitting the table:
- `proc-time`: ignore
- `from-field`: simply write out the timestamp as part of the row.
- `from-source`: write the timestamp separately to the system and remove it 
from the row. This only works if we can set the timestamp in the sink system. 
If the system assigns the ingestion timestamp on its own, i.e., not the actual 
value, rows would contain different timestamps when they are ingested. If the 
sink system does not support setting a timestamp, we cannot allow such a table 
definition.


---


[GitHub] flink issue #6180: [FLINK-9524][Table API & SQL] check whether a clean-up ti...

2018-06-27 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6180
  
Thanks for the update @yzandrew.
Merging


---


[GitHub] flink issue #6153: [FLINK-9557] [formats] Parse 'integer' type as BigDecimal

2018-06-26 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6153
  
Looks good.
+1 to merge


---


[GitHub] flink issue #6163: [hotfix][table] Fix a incorrect exception message.

2018-06-22 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6163
  
Thanks for the PR @kisimple 
+1 to merge.

Best, Fabian


---


[GitHub] flink pull request #6180: [FLINK-9524][Table API & SQL] check whether a clea...

2018-06-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6180#discussion_r197453235
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
 ---
@@ -132,74 +132,80 @@ class ProcTimeBoundedRangeOver(
 
 val currentTime = timestamp - 1
 var i = 0
+// get the list of elements of current proctime
+val currentElements = rowMapState.get(currentTime)
 
-// initialize the accumulators
-var accumulators = accumulatorState.value()
+// clean-up timers might expire, which pass the needToCleanupState 
check above.
+// null-check is necessary here, otherwise NPE might be thrown.
+if(null != currentElements) {
--- End diff --

How about changing this to 

```
if (null == currentElements) {
  return
}
```

It would touch far fewer lines of code and IMO be easier to read (less 
nesting).
Please note the space between `if` and the condition.


---


[GitHub] flink pull request #6180: [FLINK-9524][Table API & SQL] check whether a clea...

2018-06-22 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6180#discussion_r197453796
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala
 ---
@@ -132,74 +132,80 @@ class ProcTimeBoundedRangeOver(
 
 val currentTime = timestamp - 1
 var i = 0
+// get the list of elements of current proctime
+val currentElements = rowMapState.get(currentTime)
 
-// initialize the accumulators
-var accumulators = accumulatorState.value()
+// clean-up timers might expire, which pass the needToCleanupState 
check above.
--- End diff --

Please rephrase comment to 
```
// Expired clean-up timers pass the needToCleanupState() check. 
// Perform a null check to verify that we have data to process.
```


---


[GitHub] flink issue #6131: [hotfix][docs] Fix Table API scala example code

2018-06-07 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6131
  
Hi @zjffdu, thanks for the fix!
+1 to merge


---


[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

2018-06-06 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6082
  
I think we have to return a typed array here. A `List` won't be supported 
by the built-in SQL functions. 

There are a few tricks one can play to create typed arrays, even in static 
code, like:

```
Object[] array = (Object[]) Array.newInstance(clazz, length);
```

Have a look at the code of the ORC InputFormat that had to solve a similar 
challenge: 
[OrcBatchReader.java](https://github.com/apache/flink/blob/master/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcBatchReader.java).
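
A small Scala sketch of the same trick (hypothetical helper, assuming a non-primitive element class):

```
import java.lang.reflect.{Array => JArray}

object ArrayUtil {
  // creates a properly typed array for an element class that is only known at
  // runtime and copies the values of a java.util.List into it
  def toTypedArray(values: java.util.List[AnyRef], clazz: Class[_]): Array[AnyRef] = {
    val array = JArray.newInstance(clazz, values.size()).asInstanceOf[Array[AnyRef]]
    values.toArray(array)
  }
}
```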


---


[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...

2018-06-06 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6082
  
We treat sequences of values as arrays in SQL and the Table API. There are 
no built-in functions to handle lists. So we should return the values as an 
array, and hence don't need a List type.


---


[GitHub] flink issue #6106: [hotfix][table] Remove a println statement

2018-05-31 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6106
  
Oh... Thanks for fixing that :-)

+1 to merge


---


[GitHub] flink issue #6099: [FLINK-9473][Table API & SQL] Added new methods into Exte...

2018-05-30 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6099
  
Thanks for the PR @snuyanzin!
We cannot update the Calcite version to a `SNAPSHOT` version and have to 
wait until Calcite 1.17 is released before continuing with this PR.

Best, Fabian


---


[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...

2018-05-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5688#discussion_r190176842
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala
 ---
@@ -44,8 +45,20 @@ abstract class KeyedProcessFunctionWithCleanupState[K, 
I, O](queryConfig: Stream
   protected def registerProcessingCleanupTimer(
 ctx: KeyedProcessFunction[K, I, O]#Context,
 currentTime: Long): Unit = {
-if (stateCleaningEnabled) {
+registerCleanupTimer(ctx, currentTime, TimeDomain.PROCESSING_TIME)
+  }
 
+  protected def registerEventCleanupTimer(
--- End diff --

We implemented state cleanup based on processing time because it is easier for 
users to reason about and doesn't interfere that much with event-time 
processing (it is not possible to distinguish timers yet). However, it also has 
a few shortcomings, such as cleared state when recovering a query from a 
savepoint (which we don't really encourage at the moment). 

Anyway, introducing event-time state cleanup should definitely go into a 
separate issue and PR.


---


[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...

2018-05-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5998#discussion_r190171657
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala
 ---
@@ -0,0 +1,236 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.table.runtime.setop
+
+import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
+import org.apache.flink.api.java.typeutils.TupleTypeInfo
+import org.apache.flink.configuration.Configuration
+import org.apache.flink.streaming.api.functions.co.CoProcessFunction
+import org.apache.flink.table.api.{StreamQueryConfig, Types}
+import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector
+import org.apache.flink.table.runtime.types.CRow
+import 
org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode
+import org.apache.flink.table.util.Logging
+import org.apache.flink.types.Row
+import org.apache.flink.util.Collector
+
+class StreamIntersectCoProcessFunction(
+  resultType: TypeInformation[Row],
+  queryConfig: StreamQueryConfig,
+  all: Boolean)
+  extends CoProcessFunction[CRow, CRow, CRow]
--- End diff --

I think it makes sense to have two implementations of this operator.
1. For tables with a time attribute. This implementation works without 
retraction and can automatically clean up the state. 
2. For tables without time attributes. This implementation needs to clean up 
state based on retention time and produces retractions.

This PR seems to address both cases, which is fine for now. We can improve 
for 1. later on. Both cases should be implemented as `CoProcessFunction`. We 
should try to be independent of the DataStream window operators, IMO.


---


[GitHub] flink issue #6039: [hotfix] [docs] Add Release Notes for Flink 1.5.

2018-05-18 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6039
  
Thanks, I'll also commit an empty release notes page to the 1.6 / master 
docs.


---


[GitHub] flink pull request #6039: [hotfix] [docs] Add Release Notes for Flink 1.5.

2018-05-17 Thread fhueske
GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/6039

[hotfix] [docs] Add Release Notes for Flink 1.5.

## What is the purpose of the change

* Add release notes for Flink 1.5 to the documentation

## Brief change log

* Add page with release notes for Flink 1.5
* Add "Release Notes" section to index page and link to Flink 1.5 release 
notes
* Remove link to out-dated migration guide from index page

## Verifying this change

* Docs change only.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink 1.5-releaseNotes

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6039.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6039


commit 37d9c155659cfb195fb3a1a85cf491bd0605ab8e
Author: Fabian Hueske <fhueske@...>
Date:   2018-05-17T16:38:22Z

[hotfix] [docs] Add Release Notes for Flink 1.5.




---


[GitHub] flink issue #5961: [FLINK-8255][DataSet API, DataStream API] key expressions...

2018-05-16 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5961
  
Thanks for the update @snuyanzin. I'll try to have a look at the changes in 
the next few days.
Best, Fabian


---


[GitHub] flink issue #6012: [FLINK-9361] [sql-client] Fix refresh interval in changel...

2018-05-15 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6012
  
Fix looks good.
+1


---


[GitHub] flink issue #6001: [FLINK-9299] ProcessWindowFunction documentation Java exa...

2018-05-14 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/6001
  
Thanks for the fix @yanghua. I left a minor comment. 
Otherwise +1 to merge.


---


[GitHub] flink pull request #6001: [FLINK-9299] ProcessWindowFunction documentation J...

2018-05-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/6001#discussion_r187871263
  
--- Diff: docs/dev/stream/operators/windows.md ---
@@ -883,7 +883,7 @@ private static class AverageAggregate
 
   @Override
   public Double getResult(Tuple2<Long, Long> accumulator) {
-return accumulator.f0 / accumulator.f1;
+return (double) accumulator.f0 / accumulator.f1;
--- End diff --

Should probably be `((double) accumulator.f0) / accumulator.f1;`


---


[GitHub] flink issue #5897: [FLINK-9234] [table] Fix missing dependencies for externa...

2018-05-13 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5897
  
Thanks, @StephanEwen. 

I will later merge this for `release-1.4` and `release-1.5`. Should we 
merge it for `master` as well and create a JIRA to drop the deprecated code? 
That would ensure we have the fix in 1.6 as well in case we don't drop the code 
for whatever reason.


---


[GitHub] flink issue #5988: [FLINK-9332][TableAPI & SQL] Fix Codegen error of CallGen...

2018-05-11 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5988
  
Merging


---


[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

2018-05-11 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5860
  
Merging


---


[GitHub] flink pull request #5988: [FLINK-9332][TableAPI & SQL] Fix Codegen error of ...

2018-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5988#discussion_r187710610
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala
 ---
@@ -64,17 +65,28 @@ object CallGenerator {
 
 val (auxiliaryStmt, result) = call(operands.map(_.resultTerm))
 
+val nullTermCode = if (
+  nullCheck &&
+  isReference(returnType) &&
+  !TypeCheckUtils.isTemporal(returnType)) {
+  s"""
+ |if ($resultTerm == null) {
+ |  $nullTerm = true;
+ |}
+   """.stripMargin
+} else {
+  ""
+}
+
 val resultCode = if (nullCheck && operands.nonEmpty) {
   s"""
 |${operands.map(_.code).mkString("\n")}
 |boolean $nullTerm = ${operands.map(_.nullTerm).mkString(" || ")};
-|$resultTypeTerm $resultTerm;
-|if ($nullTerm) {
-|  $resultTerm = $defaultValue;
--- End diff --

Oh sorry. Overlooked that one. Thanks!


---


[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

2018-05-11 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5860
  
Hi @glaksh100, thanks for the update!


---


[GitHub] flink pull request #5988: [FLINK-9332][TableAPI & SQL] Fix Codegen error of ...

2018-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5988#discussion_r187577471
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -686,6 +686,28 @@ class SqlITCase extends StreamingWithStateTestBase {
 
 assertEquals(List(expected.toString()), 
StreamITCase.testResults.sorted)
   }
+
+  @Test
+  def testNullableFunctionCall(): Unit = {
--- End diff --

We should test this in a more lightweight way, with a unit test instead of 
running a complete query.
We can extend `ScalarFunctionsTest.testLPad()` with 

```
testSqlApi(
   "LPAD('hello', -1, 'x') IS NULL", "true"
)
```

to cover this case 


---


[GitHub] flink pull request #5988: [FLINK-9332][TableAPI & SQL] Fix Codegen error of ...

2018-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5988#discussion_r187575678
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala
 ---
@@ -64,17 +65,28 @@ object CallGenerator {
 
 val (auxiliaryStmt, result) = call(operands.map(_.resultTerm))
 
+val nullTermCode = if (
+  nullCheck &&
+  isReference(returnType) &&
+  !TypeCheckUtils.isTemporal(returnType)) {
+  s"""
+ |if ($resultTerm == null) {
+ |  $nullTerm = true;
+ |}
+   """.stripMargin
+} else {
+  ""
+}
+
 val resultCode = if (nullCheck && operands.nonEmpty) {
   s"""
 |${operands.map(_.code).mkString("\n")}
 |boolean $nullTerm = ${operands.map(_.nullTerm).mkString(" || ")};
-|$resultTypeTerm $resultTerm;
-|if ($nullTerm) {
-|  $resultTerm = $defaultValue;
--- End diff --

don't we need this case if the result value is a primitive?


---


[GitHub] flink pull request #5988: [FLINK-9332][TableAPI & SQL] Fix Codegen error of ...

2018-05-11 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5988#discussion_r187568481
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala
 ---
@@ -64,17 +65,28 @@ object CallGenerator {
 
 val (auxiliaryStmt, result) = call(operands.map(_.resultTerm))
 
+val nullTermCode = if (
+  nullCheck &&
+  isReference(returnType) &&
+  !TypeCheckUtils.isTemporal(returnType)) {
--- End diff --

Why do you exclude temporal types here?


---


[GitHub] flink issue #5969: [FLINK-9074] [e2e] Add e2e for resuming from externalized...

2018-05-11 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5969
  
thanks for the PR @tzulitai.
The new e2e tests passed on my machine. The changes look good as well.
+1 to merge


---


[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...

2018-05-07 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5961#discussion_r186528901
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java
 ---
@@ -368,4 +369,23 @@ public void testIllegalBasicType2() {
 
FieldAccessor<Long, Long> f = 
FieldAccessorFactory.getAccessor(tpeInfo, "foo", null);
}
+
+   /**
+* Validates that no ClassCastException happens
+* should not fail e.g. like in FLINK-8255.
+*/
+   @Test
+   public void testRowTypeInfo() {
--- End diff --

This test just validates that a `FieldAccessor` is created. At runtime it 
would fail with a `ClassCastException`.


---


[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...

2018-05-07 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5961#discussion_r186484649
  
--- Diff: 
flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java
 ---
@@ -230,4 +235,43 @@ public String toString() {
}
}
 
+   /**
+* Validates that no ClassCastException happens
+* should not fail e.g. like in FLINK-8255.
+*/
+   @Test
+   public void testMaxMinByRowTypeInfoKeyFieldsDataset() {
+
+   final ExecutionEnvironment env = ExecutionEnvironment
+   .getExecutionEnvironment();
+   TypeInformation[] types = new TypeInformation[] {Types.INT, 
Types.INT};
+
+   String[] fieldNames = new String[]{"id", "value"};
+   RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
+   DataSet tupleDs = env
+   .fromCollection(Collections.singleton(new Row(2)), 
rowTypeInfo);
+
+   tupleDs.maxBy(0);
+   tupleDs.minBy(0);
+   }
+
+/**
+ * Validates that no ClassCastException happens
+* should not fail e.g. like in FLINK-8255.
+*/
+   @Test
+   public void testMaxMinByRowTypeInfoKeyFieldsForUnsortedGrouping() {
+   final ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
+
+   TypeInformation[] types = new TypeInformation[]{Types.INT, 
Types.INT};
+
+   String[] fieldNames = new String[]{"id", "value"};
+   RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames);
+
+   UnsortedGrouping groupDs = 
env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo).groupBy(0);
+
+   groupDs.maxBy(1);
+   groupDs.minBy(1);
--- End diff --

The tests pass because the program is not executed. 
You would have to call `collect()` on the result to run the program and compare the 
returned result against the expected result. As I pointed out before, this will 
fail, because the operator will cast the `Row` objects to `Tuple`.


---


[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...

2018-05-07 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5961#discussion_r186477977
  
--- Diff: 
flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java
 ---
@@ -41,7 +41,7 @@
 * is regarded in the reduce function. First index has highest priority 
and last index has
 * least priority.
 */
-   public SelectByMinFunction(TupleTypeInfo type, int... fields) {
+   public SelectByMinFunction(TupleTypeInfoBase type, int... fields) {
--- End diff --

The `ReduceFunction` is still typed to `T extends Tuple` such that this 
will still fail at runtime. The same is true for all other built-in aggregation 
methods like `sum()` and `min()` on `DataSet` and `UnsortedGrouping`. 

This cannot be resolved without major changes. I don't think we should add 
these features, but rather throw meaningful error messages instead of 
`ClassCastException`. 

Can you try to override the `isTupleType()` method in `RowTypeInfo` and 
return `false`? 
This would prevent `Row` from being used in contexts that are only 
supported for `Tuple`.


---


[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...

2018-05-07 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5961#discussion_r186527205
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
 ---
@@ -157,15 +156,15 @@ public T set(T record, F fieldValue) {
 
SimpleTupleFieldAccessor(int pos, TypeInformation typeInfo) {
--- End diff --

accessing fields in a `Row` will fail because `Row` does not extend 
`Tuple`. For a proper fix, we would need a `RowFieldAccessor` and use that one 
when we deal with a `DataStream`. We would then need to add the 
`RowFieldAccessor` to the `FieldAccessorFactory`.


---


[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...

2018-05-07 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5961#discussion_r186527277
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java
 ---
@@ -197,7 +196,7 @@ public T set(T record, F fieldValue) {
checkNotNull(typeInfo, "typeInfo must not be null.");
checkNotNull(innerAccessor, "innerAccessor must not be 
null.");
 
-   int arity = ((TupleTypeInfo) typeInfo).getArity();
+   int arity = typeInfo.getArity();
--- End diff --

Same as for `SimpleTupleFieldAccessor`.


---


[GitHub] flink issue #3765: [FLINK-6373] Add runtime support for distinct aggregation...

2018-05-07 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3765
  
The feature that this PR was going to implement has been resolved by PR 
#. 
I will close it.


---


[GitHub] flink issue #3764: [FLINK-6335] Parse DISTINCT over grouped windows in strea...

2018-05-07 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3764
  
This PR has been integrated into #5940. 
I'll close it.


---


[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...

2018-05-07 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5940
  
merging


---


[GitHub] flink issue #5927: [FLINK-8237] [BucketingSink] Better error message added

2018-05-07 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5927
  
merging


---


[GitHub] flink issue #5927: [FLINK-8237] [BucketingSink] Better error message added

2018-05-07 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5927
  
Thanks for the update @pavel-shvetsov-git.
+1 to merge


---


[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...

2018-05-07 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5940
  
Thanks for the update @walterddr.
The PR is good to merge.


---


[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

2018-05-07 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5860
  
Hi @glaksh100, 

I just noticed that the bucket closing check is only done when a record is 
written. Hence, inactive buckets might not get closed in time if a larger 
inactive bucket interval is configured. In some sense, the new feature is an 
extended version of the inactive bucket closing feature.

How should we handle that case?

1. throw an exception during configuration, i.e., when 
`setInactiveBucketThreshold()` and `setBatchRolloverInterval()` are called.
2. configure the inactive bucket interval to be at least the rollover 
interval in case it is configured larger and continue. We should also make sure 
that the check interval is configured appropriately.

I'm leaning towards the first approach. It would make the misconfiguration 
obvious to the user and fail the program before it is submitted.

What do you think?

Best, Fabian


---


[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...

2018-05-04 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5940
  
Hmmm, good point. The discussion would be lost.
How about I put your changes on top of Haohui's changes before merging?


---


[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...

2018-05-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5940#discussion_r186033706
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -52,6 +52,76 @@ class SqlITCase extends StreamingWithStateTestBase {
 (8000L, "8", "Hello World"),
 (2L, "20", "Hello World"))
 
+  @Test
+  def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = {
+// create a watermark with 10ms offset to delay the window emission by 
10ms to verify merge
+val sessionWindowTestdata = List(
+  (1L, 1, "Hello"),
+  (2L, 2, "Hello"),
+  (8L, 8, "Hello"),
+  (9L, 9, "Hello World"),
+  (4L, 4, "Hello"),
+  (16L, 16, "Hello"))
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setParallelism(1)
+StreamITCase.clear
+val stream = env
+  .fromCollection(sessionWindowTestdata)
+  .assignTimestampsAndWatermarks(new 
TimestampAndWatermarkWithOffset[(Long, Int, String)](10L))
+
+val tEnv = TableEnvironment.getTableEnvironment(env)
+val table = stream.toTable(tEnv, 'long, 'int, 'string, 
'rowtime.rowtime)
+tEnv.registerTable("MyTable", table)
+
+val sqlQuery = "SELECT string, " +
+  "  COUNT(DISTINCT long) " +
--- End diff --

It would be good to add the end timestamp of the windows 
(`SESSION_END(rowtime, INTERVAL '0.005' SECOND)`) to make it easier to eyeball 
the expected test results.
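
A sketch of how the test query could look with the window end timestamp included (assuming the 5 ms session gap mentioned above):

```
val sqlQuery =
  """
    |SELECT string,
    |  SESSION_END(rowtime, INTERVAL '0.005' SECOND),
    |  COUNT(DISTINCT long)
    |FROM MyTable
    |GROUP BY string, SESSION(rowtime, INTERVAL '0.005' SECOND)
  """.stripMargin
```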


---


[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...

2018-05-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5940#discussion_r186022377
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -641,7 +641,8 @@ class AggregationCodeGenerator(
|  java.util.Map.Entry entry = (java.util.Map.Entry) 
mergeIt$i.next();
|  Object k = entry.getKey();
--- End diff --

Change this line to `${classOf[Row].getCanonicalName} k = 
(${classOf[Row].getCanonicalName}) entry.getKey();`


---


[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...

2018-05-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5940#discussion_r186029985
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/DistinctAggregateTest.scala
 ---
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.api.stream.sql
+
+import org.apache.flink.api.scala._
+import org.apache.flink.table.api.scala._
+import org.apache.flink.table.plan.logical.{SessionGroupWindow, 
SlidingGroupWindow, TumblingGroupWindow}
+import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase}
+import org.apache.flink.table.utils.TableTestUtil._
+import org.junit.{Ignore, Test}
+
+class DistinctAggregateTest extends TableTestBase {
+  private val streamUtil: StreamTableTestUtil = streamTestUtil()
+  streamUtil.addTable[(Int, String, Long)](
+"MyTable",
+'a, 'b, 'c,
+'proctime.proctime, 'rowtime.rowtime)
+
+  @Test
+  def testDistinct(): Unit = {
+val sql = "SELECT DISTINCT a, b, c FROM MyTable"
+
+val expected =
+  unaryNode(
+"DataStreamGroupAggregate",
+unaryNode(
+  "DataStreamCalc",
+  streamTableNode(0),
+  term("select", "a, b, c")
+),
+term("groupBy", "a, b, c"),
+term("select", "a, b, c")
+  )
+streamUtil.verifySql(sql, expected)
+  }
+
+  // TODO: this query should be optimized to only have a single 
DataStreamGroupAggregate
+  // TODO: reopen this until FLINK-7144 fixed
+  @Ignore
+  @Test
+  def testDistinctAfterAggregate(): Unit = {
+val sql = "SELECT DISTINCT a FROM MyTable GROUP BY a, b, c"
+
+val expected =
+  unaryNode(
+"DataStreamGroupAggregate",
+unaryNode(
+  "DataStreamCalc",
+  streamTableNode(0),
+  term("select", "a")
+),
+term("groupBy", "a"),
+term("select", "a")
+  )
+streamUtil.verifySql(sql, expected)
+  }
+
+  @Test
+  def testDistinctAggregateOnTumbleWindow(): Unit = {
+val sqlQuery = "SELECT COUNT(DISTINCT a), " +
+  "  SUM(a) " +
+  "FROM MyTable " +
+  "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE) "
+
+val expected = unaryNode(
+  "DataStreamGroupWindowAggregate",
+  unaryNode(
+"DataStreamCalc",
+streamTableNode(0),
+term("select", "rowtime", "a")
+  ),
+  term("window", TumblingGroupWindow('w$, 'rowtime, 90.millis)),
+  term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(a) AS EXPR$1")
+)
+
+streamUtil.verifySql(sqlQuery, expected)
+  }
+
+  @Test
+  def testMultiDistinctAggregateSameFieldOnHopWindow(): Unit = {
+val sqlQuery = "SELECT COUNT(DISTINCT a), " +
+  "  SUM(DISTINCT a), " +
+  "  MAX(DISTINCT a) " +
+  "FROM MyTable " +
+  "GROUP BY HOP(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR) "
+
+val expected = unaryNode(
+  "DataStreamGroupWindowAggregate",
+  unaryNode(
+"DataStreamCalc",
+streamTableNode(0),
+term("select", "rowtime", "a")
+  ),
+  term("window", SlidingGroupWindow('w$, 'rowtime, 360.millis, 
90.millis)),
+  term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(DISTINCT a) AS 
EXPR$1",
+  

[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...

2018-05-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5940#discussion_r185955967
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala
 ---
@@ -641,7 +641,8 @@ class AggregationCodeGenerator(
|  java.util.Map.Entry entry = (java.util.Map.Entry) 
mergeIt$i.next();
|  Object k = entry.getKey();
|  Long v = (Long) entry.getValue();
-   |  if (aDistinctAcc$i.add(k, v)) {
+   |  if (aDistinctAcc$i.add(
--- End diff --

The key in the entry is a `Row` already


---


[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...

2018-05-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5940#discussion_r186036107
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -52,6 +52,76 @@ class SqlITCase extends StreamingWithStateTestBase {
 (8000L, "8", "Hello World"),
 (2L, "20", "Hello World"))
 
+  @Test
+  def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = {
+// create a watermark with 10ms offset to delay the window emission by 
10ms to verify merge
+val sessionWindowTestdata = List(
+  (1L, 1, "Hello"),
--- End diff --

To check the correct merge behavior, we need two windows that aggregate the 
same value, which is then deduplicated during the merge.

Some data like:
```
  (1L, 2, "Hello"), // 1. Hello window
  (2L, 2, "Hello"), // 1. Hello window, deduped
  (8L, 2, "Hello"), // 2. Hello window, deduped during merge
  (10L, 3, "Hello"), // 2. Hello window, forwarded during merge
  (9L, 9, "Hello World"), // 1. Hello World window
  (4L, 1, "Hello"), // 1. Hello window, triggering merge of 1. and 2. 
Hello windows
  (16L, 16, "Hello")) // 3. Hello window (not merged)
```



---


[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...

2018-05-04 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5940#discussion_r186033401
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala
 ---
@@ -52,6 +52,76 @@ class SqlITCase extends StreamingWithStateTestBase {
 (8000L, "8", "Hello World"),
 (2L, "20", "Hello World"))
 
+  @Test
+  def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = {
+// create a watermark with 10ms offset to delay the window emission by 
10ms to verify merge
+val sessionWindowTestdata = List(
+  (1L, 1, "Hello"),
--- End diff --

The test is not checking for DISTINCT semantics since all aggregated values 
are distinct. We could do `COUNT(DISTINCT num)` (`int` has to be renamed to 
`num` because it's a SQL keyword).
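
For example, a rough sketch of the adjusted query (field names `c`, `num` and 
`rowtime` are assumptions for illustration):

```
// hypothetical query: num repeats across rows of the same session, so
// COUNT(DISTINCT num) only returns the right count if deduplication works
val sqlQuery =
  "SELECT c, COUNT(DISTINCT num) " +
  "FROM MyTable " +
  "GROUP BY c, SESSION(rowtime, INTERVAL '0.005' SECOND)"
```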


---


[GitHub] flink pull request #5836: [FLINK-9141][datastream] Fail early when using bot...

2018-05-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5836#discussion_r185942219
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import org.junit.Test;
+
+import java.util.Collections;
+
+/**
+ * Tests that verify correct behavior when applying split/getSideOutput 
operations on one {@link DataStream}.
+ */
+public class SplitSideOutputTest {
+
+   private static final OutputTag<String> outputTag = new 
OutputTag<String>("outputTag") {};
+
+   @Test
+   public void testSideOutputAfterSelectIsForbidden() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   SingleOutputStreamOperator<String> processInput = 
env.fromElements("foo")
+   .process(new DummyProcessFunction());
+
+   processInput.split(Collections::singleton);
+
+   try {
+   processInput.getSideOutput(outputTag);
+   } catch (UnsupportedOperationException expected){
+   // expected
+   }
+   }
+
+   @Test
+   public void testSelectAfterSideOutputIsForbidden() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   SingleOutputStreamOperator<String> processInput = 
env.fromElements("foo")
+   .process(new DummyProcessFunction());
+
+   processInput.getSideOutput(outputTag);
+
+   try {
+   processInput.split(Collections::singleton);
--- End diff --

same as above


---


[GitHub] flink pull request #5836: [FLINK-9141][datastream] Fail early when using bot...

2018-05-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5836#discussion_r185942147
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java
 ---
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import org.junit.Test;
+
+import java.util.Collections;
+
+/**
+ * Tests that verify correct behavior when applying split/getSideOutput 
operations on one {@link DataStream}.
+ */
+public class SplitSideOutputTest {
+
+   private static final OutputTag<String> outputTag = new 
OutputTag<String>("outputTag") {};
+
+   @Test
+   public void testSideOutputAfterSelectIsForbidden() {
+   StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   SingleOutputStreamOperator<String> processInput = 
env.fromElements("foo")
+   .process(new DummyProcessFunction());
+
+   processInput.split(Collections::singleton);
+
+   try {
+   processInput.getSideOutput(outputTag);
--- End diff --

add `Assert.fail();` after `processInput.getSideOutput(outputTag);` to 
ensure that the test fails if no exception is thrown.


---


[GitHub] flink issue #5927: [FLINK-8237] [BucketingSink] Better error message added

2018-05-03 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5927
  
Thanks for the PR @pavel-shvetsov-git!
I've left a suggestion to improve the error message.
Afterwards the PR should be good to merge.


---


[GitHub] flink pull request #5927: [FLINK-8237] [BucketingSink] Better error message ...

2018-05-03 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5927#discussion_r185932952
  
--- Diff: 
flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java
 ---
@@ -550,6 +550,9 @@ private void openNewPartFile(Path bucketPath, 
BucketState bucketState) throws
Path inProgressPath = getInProgressPathFor(partPath);
if (bucketState.writer == null) {
bucketState.writer = writerTemplate.duplicate();
+   if (bucketState.writer == null) {
+   throw new RuntimeException("Could not duplicate 
writer.");
--- End diff --

I would add the class name of the `writerTemplate` object and that the 
class needs to implement the `Writer.duplicate()` method.


---


[GitHub] flink issue #5909: [FLINK-8726][docs] Fix and normalize code-highlighting

2018-05-03 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5909
  
Thanks for cleaning up the syntax highlighting @zentol!
+1 to merge


---


[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...

2018-05-03 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5860
  
Thanks for the update @glaksh100! The changes look good to me. 
What do you think @aljoscha?


---


[GitHub] flink issue #5887: [FLINK-6719] [docs] Add details about fault-tolerance of ...

2018-05-02 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5887
  
Thanks for the update @bowenli86. 

I'll merge the PR later.


---


[GitHub] flink issue #5935: [FLINK-9119] example code error in Concepts & Common API

2018-05-02 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5935
  
Thanks for the fix @yanghua!

+1 to merge


---


[GitHub] flink issue #5899: Klink

2018-04-27 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5899
  
Hi @bitsilly, as @sihuazhou mentioned, please describe the purpose of this PR. 
Also, please translate the comments to English.

Thanks, Fabian



---


[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...

2018-04-26 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/3783
  
Hi @huawei-flink ,

I'll close this PR later today when merging #.
# adds runtime support for distinct aggregation and follows a similar 
approach as this PR. However, it leverages the `MapView` feature and is 
therefore a bit more generic. 
Thanks for working on this. This PR led us in the right direction.

Best, Fabian


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184442107
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
@@ -50,6 +50,155 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
 (8L, 8, "Hello World"),
 (20L, 20, "Hello World"))
 
+  @Test
+  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  COLLECT(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,{1=1}",
+  "2,2,2,{2=1}",
+  "2,3,1,{1=1, 2=1}",
+  "3,2,2,{2=1}",
+  "3,2,2,{2=1}",
+  "3,5,2,{2=1, 3=1}",
+  "4,2,2,{2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "5,1,1,{1=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,6,1,{1=1, 2=1, 3=1}",
+  "5,5,2,{2=1, 3=1}")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  COUNT(e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,1",
+  "2,1,2,2",
+  "2,2,3,1",
+  "3,1,2,2",
+  "3,2,2,2",
+  "3,3,5,2",
+  "4,1,2,2",
+  "4,2,3,1",
+  "4,3,3,1",
+  "4,4,3,1",
+  "5,1,1,1",
+  "5,2,4,1",
+  "5,3,4,1",
+  "5,4,6,1",
+  "5,5,6,1")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = {
+// use out-of-order data to test distinct accumulator remove
+val data = Seq(
+  Left((2L, (2L, 2, "Hello"))),
+  Left((2L, (2L, 2, "Hello"))),
+  Left((1L, (1L, 1, "Hello"))),
+  Left((1L, (1L, 1, "Hello"))),
+  Left((2L, (2L, 2, "Hello"))),
+  Left((1L, (1L, 1, "Hello"))),
+  Left((20L, (20L, 20, "Hello World"))), // early row
+  Right(3L),
+  Left((2L, (2L, 2, "Hello"))), // late row
+  Left((3L, (3L, 3, "Hello"))),
+  Left((4L, (4L, 4, "Hello"))),
+  Left((5L, (5L, 5, "Hello"))),
+  Left((6L, (6L, 6, "Hello"))),
+  Left((7L, (7L, 7, "Hello World"))),
+  Right(7L),
+  Left((9L, (9L, 9, "Hello World"))),
+  Left((8L, (8L, 8, "Hello World"))),
+  Left((8L, (8L, 8, "Hello World"))),
+  Right(20L))
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setStateBackend(getStateBackend)
+env.setParallelism(1)
--- End diff --

By default, the LocalExecEnv picks the parallelism based on the number of CPU 
cores. Are you running the tests in some kind of container?


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184419816
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala
 ---
@@ -47,60 +51,51 @@ class DistinctAccumulator[E, ACC](var realAcc: ACC, var 
mapView: MapView[E, JLon
   override def equals(that: Any): Boolean =
 that match {
   case that: DistinctAccumulator[E, ACC] => that.canEqual(this) &&
-this.mapView == that.mapView
+this.distinctValueMap == that.distinctValueMap
   case _ => false
 }
 
   def add(element: E): Boolean = {
-if (element != null) {
-  val currentVal = mapView.get(element)
-  if (currentVal != null) {
-mapView.put(element, currentVal + 1L)
-false
-  } else {
-mapView.put(element, 1L)
-true
-  }
-} else {
+val wrappedElement = Row.of(element)
--- End diff --

I think we should remove the `E` type parameter and directly pass the `Row` 
as an argument. That will also make the extension to multiple arguments very 
easy. 

Actually, I think I'll do that before merging
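
A minimal sketch of that direction (simplified and hypothetical, not the 
actual class), assuming distinct values are keyed by `Row`:

```
import org.apache.flink.table.api.dataview.MapView
import org.apache.flink.types.Row

// simplified sketch: distinct values are tracked as Rows, so the same
// implementation covers single- and multi-argument DISTINCT aggregates
class RowDistinctAccumulator[ACC](
    var realAcc: ACC,
    var distinctValueMap: MapView[Row, java.lang.Long]) {

  /** Returns true if the row is seen for the first time. */
  def add(row: Row): Boolean = {
    val currentCnt = distinctValueMap.get(row)
    if (currentCnt != null) {
      distinctValueMap.put(row, currentCnt + 1L)
      false
    } else {
      distinctValueMap.put(row, 1L)
      true
    }
  }
}
```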


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184405910
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
@@ -50,6 +50,155 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
 (8L, 8, "Hello World"),
 (20L, 20, "Hello World"))
 
+  @Test
+  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  COLLECT(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,{1=1}",
+  "2,2,2,{2=1}",
+  "2,3,1,{1=1, 2=1}",
+  "3,2,2,{2=1}",
+  "3,2,2,{2=1}",
+  "3,5,2,{2=1, 3=1}",
+  "4,2,2,{2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "5,1,1,{1=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,6,1,{1=1, 2=1, 3=1}",
+  "5,5,2,{2=1, 3=1}")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  COUNT(e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,1",
+  "2,1,2,2",
+  "2,2,3,1",
+  "3,1,2,2",
+  "3,2,2,2",
+  "3,3,5,2",
+  "4,1,2,2",
+  "4,2,3,1",
+  "4,3,3,1",
+  "4,4,3,1",
+  "5,1,1,1",
+  "5,2,4,1",
+  "5,3,4,1",
+  "5,4,6,1",
+  "5,5,6,1")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = {
+// use out-of-order data to test distinct accumulator remove
+val data = Seq(
+  Left((2L, (2L, 2, "Hello"))),
+  Left((2L, (2L, 2, "Hello"))),
+  Left((1L, (1L, 1, "Hello"))),
+  Left((1L, (1L, 1, "Hello"))),
+  Left((2L, (2L, 2, "Hello"))),
+  Left((1L, (1L, 1, "Hello"))),
+  Left((20L, (20L, 20, "Hello World"))), // early row
+  Right(3L),
+  Left((2L, (2L, 2, "Hello"))), // late row
+  Left((3L, (3L, 3, "Hello"))),
+  Left((4L, (4L, 4, "Hello"))),
+  Left((5L, (5L, 5, "Hello"))),
+  Left((6L, (6L, 6, "Hello"))),
+  Left((7L, (7L, 7, "Hello World"))),
+  Right(7L),
+  Left((9L, (9L, 9, "Hello World"))),
+  Left((8L, (8L, 8, "Hello World"))),
+  Left((8L, (8L, 8, "Hello World"))),
+  Right(20L))
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setStateBackend(getStateBackend)
+env.setParallelism(1)
--- End diff --

Also, the event-time test is not required. We test the retract case also 
with BOUNDED OVER windows (rows that fall out of the window are retracted).


---


[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...

2018-04-26 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/#discussion_r184405329
  
--- Diff: 
flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala
 ---
@@ -50,6 +50,155 @@ class OverWindowITCase extends 
StreamingWithStateTestBase {
 (8L, 8, "Hello World"),
 (20L, 20, "Hello World"))
 
+  @Test
+  def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW), " +
+  "  COLLECT(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND 
CURRENT ROW) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,{1=1}",
+  "2,2,2,{2=1}",
+  "2,3,1,{1=1, 2=1}",
+  "3,2,2,{2=1}",
+  "3,2,2,{2=1}",
+  "3,5,2,{2=1, 3=1}",
+  "4,2,2,{2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "4,3,1,{1=1, 2=1}",
+  "5,1,1,{1=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,4,1,{1=1, 3=1}",
+  "5,6,1,{1=1, 2=1, 3=1}",
+  "5,5,2,{2=1, 3=1}")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = {
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStateBackend(getStateBackend)
+val tEnv = TableEnvironment.getTableEnvironment(env)
+env.setParallelism(1)
+StreamITCase.clear
+
+val t = StreamTestData.get5TupleDataStream(env)
+  .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime)
+tEnv.registerTable("MyTable", t)
+
+val sqlQuery = "SELECT a, " +
+  "  COUNT(e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  SUM(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " +
+  "  MIN(DISTINCT e) OVER (" +
+  "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " +
+  "FROM MyTable"
+
+val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row]
+result.addSink(new StreamITCase.StringSink[Row])
+env.execute()
+
+val expected = List(
+  "1,1,1,1",
+  "2,1,2,2",
+  "2,2,3,1",
+  "3,1,2,2",
+  "3,2,2,2",
+  "3,3,5,2",
+  "4,1,2,2",
+  "4,2,3,1",
+  "4,3,3,1",
+  "4,4,3,1",
+  "5,1,1,1",
+  "5,2,4,1",
+  "5,3,4,1",
+  "5,4,6,1",
+  "5,5,6,1")
+assertEquals(expected, StreamITCase.testResults)
+  }
+
+  @Test
+  def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = {
+// use out-of-order data to test distinct accumulator remove
+val data = Seq(
+  Left((2L, (2L, 2, "Hello"))),
+  Left((2L, (2L, 2, "Hello"))),
+  Left((1L, (1L, 1, "Hello"))),
+  Left((1L, (1L, 1, "Hello"))),
+  Left((2L, (2L, 2, "Hello"))),
+  Left((1L, (1L, 1, "Hello"))),
+  Left((20L, (20L, 20, "Hello World"))), // early row
+  Right(3L),
+  Left((2L, (2L, 2, "Hello"))), // late row
+  Left((3L, (3L, 3, "Hello"))),
+  Left((4L, (4L, 4, "Hello"))),
+  Left((5L, (5L, 5, "Hello"))),
+  Left((6L, (6L, 6, "Hello"))),
+  Left((7L, (7L, 7, "Hello World"))),
+  Right(7L),
+  Left((9L, (9L, 9, "Hello World"))),
+  Left((8L, (8L, 8, "Hello World"))),
+  Left((8L, (8L, 8, "Hello World"))),
+  Right(20L))
+
+val env = StreamExecutionEnvironment.getExecutionEnvironment
+env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
+env.setStateBackend(getStateBackend)
+env.setParallelism(1)
--- End diff --

Actually, I was wrong on this one. Late elements are not handled 
deterministically if p > 1.
Will change it back.


---


[GitHub] flink pull request #5867: [FLINK-8686] [sql-client] Improve basic embedded S...

2018-04-25 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5867#discussion_r184099775
  
--- Diff: flink-dist/src/main/resources/flink-conf.yaml ---
@@ -229,17 +229,26 @@ web.port: 8081
 
 # Directory to upload completed jobs to. Add this directory to the list of
 # monitored directories of the HistoryServer as well (see below).
-#jobmanager.archive.fs.dir: hdfs:///completed-jobs/
+# jobmanager.archive.fs.dir: hdfs:///completed-jobs/
 
 # The address under which the web-based HistoryServer listens.
-#historyserver.web.address: 0.0.0.0
+# historyserver.web.address: 0.0.0.0
 
 # The port under which the web-based HistoryServer listens.
-#historyserver.web.port: 8082
+# historyserver.web.port: 8082
 
 # Comma separated list of directories to monitor for completed jobs.
-#historyserver.archive.fs.dir: hdfs:///completed-jobs/
+# historyserver.archive.fs.dir: hdfs:///completed-jobs/
 
 # Interval in milliseconds for refreshing the monitored directories.
-#historyserver.archive.fs.refresh-interval: 1
+# historyserver.archive.fs.refresh-interval: 1
+
+#==============================================================================
+# SQL Client
+#==============================================================================
+
+# The SQL Client CLI can be started via bin/sql-client.sh embedded
+
+# The heap size for the SQL Client CLI JVM
+# sqlclient.cli.heap.mb: 1024
--- End diff --

OK. But there's another discussion thread about whether it is useful to share 
properties across JM/TM JVMs and client processes.


---

