[GitHub] flink issue #3124: [FLINK-5281] Extend KafkaJsonTableSources to support nest...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3124 This PR can be closed. PR #5491 added a TableSource that supports nested JSON data. Thanks, Fabian ---
[GitHub] flink issue #3609: [FLINK-6073] - Support for SQL inner queries for proctime
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3609 This PR can be closed as it will be addressed by FLINK-9714. Thanks, Fabian ---
[GitHub] flink issue #6341: [FLINK-5750] Incorrect translation of n-ary Union
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6341 Thanks for the fix @AlexanderKoltsov! I'll merge this. ---
[GitHub] flink pull request #6393: [FLINK-9296] [table] Add support for non-windowed ...
GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/6393

[FLINK-9296] [table] Add support for non-windowed DISTINCT aggregates.

## What is the purpose of the change

- Fix a regression from 1.5 that was caused by rearranging optimization rules.
- Add tests to ensure the feature won't be broken again.

## Brief change log

- Remove DISTINCT limitation on `DataStreamAggregateRule`.
- Add plan test to ensure queries are correctly translated
- Add ITCase to ensure queries are correctly executed
- Remove outdated limitation from documentation

## Verifying this change

- Run the added tests

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **no**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
- The serializers: **no**
- The runtime per-record code paths (performance sensitive): **no**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
- The S3 file system connector: **no**

## Documentation

- Does this pull request introduce a new feature? **no**
- If yes, how is the feature documented? **n/a**

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/fhueske/flink table-distinctGroupBy

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/6393.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #6393

---
[GitHub] flink pull request #6267: [FLINK-5750] Incorrect parse of brackets inside VA...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6267#discussion_r200478425 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala --- @@ -36,22 +39,21 @@ import scala.collection.JavaConverters._ class DataSetUnion( --- End diff -- We need the same fix for `DataStreamUnion` ---
[GitHub] flink pull request #6267: [FLINK-5750] Incorrect parse of brackets inside VA...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6267#discussion_r200478336 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala --- @@ -36,22 +39,21 @@ import scala.collection.JavaConverters._ class DataSetUnion( cluster: RelOptCluster, traitSet: RelTraitSet, -leftNode: RelNode, -rightNode: RelNode, -rowRelDataType: RelDataType) - extends BiRel(cluster, traitSet, leftNode, rightNode) +inputs: JList[RelNode], +rowRelDataType: RelDataType, +all: Boolean) + extends Union(cluster, traitSet, inputs, all) --- End diff -- Change to `Union(cluster, traitSet, inputs, true)` ---
[GitHub] flink pull request #6267: [FLINK-5750] Incorrect parse of brackets inside VA...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6267#discussion_r200480534 --- Diff: flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/batch/sql/JavaSqlITCase.java --- @@ -73,6 +73,30 @@ public void testValues() throws Exception { compareResultAsText(results, expected); } + @Test + public void testValuesWithCast() throws Exception { --- End diff -- Can you move this test to `org.apache.flink.table.runtime.batch.sql.SetOperatorsITCase` and also add one to `org.apache.flink.table.runtime.stream.sql.SetOperatorsITCase`? In addition, it would be good to have plan tests for this query in `org.apache.flink.table.api.batch.sql.SetOperatorsTest` and `org.apache.flink.table.api.stream.sql.SetOperatorsTest`. ---
[GitHub] flink pull request #6267: [FLINK-5750] Incorrect parse of brackets inside VA...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6267#discussion_r200478263 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/dataset/DataSetUnion.scala --- @@ -36,22 +39,21 @@ import scala.collection.JavaConverters._ class DataSetUnion( cluster: RelOptCluster, traitSet: RelTraitSet, -leftNode: RelNode, -rightNode: RelNode, -rowRelDataType: RelDataType) - extends BiRel(cluster, traitSet, leftNode, rightNode) +inputs: JList[RelNode], +rowRelDataType: RelDataType, +all: Boolean) --- End diff -- we don't need the `all` parameter because `DataStreamUnion` only supports `UNION ALL` semantics. ---
[GitHub] flink issue #6161: [hotfix] [docs][FLINK-9581] Typo: extra spaces removed to...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6161 Documentation fixes are usually not critical to include in a release because the docs are always built from the most recent release branch. So documentation changes that are not included in a release will still be published shortly after being committed. I'll merge this PR. Btw., it is OK to create a hotfix (i.e., a PR without creating a JIRA issue) for minor fixes like this. Thanks, Fabian ---
[GitHub] flink issue #6255: [FLINK-9681] [table] Make sure difference between minRete...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6255 Thanks for the update @hequn8128. I'll merge this ---
[GitHub] flink issue #6252: [FLINK-9742][Table API & SQL] Expose Expression.resultTyp...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6252 Thanks for the update @HeartSaVioR. I'll merge this ---
[GitHub] flink issue #6253: [FLINK-8094][Table API & SQL] Support other types for Exi...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6253 Thanks for the update @HeartSaVioR. I'll merge this ---
[GitHub] flink issue #6252: [FLINK-9742][Table API & SQL] Expose Expression.resultTyp...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6252 Looks good. Just one last comment. ---
[GitHub] flink pull request #6252: [FLINK-9742][Table API & SQL] Expose Expression.re...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6252#discussion_r200345063 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/ExpressionUtils.scala --- @@ -22,13 +22,16 @@ import java.lang.{Boolean => JBoolean, Byte => JByte, Double => JDouble, Float = import java.math.{BigDecimal => JBigDecimal} import java.sql.{Date, Time, Timestamp} -import org.apache.flink.api.common.typeinfo.BasicTypeInfo +import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.streaming.api.windowing.time.{Time => FlinkTime} import org.apache.flink.table.api.ValidationException import org.apache.flink.table.calcite.FlinkTypeFactory import org.apache.flink.table.typeutils.{RowIntervalTypeInfo, TimeIntervalTypeInfo} object ExpressionUtils { + def getReturnType(expr: Expression): TypeInformation[_] = { --- End diff -- Please add Scala docs since this is a public method. Rename to `getResultType` for consistency? ---
[GitHub] flink pull request #6255: [FLINK-9681] [table] Make sure difference between ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6255#discussion_r200333630 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/harness/HarnessTestBase.scala --- @@ -384,4 +386,12 @@ object HarnessTestBase { value.row.getField(selectorField).asInstanceOf[T] } } + + /** +* Test class used to test min and max retention time. +*/ + class StreamQueryConfigTest(min: Time, max: Time) extends StreamQueryConfig { --- End diff -- I would rename the class to `TestStreamQueryConfig` because the `Test` at the end suggests that this class is testing something instead of being a util for a test. ---
[GitHub] flink pull request #6255: [FLINK-9681] [table] Make sure difference between ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6255#discussion_r200330683 --- Diff: docs/dev/table/streaming.md --- @@ -591,16 +589,14 @@ qConfig.withIdleStateRetentionTime(Time.hours(12); val qConfig: StreamQueryConfig = ??? -// set idle state retention time: min = 12 hour, max = 16 hours -qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(16)) -// set idle state retention time. min = max = 12 hours -qConfig.withIdleStateRetentionTime(Time.hours(12) +// set idle state retention time: min = 12 hour, max = 24 hours +qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24)) {% endhighlight %} -Configuring different minimum and maximum idle state retention times is more efficient because it reduces the internal book-keeping of a query for when to remove state. +Configuring different minimum and maximum idle state retention times is more efficient because it reduces the internal book-keeping of a query for when to remove state. Difference between minTime and maxTime shoud be at least 5 minutes. --- End diff -- The "... more efficient ..." does not apply anymore. Maybe rephrase to > Cleaning up state requires additional bookkeeping which becomes less expensive for larger differences of `minTime` and `maxTime`. The difference between `minTime` and `maxTime` must be at least 5 minutes. ---
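The constraint proposed in the comment on #6255 above (the difference between `minTime` and `maxTime` must be at least 5 minutes) can be sketched as a standalone check. This is only an illustration: the class `RetentionTimeCheck`, the method `validate`, and the use of `java.time.Duration` are assumptions made for this sketch and are not taken from the Flink code base.

```java
import java.time.Duration;

public class RetentionTimeCheck {

    // Minimum gap between min and max retention time, as stated in the review comment.
    public static final Duration MIN_GAP = Duration.ofMinutes(5);

    // Hypothetical validation helper: rejects configurations whose gap is too small.
    public static void validate(Duration minTime, Duration maxTime) {
        Duration gap = maxTime.minus(minTime);
        if (gap.compareTo(MIN_GAP) < 0) {
            throw new IllegalArgumentException(
                "Difference between minTime and maxTime must be at least 5 minutes, but was " + gap);
        }
    }

    public static void main(String[] args) {
        // Accepted: the gap is 12 hours.
        validate(Duration.ofHours(12), Duration.ofHours(24));
        try {
            // Rejected: the gap is zero.
            validate(Duration.ofHours(12), Duration.ofHours(12));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```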
[GitHub] flink issue #6253: [WIP][FLINK-8094][Table API & SQL] Support other types fo...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6253 It's a good question whether to add a new class or not. Right now, extending the implementation of `ExistingField` seems like a better approach to me. Once we add a `ParsingExistingField` extractor that can be configured with a timestamp format, the ISO date String support in `ExistingField` would be obsolete, but IMO that's not a big problem. +1 to change the implementation to extend `ExistingField`. Regarding the tests, I think `ExistingField` is used in a few ITCases, but there are no unit tests yet. Adding unit tests is a bit tricky, because we would need to integrate it with the code generator, etc. So, a big +1 if you would like to look into that, but I'd also be fine with adding another ITCase. For the documentation, we might want to extend the bullet point about `timestampExtractor` in the [Defining a Rowtime Attribute](https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sourceSinks.html#defining-a-rowtime-attribute) section. Thanks, Fabian ---
[GitHub] flink issue #6252: [FLINK-9742][Table API & SQL] Widen scope of Expression.r...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6252 I think it can also be a new public method in `org.apache.flink.table.expressions.ExpressionUtils`. Thanks, Fabian ---
[GitHub] flink issue #6252: [FLINK-9742][Table API & SQL] Widen scope of Expression.r...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6252 Thanks for opening this PR @HeartSaVioR! (cc @twalthr) I thought about this again and I don't think we should make the method public. The problem is that `Expression` is one of the core classes of the Table API. By making the method public it becomes visible to all users of the API. A better approach is to provide a publicly accessible util object (in org.apache.flink.table.api...) that provides access to the result type (and possibly other properties) of an `Expression`. What do you think? Best, Fabian ---
[GitHub] flink issue #6188: [FLINK-6846][Table API] add timestampAdd tableApi
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6188 Hmm, I think there is still a case for `timestampAdd`. The problem is time intervals with variable length such as `MONTH`, `QUARTER`, and `YEAR`. None of these can be defined in milliseconds, because their length depends on the context. So `timestampAdd(ts, 1.month)` returns a different result than `ts + 1.month`. ---
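The general point in the comment on #6188 above, that a month has no fixed length in milliseconds, can be illustrated with plain `java.time`. This is a minimal sketch, not Flink code: the class `MonthIntervalDemo` and the 30-day approximation are assumptions chosen for the example.

```java
import java.time.Duration;
import java.time.LocalDateTime;

public class MonthIntervalDemo {

    // Calendar-aware addition: the result depends on the length of the month in question.
    public static LocalDateTime addCalendarMonth(LocalDateTime ts) {
        return ts.plusMonths(1);
    }

    // Fixed-length approximation: treats "one month" as exactly 30 days of milliseconds.
    public static LocalDateTime addFixedMonth(LocalDateTime ts) {
        return ts.plus(Duration.ofMillis(30L * 24 * 60 * 60 * 1000));
    }

    public static void main(String[] args) {
        LocalDateTime jan31 = LocalDateTime.of(2018, 1, 31, 0, 0);
        System.out.println(addCalendarMonth(jan31)); // 2018-02-28T00:00
        System.out.println(addFixedMonth(jan31));    // 2018-03-02T00:00
    }
}
```

The two results differ by two days for this start date, which is why a context-dependent unit such as `MONTH` cannot be replaced by a constant number of milliseconds.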
[GitHub] flink issue #6201: [FLINK-8866][Table API & SQL] Add support for unified tab...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6201 Hi @suez1224, that sounds good overall. :-) A few comments: - I would not add a user-facing property `connector.support-timestamp` because a user chooses that by choosing the connector type. Whether the connector supports writing a system timestamp can be an internal field/annotation/interface of the `TableSink` that is generated from the properties. - Copying the timestamp to the StreamRecord timestamp field can be done with a process function. Actually, we do that already when converting a Table into a DataStream. Setting the flag in the Kafka TableSink should be easy. - Not sure if `from-source` needs to be supported by the initial version. We could just implement `from-field` for now, and handle `from-source` as a follow up issue. Since we are approaching feature freeze, I think this might be a good idea at this point. What do you think? Fabian ---
[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6223#discussion_r199743600 --- Diff: docs/dev/table/tableApi.md --- @@ -2184,6 +2184,17 @@ NUMERIC.atan() + + +{% highlight java %} +NUMERIC.atan2(NUMERIC) --- End diff -- given that both parameters are equally important, we might want to change the syntax to `atan2(Numeric, Numeric)`. IMO, that would be more intuitive. What do you think? ---
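To illustrate why both arguments of `atan2` carry equal weight, as argued in the comment on #6223 above, here is a small sketch using `java.lang.Math`. The class and method names are made up for this example; it shows the standard two-argument arc tangent, not the Table API expression under review.

```java
public class Atan2Demo {

    // Two-argument arc tangent: uses the signs of both y and x to pick the quadrant.
    public static double angleDegrees(double y, double x) {
        return Math.toDegrees(Math.atan2(y, x));
    }

    // One-argument variant on the ratio: the quadrant information is lost.
    public static double naiveAngleDegrees(double y, double x) {
        return Math.toDegrees(Math.atan(y / x));
    }

    public static void main(String[] args) {
        // Point (x = -1, y = 1) lies in the second quadrant.
        System.out.println(angleDegrees(1.0, -1.0));      // approximately 135
        System.out.println(naiveAngleDegrees(1.0, -1.0)); // approximately -45
    }
}
```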
[GitHub] flink issue #6161: [hotfix] [docs][FLINK-9581] Typo: extra spaces removed to...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6161 Thanks @snuyanzin! +1 to merge ---
[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6223#discussion_r199500643 --- Diff: docs/dev/table/sql.md --- @@ -1510,6 +1510,17 @@ ATAN(numeric) + + +{% highlight text %} +ATAN2(numeric) --- End diff -- should be `ATAN2(numeric, numeric)` ---
[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6223#discussion_r199504092 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/expressions/mathExpressions.scala --- @@ -243,6 +243,16 @@ case class Atan(child: Expression) extends UnaryExpression { } } +case class Atan2(left: Expression, right: Expression) extends BinaryExpression { --- End diff -- No input validation? override `expectedTypes` or `validateInput()` ---
[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6223#discussion_r199501131 --- Diff: docs/dev/table/tableApi.md --- @@ -3748,6 +3759,17 @@ NUMERIC.atan() + + +{% highlight scala %} +NUMERIC.atan2() +{% endhighlight %} + + +Calculates the tangent of a given coordinates. --- End diff -- see above ---
[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6223#discussion_r199500973 --- Diff: docs/dev/table/tableApi.md --- @@ -2184,6 +2184,17 @@ NUMERIC.atan() + + +{% highlight java %} +NUMERIC.atan2() --- End diff -- should be `NUMERIC.atan2(NUMERIC)` ---
[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6223#discussion_r199501083 --- Diff: docs/dev/table/tableApi.md --- @@ -3748,6 +3759,17 @@ NUMERIC.atan() + + +{% highlight scala %} +NUMERIC.atan2() --- End diff -- should be `NUMERIC.atan2(NUMERIC)`. ---
[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6223#discussion_r199500813 --- Diff: docs/dev/table/sql.md --- @@ -1510,6 +1510,17 @@ ATAN(numeric) + + +{% highlight text %} +ATAN2(numeric) +{% endhighlight %} + + +Calculates the arc tangent of a given coordinates. --- End diff -- should be `of a given coordinate.` (-s)? ---
[GitHub] flink pull request #6223: [FLINK-9688] ATAN2 sql function support
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6223#discussion_r199501001 --- Diff: docs/dev/table/tableApi.md --- @@ -2184,6 +2184,17 @@ NUMERIC.atan() + + +{% highlight java %} +NUMERIC.atan2() +{% endhighlight %} + + +Calculates the arc tangent of a given coordinates. --- End diff -- see above ---
[GitHub] flink issue #6226: [FLINK-8650] Tests for WINDOW clause and documentation up...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6226 Thanks for the update @snuyanzin. Looks good. Will merge it. ---
[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r199485393 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -1029,6 +1029,29 @@ object temporalOverlaps { TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal) } } +/** + * Adds a (signed) integer interval to a timestamp. The unit for the interval is given + * by the unit argument, which should be one of the following values: "SECOND", "MINUTE", + * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". + * + * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ +object timestampAdd { + + /** +* Adds a (signed) integer interval to a timestamp. The unit for the interval is given +* by the unit argument, which should be one of the following values: "SECOND", "MINUTE", +* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". +* +* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ + def apply( + unit: Expression, --- End diff -- I agree with @walterddr but let's wait for @twalthr who added the `quarter` function ---
[GitHub] flink issue #6123: [FLINK-9521] Add shade plugin executions to package table...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6123 I'm with Till here. The example JARs add very little value. I would be OK with adding them if this was lightweight, but each example will include the Table API and its dependencies (incl. Calcite and Janino) and hence be 16MB in size. So adding these examples to the distribution adds 80MB in total for very little value. Hence, I would not add them to the distribution. ---
[GitHub] flink pull request #6226: [FLINK-8650] Tests for WINDOW clause and documenta...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6226#discussion_r199204037 --- Diff: docs/dev/table/sql.md --- @@ -176,9 +181,20 @@ groupItem: | ROLLUP '(' expression [, expression ]* ')' | GROUPING SETS '(' groupItem [, groupItem ]* ')' -insert: - INSERT INTO tableReference - query +windowRef: + windowName + | windowSpec + +windowSpec: + [ windowName ] + '(' + [ ORDER BY orderItem [, orderItem ]* ] + [ PARTITION BY expression [, expression ]* ] + [ + RANGE numericOrIntervalExpression { PRECEDING | FOLLOWING } --- End diff -- Flink does not support `FOLLOWING` yet ---
[GitHub] flink pull request #6226: [FLINK-8650] Tests for WINDOW clause and documenta...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6226#discussion_r199204120 --- Diff: docs/dev/table/sql.md --- @@ -176,9 +181,20 @@ groupItem: | ROLLUP '(' expression [, expression ]* ')' | GROUPING SETS '(' groupItem [, groupItem ]* ')' -insert: - INSERT INTO tableReference - query +windowRef: + windowName + | windowSpec + +windowSpec: + [ windowName ] + '(' + [ ORDER BY orderItem [, orderItem ]* ] + [ PARTITION BY expression [, expression ]* ] + [ + RANGE numericOrIntervalExpression { PRECEDING | FOLLOWING } + | ROWS numericExpression { PRECEDING | FOLLOWING } --- End diff -- Flink does not support `FOLLOWING` yet ---
[GitHub] flink pull request #6226: [FLINK-8650] Tests for WINDOW clause and documenta...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6226#discussion_r199205419 --- Diff: docs/dev/table/sql.md --- @@ -176,9 +181,20 @@ groupItem: | ROLLUP '(' expression [, expression ]* ')' | GROUPING SETS '(' groupItem [, groupItem ]* ')' -insert: - INSERT INTO tableReference - query +windowRef: + windowName + | windowSpec + +windowSpec: --- End diff -- Can you also add a `WINDOW` example to the **Over Window aggregation** section in https://ci.apache.org/projects/flink/flink-docs-release-1.5/dev/table/sql.html#aggregations ? ---
[GitHub] flink pull request #6226: [FLINK-8650] Tests for WINDOW clause and documenta...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6226#discussion_r199209179 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/OverWindowTest.scala --- @@ -44,7 +44,26 @@ class OverWindowTest extends TableTestBase { "sum(DISTINCT c) OVER (PARTITION BY b ORDER BY proctime ROWS BETWEEN 2 preceding AND " + "CURRENT ROW) as sum2 " + "from MyTable" - +val sql2 = "SELECT " + --- End diff -- I think we can reduce the number of tests a bit. We are basically testing Calcite's parser / validator multiple times with very similar queries. For example, Calcite does not distinguish between proctime and rowtime. One query with a `WINDOW` clause per test case should be sufficient. ---
[GitHub] flink pull request #6188: [FLINK-6846][Table API] add timestampAdd tableApi
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6188#discussion_r199190279 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/scala/expressionDsl.scala --- @@ -1029,6 +1029,29 @@ object temporalOverlaps { TemporalOverlaps(leftTimePoint, leftTemporal, rightTimePoint, rightTemporal) } } +/** + * Adds a (signed) integer interval to a timestamp. The unit for the interval is given + * by the unit argument, which should be one of the following values: "SECOND", "MINUTE", + * "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". + * + * e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ +object timestampAdd { + + /** +* Adds a (signed) integer interval to a timestamp. The unit for the interval is given +* by the unit argument, which should be one of the following values: "SECOND", "MINUTE", +* "HOUR", "DAY", "WEEK", "MONTH", "QUARTER" or "YEAR". +* +* e.g. timestampAdd("WEEK", 1, '2003-01-02'.toDate) leads to "2003-01-09". + */ + def apply( + unit: Expression, --- End diff -- I think we can and should add `week`. `quarter` is a bit more tricky... IMO, the current method is not well aligned with the other methods (`hour`, `day`, etc.) and it would make sense to change it for better consistency. However, such a change would break the API. What do you think @twalthr? ---
[GitHub] flink issue #6201: [FLINK-8866][Table API & SQL] Add support for unified tab...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6201 Hi, I think timestamp fields of source-sink tables should be handled as follows when emitting the table: - `proc-time`: ignore - `from-field`: simply write out the timestamp as part of the row. - `from-source`: write the timestamp separately to the system and remove it from the row. This only works if we can set the timestamp in the sink system. If the system sets the ingestion timestamp on its own, i.e., not the actual value, rows would contain different timestamps when they are ingested. If the sink system does not support setting a timestamp, we cannot allow such a table definition. ---
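The three cases listed in the comment on #6201 above can be written down as a small decision function. This is a hypothetical sketch only: `SinkTimestampPlan`, `TimestampKind`, `Action`, and the `sinkCanSetTimestamp` flag are names invented for illustration and do not correspond to classes or properties in Flink.

```java
public class SinkTimestampPlan {

    // Hypothetical names for the three timestamp definitions discussed above.
    public enum TimestampKind { PROC_TIME, FROM_FIELD, FROM_SOURCE }

    // Hypothetical actions a sink could take for the timestamp when emitting a row.
    public enum Action { IGNORE, WRITE_AS_ROW_FIELD, WRITE_AS_RECORD_TIMESTAMP }

    public static Action planFor(TimestampKind kind, boolean sinkCanSetTimestamp) {
        switch (kind) {
            case PROC_TIME:
                // Processing time is not written out.
                return Action.IGNORE;
            case FROM_FIELD:
                // The timestamp stays in the row as a regular field.
                return Action.WRITE_AS_ROW_FIELD;
            case FROM_SOURCE:
                // The timestamp is removed from the row and written separately,
                // which requires a sink system that accepts an explicit timestamp.
                if (!sinkCanSetTimestamp) {
                    throw new IllegalArgumentException(
                        "Sink cannot set a timestamp; table definition is not allowed.");
                }
                return Action.WRITE_AS_RECORD_TIMESTAMP;
            default:
                throw new IllegalStateException("Unknown timestamp kind: " + kind);
        }
    }

    public static void main(String[] args) {
        System.out.println(planFor(TimestampKind.FROM_FIELD, false));
        System.out.println(planFor(TimestampKind.FROM_SOURCE, true));
    }
}
```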
[GitHub] flink issue #6180: [FLINK-9524][Table API & SQL] check whether a clean-up ti...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6180 Thanks for the update @yzandrew. Merging ---
[GitHub] flink issue #6153: [FLINK-9557] [formats] Parse 'integer' type as BigDecimal
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6153 Looks good. +1 to merge ---
[GitHub] flink issue #6163: [hotfix][table] Fix a incorrect exception message.
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6163 Thanks for the PR @kisimple +1 to merge. Best, Fabian ---
[GitHub] flink pull request #6180: [FLINK-9524][Table API & SQL] check whether a clea...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6180#discussion_r197453235 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala --- @@ -132,74 +132,80 @@ class ProcTimeBoundedRangeOver( val currentTime = timestamp - 1 var i = 0 +// get the list of elements of current proctime +val currentElements = rowMapState.get(currentTime) -// initialize the accumulators -var accumulators = accumulatorState.value() +// clean-up timers might expire, which pass the needToCleanupState check above. +// null-check is necessary here, otherwise NPE might be thrown. +if(null != currentElements) { --- End diff -- How about changing this to ``` if (null == currentElements) { return } ``` It would touch far fewer lines of code and is, IMO, easier to read (less nesting). Please note the space between `if` and the condition. ---
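The early-return style suggested in the comment on #6180 above can be sketched in Java as follows. The `Map` stands in for the keyed `rowMapState` of the reviewed Scala code, and `GuardClauseDemo` and `processTimer` are hypothetical names; this is an illustration of the pattern, not the actual operator.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GuardClauseDemo {

    // Returns the number of rows processed for the timer; 0 if there is nothing to process.
    public static int processTimer(Map<Long, List<String>> rowState, long timestamp) {
        long currentTime = timestamp - 1;
        List<String> currentElements = rowState.get(currentTime);

        // Guard clause: an expired clean-up timer may fire without any data for this time.
        if (null == currentElements) {
            return 0;
        }

        // The main logic stays at the top nesting level.
        int processed = 0;
        for (String row : currentElements) {
            processed++;
        }
        return processed;
    }

    public static void main(String[] args) {
        Map<Long, List<String>> state = new HashMap<>();
        state.put(9L, Arrays.asList("a", "b"));
        System.out.println(processTimer(state, 10L)); // 2
        System.out.println(processTimer(state, 20L)); // 0
    }
}
```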
[GitHub] flink pull request #6180: [FLINK-9524][Table API & SQL] check whether a clea...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6180#discussion_r197453796 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/ProcTimeBoundedRangeOver.scala --- @@ -132,74 +132,80 @@ class ProcTimeBoundedRangeOver( val currentTime = timestamp - 1 var i = 0 +// get the list of elements of current proctime +val currentElements = rowMapState.get(currentTime) -// initialize the accumulators -var accumulators = accumulatorState.value() +// clean-up timers might expire, which pass the needToCleanupState check above. --- End diff -- Please rephrase comment to ``` // Expired clean-up timers pass the needToCleanupState() check. // Perform a null check to verify that we have data to process. ``` ---
[GitHub] flink issue #6131: [hotfix][docs] Fix Table API scala example code
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6131 Hi @zjffdu, thanks for the fix! +1 to merge ---
[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6082 I think we have to return a typed array here. A `List` won't be supported by the built-in SQL functions. There are a few tricks one can play to create typed arrays, even in static code like ``` Object[] array = (Object[]) Array.newInstance(clazz, length); ``` Have a look at the code of the ORC InputFormat that had to solve a similar challenge: [OrcBatchReader.java](https://github.com/apache/flink/blob/master/flink-connectors/flink-orc/src/main/java/org/apache/flink/orc/OrcBatchReader.java). ---
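The `Array.newInstance` trick mentioned in the comment on #6082 above can be expanded into a small, self-contained example. The helper `toTypedArray` and the class `TypedArrayDemo` are assumptions for this sketch; it shows only the reflection call itself, not how the Avro or ORC code in Flink uses it.

```java
import java.lang.reflect.Array;
import java.util.Arrays;
import java.util.List;

public class TypedArrayDemo {

    // Copies a list into an array whose runtime component type is `clazz`.
    // `clazz` must be a reference type; a primitive class would make the cast to Object[] fail.
    public static Object[] toTypedArray(List<?> values, Class<?> clazz) {
        Object[] array = (Object[]) Array.newInstance(clazz, values.size());
        for (int i = 0; i < values.size(); i++) {
            array[i] = values.get(i);
        }
        return array;
    }

    public static void main(String[] args) {
        Object[] result = toTypedArray(Arrays.asList("a", "b"), String.class);
        // The static type is Object[], but the runtime type is String[].
        System.out.println(result.getClass().getSimpleName()); // String[]
    }
}
```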
[GitHub] flink issue #6082: [FLINK-9444][table] KafkaAvroTableSource failed to work f...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6082 We treat sequences of values as arrays in SQL and the Table API. There are no built-in functions to handle lists. So we should return the values as an array, and hence don't need a List type. ---
[GitHub] flink issue #6106: [hotfix][table] Remove a println statement
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6106 Oh... Thanks for fixing that :-) +1 to merge ---
[GitHub] flink issue #6099: [FLINK-9473][Table API & SQL] Added new methods into Exte...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6099 Thanks for the PR @snuyanzin! We cannot update the Calcite version to a `SNAPSHOT` version and have to wait until Calcite 1.17 is released before continuing with this PR. Best, Fabian ---
[GitHub] flink pull request #5688: [FLINK-6968][Table API & SQL] Add Queryable table ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5688#discussion_r190176842 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/KeyedProcessFunctionWithCleanupState.scala --- @@ -44,8 +45,20 @@ abstract class KeyedProcessFunctionWithCleanupState[K, I, O](queryConfig: Stream protected def registerProcessingCleanupTimer( ctx: KeyedProcessFunction[K, I, O]#Context, currentTime: Long): Unit = { -if (stateCleaningEnabled) { +registerCleanupTimer(ctx, currentTime, TimeDomain.PROCESSING_TIME) + } + protected def registerEventCleanupTimer( --- End diff -- We implemented state cleanup in processing time because it is easier for users to reason about and doesn't interfere that much with event-time processing (it is not possible to distinguish timers yet). However, it also has a few shortcomings, such as cleared state when recovering a query from a savepoint (which we don't really encourage at the moment). Anyway, introducing event-time state cleanup should definitely go into a separate issue and PR. ---
[GitHub] flink pull request #5998: [FLINK-9344] [TableAPI & SQL] Support INTERSECT an...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5998#discussion_r190171657 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/setop/StreamIntersectCoProcessFunction.scala --- @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.table.runtime.setop + +import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor} +import org.apache.flink.api.common.typeinfo.TypeInformation +import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.typeutils.TupleTypeInfo +import org.apache.flink.configuration.Configuration +import org.apache.flink.streaming.api.functions.co.CoProcessFunction +import org.apache.flink.table.api.{StreamQueryConfig, Types} +import org.apache.flink.table.runtime.join.CRowWrappingMultiOutputCollector +import org.apache.flink.table.runtime.types.CRow +import org.apache.flink.table.typeutils.TypeCheckUtils.validateEqualsHashCode +import org.apache.flink.table.util.Logging +import org.apache.flink.types.Row +import org.apache.flink.util.Collector + +class StreamIntersectCoProcessFunction( + resultType: TypeInformation[Row], + queryConfig: StreamQueryConfig, + all: Boolean) + extends CoProcessFunction[CRow, CRow, CRow] --- End diff -- I think it makes sense to have two implementations of this operator. 1. For tables with a time attribute. This implementation works without retraction and can automatically cleanup the state. 2. For tables without time attributes. This implementation needs to cleanup state based on retention time and produces retractions. This PR seems to address both cases, which is fine for now. We can improve for 1. later on. Both cases should be implemented as `CoProcessFunction`. We should try to be independent of the DataStream window operators, IMO. ---
[GitHub] flink issue #6039: [hotfix] [docs] Add Release Notes for Flink 1.5.
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6039 Thanks, I'll also commit an empty release notes page to the 1.6 / master docs. ---
[GitHub] flink pull request #6039: [hotfix] [docs] Add Release Notes for Flink 1.5.
GitHub user fhueske opened a pull request: https://github.com/apache/flink/pull/6039 [hotfix] [docs] Add Release Notes for Flink 1.5. ## What is the purpose of the change * Add release notes for Flink 1.5 to the documentation ## Brief change log * Add page with release notes for Flink 1.5 * Add "Release Notes" section to index page and link to Flink 1.5 release notes * Remove link to out-dated migration guide from index page ## Verifying this change * Docs change only. You can merge this pull request into a Git repository by running: $ git pull https://github.com/fhueske/flink 1.5-releaseNotes Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6039.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6039 commit 37d9c155659cfb195fb3a1a85cf491bd0605ab8e Author: Fabian Hueske <fhueske@...> Date: 2018-05-17T16:38:22Z [hotfix] [docs] Add Release Notes for Flink 1.5. ---
[GitHub] flink issue #5961: [FLINK-8255][DataSet API, DataStream API] key expressions...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5961 Thanks for the update @snuyanzin. I'll try to have a look at the changes in the next days. Best, Fabian ---
[GitHub] flink issue #6012: [FLINK-9361] [sql-client] Fix refresh interval in changel...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6012 Fix looks good. +1 ---
[GitHub] flink issue #6001: [FLINK-9299] ProcessWindowFunction documentation Java exa...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/6001 Thanks for the fix @yanghua. I left a minor comment. Otherwise +1 to merge. ---
[GitHub] flink pull request #6001: [FLINK-9299] ProcessWindowFunction documentation J...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/6001#discussion_r187871263 --- Diff: docs/dev/stream/operators/windows.md --- @@ -883,7 +883,7 @@ private static class AverageAggregate @Override public Double getResult(Tuple2<Long, Long> accumulator) { -return accumulator.f0 / accumulator.f1; +return (double) accumulator.f0 / accumulator.f1; --- End diff -- Should probably be `((double) accumulator.f0) / accumulator.f1;` ---
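For readers following the `getResult()` fix above, here is a minimal plain-Java sketch of why the cast matters. It uses primitive `long` parameters instead of the `Tuple2<Long, Long>` accumulator from the docs, and the class and method names are made up for illustration. Note that `(double) a / b` and `((double) a) / b` evaluate identically because a cast binds tighter than `/`; the extra parentheses only make the intent easier to read.

```java
class AverageDemo {
    // Without a cast: the long division runs first and truncates,
    // and only the truncated result is widened to double.
    static double truncated(long sum, long count) {
        return sum / count;
    }

    // With the cast: sum is converted to double before the division,
    // so the fractional part is kept.
    static double exact(long sum, long count) {
        return ((double) sum) / count;
    }

    public static void main(String[] args) {
        System.out.println(truncated(7, 2)); // prints 3.0
        System.out.println(exact(7, 2));     // prints 3.5
    }
}
```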
[GitHub] flink issue #5897: [FLINK-9234] [table] Fix missing dependencies for externa...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5897 Thanks, @StephanEwen. I will later merge this for `release-1.4` and `release-1.5`. Should we merge it for `master` as well and create a JIRA to drop the deprecated code? That would ensure we have the fix in 1.6 as well in case we don't drop the code for whatever reason. ---
[GitHub] flink issue #5988: [FLINK-9332][TableAPI & SQL] Fix Codegen error of CallGen...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5988 Merging ---
[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5860 Merging ---
[GitHub] flink pull request #5988: [FLINK-9332][TableAPI & SQL] Fix Codegen error of ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5988#discussion_r187710610 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala --- @@ -64,17 +65,28 @@ object CallGenerator { val (auxiliaryStmt, result) = call(operands.map(_.resultTerm)) +val nullTermCode = if ( + nullCheck && + isReference(returnType) && + !TypeCheckUtils.isTemporal(returnType)) { + s""" + |if ($resultTerm == null) { + | $nullTerm = true; + |} + """.stripMargin +} else { + "" +} + val resultCode = if (nullCheck && operands.nonEmpty) { s""" |${operands.map(_.code).mkString("\n")} |boolean $nullTerm = ${operands.map(_.nullTerm).mkString(" || ")}; -|$resultTypeTerm $resultTerm; -|if ($nullTerm) { -| $resultTerm = $defaultValue; --- End diff -- Oh sorry. Overlooked that one. Thanks! ---
[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5860 Hi @glaksh100, thanks for the update! ---
[GitHub] flink pull request #5988: [FLINK-9332][TableAPI & SQL] Fix Codegen error of ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5988#discussion_r187577471 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala --- @@ -686,6 +686,28 @@ class SqlITCase extends StreamingWithStateTestBase { assertEquals(List(expected.toString()), StreamITCase.testResults.sorted) } + + @Test + def testNullableFunctionCall(): Unit = { --- End diff -- We should test this more lightweight with a unit test instead of running a complete query. We can extend `ScalarFunctionsTest.testLPad()` with ``` testSqlApi( "LPAD('hello', -1, 'x') IS NULL", "true" ) ``` to cover this case ---
[GitHub] flink pull request #5988: [FLINK-9332][TableAPI & SQL] Fix Codegen error of ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5988#discussion_r187575678 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala --- @@ -64,17 +65,28 @@ object CallGenerator { val (auxiliaryStmt, result) = call(operands.map(_.resultTerm)) +val nullTermCode = if ( + nullCheck && + isReference(returnType) && + !TypeCheckUtils.isTemporal(returnType)) { + s""" + |if ($resultTerm == null) { + | $nullTerm = true; + |} + """.stripMargin +} else { + "" +} + val resultCode = if (nullCheck && operands.nonEmpty) { s""" |${operands.map(_.code).mkString("\n")} |boolean $nullTerm = ${operands.map(_.nullTerm).mkString(" || ")}; -|$resultTypeTerm $resultTerm; -|if ($nullTerm) { -| $resultTerm = $defaultValue; --- End diff -- don't we need this case if the result value is a primitive? ---
[GitHub] flink pull request #5988: [FLINK-9332][TableAPI & SQL] Fix Codegen error of ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5988#discussion_r187568481 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/CallGenerator.scala --- @@ -64,17 +65,28 @@ object CallGenerator { val (auxiliaryStmt, result) = call(operands.map(_.resultTerm)) +val nullTermCode = if ( + nullCheck && + isReference(returnType) && + !TypeCheckUtils.isTemporal(returnType)) { --- End diff -- Why do you exclude temporal types here? ---
[GitHub] flink issue #5969: [FLINK-9074] [e2e] Add e2e for resuming from externalized...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5969 Thanks for the PR @tzulitai. The new e2e tests passed on my machine. The changes look good as well. +1 to merge ---
[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5961#discussion_r186528901 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/util/typeutils/FieldAccessorTest.java --- @@ -368,4 +369,23 @@ public void testIllegalBasicType2() { FieldAccessor<Long, Long> f = FieldAccessorFactory.getAccessor(tpeInfo, "foo", null); } + + /** +* Validates that no ClassCastException happens +* should not fail e.g. like in FLINK-8255. +*/ + @Test + public void testRowTypeInfo() { --- End diff -- This test just validates that a `FieldAccessor` is created. At runtime it would fail with a `ClassCastException`. ---
[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5961#discussion_r186484649 --- Diff: flink-java/src/test/java/org/apache/flink/api/java/operator/MaxByOperatorTest.java --- @@ -230,4 +235,43 @@ public String toString() { } } + /** +* Validates that no ClassCastException happens +* should not fail e.g. like in FLINK-8255. +*/ + @Test + public void testMaxMinByRowTypeInfoKeyFieldsDataset() { + + final ExecutionEnvironment env = ExecutionEnvironment + .getExecutionEnvironment(); + TypeInformation[] types = new TypeInformation[] {Types.INT, Types.INT}; + + String[] fieldNames = new String[]{"id", "value"}; + RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames); + DataSet tupleDs = env + .fromCollection(Collections.singleton(new Row(2)), rowTypeInfo); + + tupleDs.maxBy(0); + tupleDs.minBy(0); + } + +/** + * Validates that no ClassCastException happens +* should not fail e.g. like in FLINK-8255. +*/ + @Test + public void testMaxMinByRowTypeInfoKeyFieldsForUnsortedGrouping() { + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + + TypeInformation[] types = new TypeInformation[]{Types.INT, Types.INT}; + + String[] fieldNames = new String[]{"id", "value"}; + RowTypeInfo rowTypeInfo = new RowTypeInfo(types, fieldNames); + + UnsortedGrouping groupDs = env.fromCollection(Collections.singleton(new Row(2)), rowTypeInfo).groupBy(0); + + groupDs.maxBy(1); + groupDs.minBy(1); --- End diff -- The tests pass because the program is not executed. You would have to call `collect()` on the resulting `DataSet` to run the program and compare the returned result against the expected result. As I pointed out before, this will fail, because the operator will cast the `Row` objects to `Tuple`. ---
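The point about the tests passing only because nothing is executed can be illustrated without Flink. The sketch below is an analogy, not Flink code: it uses `java.util.stream`, which is also lazy, to build a pipeline that contains a bad cast. Building the pipeline succeeds; the `ClassCastException` only shows up once a terminal operation runs it. The class and method names are invented for this example.

```java
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyPlanDemo {
    // Builds a pipeline with a cast that cannot succeed (Integer -> String),
    // but does not run it: map() is only recorded, not applied.
    static Stream<String> buildPlan() {
        return Stream.<Object>of(Integer.valueOf(42)).map(o -> (String) o);
    }

    // Runs the pipeline with a terminal operation and reports whether
    // the bad cast surfaced as a ClassCastException.
    static boolean failsOnExecution() {
        try {
            buildPlan().collect(Collectors.toList());
            return false;
        } catch (ClassCastException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        buildPlan(); // no exception: nothing has been executed yet
        System.out.println("plan built without error");
        System.out.println("fails on execution: " + failsOnExecution());
    }
}
```

A test that stops after building the plan would therefore pass even though the program is broken, which is the situation described in the comment above.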
[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5961#discussion_r186477977 --- Diff: flink-java/src/main/java/org/apache/flink/api/java/functions/SelectByMinFunction.java --- @@ -41,7 +41,7 @@ * is regarded in the reduce function. First index has highest priority and last index has * least priority. */ - public SelectByMinFunction(TupleTypeInfo type, int... fields) { + public SelectByMinFunction(TupleTypeInfoBase type, int... fields) { --- End diff -- The `ReduceFunction` is still typed to `T extends Tuple`, so this will still fail at runtime. The same is true for all other built-in aggregation methods like `sum()` and `min()` on `DataSet` and `UnsortedGrouping`. This cannot be resolved without major changes. I don't think we should add these features, but rather throw meaningful error messages instead of `ClassCastException`. Can you try to override the `isTupleType()` method in `RowTypeInfo` and return `false`? This would prevent `Row` from being used in contexts that are only supported for `Tuple`. ---
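A rough sketch of the "fail with a meaningful message" idea, in plain Java. Everything here is hypothetical and only mirrors the shape of the suggestion: `SimpleTypeInfo` stands in for a type-information class with an `isTupleType()` method, and `requireTupleType` stands in for whatever check an operator like `maxBy()` would perform before it builds a tuple-only function. It is not the actual Flink implementation.

```java
class TupleOnlyCheckDemo {
    // Hypothetical stand-in for a type information object.
    static final class SimpleTypeInfo {
        final String name;
        final boolean tupleType;

        SimpleTypeInfo(String name, boolean tupleType) {
            this.name = name;
            this.tupleType = tupleType;
        }

        boolean isTupleType() {
            return tupleType;
        }
    }

    // Rejects non-tuple types up front with a descriptive message,
    // instead of letting a ClassCastException happen at runtime.
    static void requireTupleType(SimpleTypeInfo type, String operation) {
        if (!type.isTupleType()) {
            throw new IllegalArgumentException(
                operation + " is only supported on tuple types, but the input type is " + type.name);
        }
    }

    public static void main(String[] args) {
        requireTupleType(new SimpleTypeInfo("Tuple2", true), "maxBy"); // accepted
        try {
            // A row type that reports isTupleType() == false is rejected early.
            requireTupleType(new SimpleTypeInfo("Row", false), "maxBy");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```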
[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5961#discussion_r186527205 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java --- @@ -157,15 +156,15 @@ public T set(T record, F fieldValue) { SimpleTupleFieldAccessor(int pos, TypeInformation typeInfo) { --- End diff -- accessing fields in a `Row` will fail because `Row` does not extend `Tuple`. For a proper fix, we would need a `RowFieldAccessor` and use that one when we deal with a `DataStream`. We would then need to add the `RowFieldAccessor` to the `FieldAccessorFactory`. ---
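To make the `RowFieldAccessor` suggestion a bit more concrete, here is a self-contained sketch of what such an accessor could look like. It does not use Flink classes: `SimpleRow` is a hypothetical stand-in for `Row`, and the `Accessor` interface only copies the `get` / `set` shape visible in the diff. How a real accessor would be registered in `FieldAccessorFactory` is not shown and would depend on the Flink code base.

```java
class RowAccessorDemo {
    // Hypothetical stand-in for a row: a fixed-arity array of untyped fields.
    static final class SimpleRow {
        private final Object[] fields;

        SimpleRow(int arity) {
            this.fields = new Object[arity];
        }

        Object getField(int pos) {
            return fields[pos];
        }

        void setField(int pos, Object value) {
            fields[pos] = value;
        }
    }

    // Same get/set shape as the accessor in the diff above.
    interface Accessor<T, F> {
        F get(T record);

        T set(T record, F fieldValue);
    }

    // Accesses a field by position without casting the record to a tuple.
    static final class RowFieldAccessor<F> implements Accessor<SimpleRow, F> {
        private final int pos;

        RowFieldAccessor(int pos) {
            this.pos = pos;
        }

        @Override
        @SuppressWarnings("unchecked")
        public F get(SimpleRow record) {
            return (F) record.getField(pos);
        }

        @Override
        public SimpleRow set(SimpleRow record, F fieldValue) {
            record.setField(pos, fieldValue);
            return record;
        }
    }

    public static void main(String[] args) {
        SimpleRow row = new SimpleRow(2);
        RowFieldAccessor<Integer> second = new RowFieldAccessor<>(1);
        second.set(row, 7);
        System.out.println(second.get(row)); // prints 7
    }
}
```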
[GitHub] flink pull request #5961: [FLINK-8255][DataSet API, DataStream API] key expr...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5961#discussion_r186527277 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/util/typeutils/FieldAccessor.java --- @@ -197,7 +196,7 @@ public T set(T record, F fieldValue) { checkNotNull(typeInfo, "typeInfo must not be null."); checkNotNull(innerAccessor, "innerAccessor must not be null."); - int arity = ((TupleTypeInfo) typeInfo).getArity(); + int arity = typeInfo.getArity(); --- End diff -- Same as for `SimpleTupleFieldAccessor`. ---
[GitHub] flink issue #3765: [FLINK-6373] Add runtime support for distinct aggregation...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3765 The features that this PR was going to implement have been resolved by PR #. I will close it. ---
[GitHub] flink issue #3764: [FLINK-6335] Parse DISTINCT over grouped windows in strea...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3764 This PR has been integrated into #5940. I'll close it. ---
[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5940 merging ---
[GitHub] flink issue #5927: [FLINK-8237] [BucketingSink] Better error message added
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5927 merging ---
[GitHub] flink issue #5927: [FLINK-8237] [BucketingSink] Better error message added
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5927 Thanks for the update @pavel-shvetsov-git. +1 to merge ---
[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5940 Thanks for the update @walterddr. The PR is good to merge. ---
[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5860 Hi @glaksh100, I just noticed that the bucket closing check is only done when a record is written. Hence, inactive buckets might not get closed in time if a larger inactive bucket interval is configured. In some sense, the new feature is an extended version of the inactive bucket closing feature. How should we handle that case? 1. throw an exception during configuration, i.e., when `setInactiveBucketThreshold()` and `setBatchRolloverInterval()` are called. 2. configure the inactive bucket interval to be at least the rollover interval in case it is configured larger and continue. We should also make sure that the check interval is configured appropriately. I'm leaning towards the first approach. It would make the misconfiguration obvious to the user and fail the program before it is submitted. What do you think? Best, Fabian ---
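The first option above (fail during configuration) could look roughly like the following sketch. The class is hypothetical and only borrows the two setter names from the comment; it is not the `BucketingSink` code. It treats a setting as unconfigured until its setter is called, so the check works regardless of the order in which the two setters are invoked.

```java
class RolloverConfigDemo {
    // null means "not configured yet" in this sketch.
    private Long inactiveBucketThresholdMs;
    private Long batchRolloverIntervalMs;

    RolloverConfigDemo setInactiveBucketThreshold(long thresholdMs) {
        check(thresholdMs, batchRolloverIntervalMs);
        this.inactiveBucketThresholdMs = thresholdMs;
        return this;
    }

    RolloverConfigDemo setBatchRolloverInterval(long intervalMs) {
        check(inactiveBucketThresholdMs, intervalMs);
        this.batchRolloverIntervalMs = intervalMs;
        return this;
    }

    // Fails fast once both values are known and the inactive threshold
    // is larger than the rollover interval.
    private static void check(Long inactiveMs, Long rolloverMs) {
        if (inactiveMs != null && rolloverMs != null && inactiveMs > rolloverMs) {
            throw new IllegalArgumentException(
                "Inactive bucket threshold (" + inactiveMs + " ms) must not be larger than "
                    + "the batch rollover interval (" + rolloverMs + " ms).");
        }
    }

    public static void main(String[] args) {
        // Consistent configuration: accepted in either order.
        new RolloverConfigDemo().setBatchRolloverInterval(120_000L).setInactiveBucketThreshold(60_000L);
        try {
            new RolloverConfigDemo().setInactiveBucketThreshold(120_000L).setBatchRolloverInterval(60_000L);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the exception is thrown from the setters, a misconfigured program would fail while it is being assembled, before it is submitted.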
[GitHub] flink issue #5940: [FLINK-8690][table]Support group window distinct aggregat...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5940 Hmmm, good point. The discussion would be lost. How about I put your changes on top of Haohui's changes before merging? ---
[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5940#discussion_r186033706 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala --- @@ -52,6 +52,76 @@ class SqlITCase extends StreamingWithStateTestBase { (8000L, "8", "Hello World"), (2L, "20", "Hello World")) + @Test + def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = { +// create a watermark with 10ms offset to delay the window emission by 10ms to verify merge +val sessionWindowTestdata = List( + (1L, 1, "Hello"), + (2L, 2, "Hello"), + (8L, 8, "Hello"), + (9L, 9, "Hello World"), + (4L, 4, "Hello"), + (16L, 16, "Hello")) + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setParallelism(1) +StreamITCase.clear +val stream = env + .fromCollection(sessionWindowTestdata) + .assignTimestampsAndWatermarks(new TimestampAndWatermarkWithOffset[(Long, Int, String)](10L)) + +val tEnv = TableEnvironment.getTableEnvironment(env) +val table = stream.toTable(tEnv, 'long, 'int, 'string, 'rowtime.rowtime) +tEnv.registerTable("MyTable", table) + +val sqlQuery = "SELECT string, " + + " COUNT(DISTINCT long) " + --- End diff -- It would be good to add the end timestamp of the windows (`SESSION_END(rowtime, INTERVAL '0.005' SECOND)`) to make it easier to eyeball the expected test results. ---
[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5940#discussion_r186022377 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -641,7 +641,8 @@ class AggregationCodeGenerator( | java.util.Map.Entry entry = (java.util.Map.Entry) mergeIt$i.next(); | Object k = entry.getKey(); --- End diff -- Change this line to `${classOf[Row].getCanonicalName} k = (${classOf[Row].getCanonicalName}) entry.getKey();` ---
[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5940#discussion_r186029985 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/stream/sql/DistinctAggregateTest.scala --- @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.table.api.stream.sql + +import org.apache.flink.api.scala._ +import org.apache.flink.table.api.scala._ +import org.apache.flink.table.plan.logical.{SessionGroupWindow, SlidingGroupWindow, TumblingGroupWindow} +import org.apache.flink.table.utils.{StreamTableTestUtil, TableTestBase} +import org.apache.flink.table.utils.TableTestUtil._ +import org.junit.{Ignore, Test} + +class DistinctAggregateTest extends TableTestBase { + private val streamUtil: StreamTableTestUtil = streamTestUtil() + streamUtil.addTable[(Int, String, Long)]( +"MyTable", +'a, 'b, 'c, +'proctime.proctime, 'rowtime.rowtime) + + @Test + def testDistinct(): Unit = { +val sql = "SELECT DISTINCT a, b, c FROM MyTable" + +val expected = + unaryNode( +"DataStreamGroupAggregate", +unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a, b, c") +), +term("groupBy", "a, b, c"), +term("select", "a, b, c") + ) +streamUtil.verifySql(sql, expected) + } + + // TODO: this query should be optimized to only have a single DataStreamGroupAggregate + // TODO: reopen this until FLINK-7144 fixed + @Ignore + @Test + def testDistinctAfterAggregate(): Unit = { +val sql = "SELECT DISTINCT a FROM MyTable GROUP BY a, b, c" + +val expected = + unaryNode( +"DataStreamGroupAggregate", +unaryNode( + "DataStreamCalc", + streamTableNode(0), + term("select", "a") +), +term("groupBy", "a"), +term("select", "a") + ) +streamUtil.verifySql(sql, expected) + } + + @Test + def testDistinctAggregateOnTumbleWindow(): Unit = { +val sqlQuery = "SELECT COUNT(DISTINCT a), " + + " SUM(a) " + + "FROM MyTable " + + "GROUP BY TUMBLE(rowtime, INTERVAL '15' MINUTE) " + +val expected = unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( +"DataStreamCalc", +streamTableNode(0), +term("select", "rowtime", "a") + ), + term("window", TumblingGroupWindow('w$, 'rowtime, 90.millis)), + term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(a) AS EXPR$1") +) + +streamUtil.verifySql(sqlQuery, expected) + } + + 
@Test + def testMultiDistinctAggregateSameFieldOnHopWindow(): Unit = { +val sqlQuery = "SELECT COUNT(DISTINCT a), " + + " SUM(DISTINCT a), " + + " MAX(DISTINCT a) " + + "FROM MyTable " + + "GROUP BY HOP(rowtime, INTERVAL '15' MINUTE, INTERVAL '1' HOUR) " + +val expected = unaryNode( + "DataStreamGroupWindowAggregate", + unaryNode( +"DataStreamCalc", +streamTableNode(0), +term("select", "rowtime", "a") + ), + term("window", SlidingGroupWindow('w$, 'rowtime, 360.millis, 90.millis)), + term("select", "COUNT(DISTINCT a) AS EXPR$0", "SUM(DISTINCT a) AS EXPR$1", +
[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5940#discussion_r185955967 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/AggregationCodeGenerator.scala --- @@ -641,7 +641,8 @@ class AggregationCodeGenerator( | java.util.Map.Entry entry = (java.util.Map.Entry) mergeIt$i.next(); | Object k = entry.getKey(); | Long v = (Long) entry.getValue(); - | if (aDistinctAcc$i.add(k, v)) { + | if (aDistinctAcc$i.add( --- End diff -- The key in the entry is a `Row` already ---
[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5940#discussion_r186036107 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala --- @@ -52,6 +52,76 @@ class SqlITCase extends StreamingWithStateTestBase { (8000L, "8", "Hello World"), (2L, "20", "Hello World")) + @Test + def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = { +// create a watermark with 10ms offset to delay the window emission by 10ms to verify merge +val sessionWindowTestdata = List( + (1L, 1, "Hello"), --- End diff -- To check the correct merge behavior, we need two windows which aggregate the same value that is then deduplicated in merge. Some data like: ``` (1L, 2, "Hello"), // 1. Hello window (2L, 2, "Hello"), // 1. Hello window, deduped (8L, 2, "Hello"), // 2. Hello window, deduped during merge (10L, 3, "Hello"), // 2. Hello window, forwarded during merge (9L, 9, "Hello World"), // 1. Hello World window (4L, 1, "Hello"), // 1. Hello window, triggering merge of 1. and 2. Hello windows (16L, 16, "Hello")) // 3. Hello window (not merged) ``` ---
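The merge behavior that the proposed data is meant to exercise can be sketched in plain Java. This is an illustration under assumptions, not the generated Flink code: each window's distinct accumulator is modeled as a map from value to occurrence count, and `add` returns `true` only when a value is new to the accumulator (mirroring the `add(k, v)` call visible in the codegen diff). The two accumulators hold the second column of the "Hello" rows from the first two windows in the comment above.

```java
import java.util.HashMap;
import java.util.Map;

class DistinctMergeDemo {
    // Adds `count` occurrences of `value`; returns true if the value was not seen before.
    static boolean add(Map<Integer, Long> acc, int value, long count) {
        Long previous = acc.get(value);
        acc.put(value, previous == null ? count : previous + count);
        return previous == null;
    }

    // Builds an accumulator from the values that fell into one window.
    static Map<Integer, Long> accumulate(int... values) {
        Map<Integer, Long> acc = new HashMap<>();
        for (int v : values) {
            add(acc, v, 1L);
        }
        return acc;
    }

    // Merges `other` into `target` and returns how many values were new to `target`,
    // i.e. how many values would be forwarded to the wrapped aggregate.
    static int merge(Map<Integer, Long> target, Map<Integer, Long> other) {
        int forwarded = 0;
        for (Map.Entry<Integer, Long> entry : other.entrySet()) {
            if (add(target, entry.getKey(), entry.getValue())) {
                forwarded++;
            }
        }
        return forwarded;
    }

    public static void main(String[] args) {
        Map<Integer, Long> firstWindow = accumulate(2, 2, 1); // rows at 1L, 2L and the bridging row at 4L
        Map<Integer, Long> secondWindow = accumulate(2, 3);   // rows at 8L and 10L
        int forwarded = merge(firstWindow, secondWindow);
        System.out.println("forwarded during merge: " + forwarded);        // 1 (only the value 3)
        System.out.println("distinct values after merge: " + firstWindow.size()); // 3
    }
}
```

With the all-distinct data from the original test, no value would ever be deduplicated during the merge, so a broken merge would go unnoticed.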
[GitHub] flink pull request #5940: [FLINK-8690][table]Support group window distinct a...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5940#discussion_r186033401 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/SqlITCase.scala --- @@ -52,6 +52,76 @@ class SqlITCase extends StreamingWithStateTestBase { (8000L, "8", "Hello World"), (2L, "20", "Hello World")) + @Test + def testDistinctAggWithMergeOnEventTimeSessionGroupWindow(): Unit = { +// create a watermark with 10ms offset to delay the window emission by 10ms to verify merge +val sessionWindowTestdata = List( + (1L, 1, "Hello"), --- End diff -- The test is not checking for DISTINCT semantics since all aggregated values are distinct. We could do `COUNT(DISTINCT num)` (`int` has to be renamed to `num` because it's a SQL keyword). ---
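A small plain-Java illustration of why all-distinct input cannot verify DISTINCT semantics: on such input a plain count and a distinct count agree, so a test could not tell a correct `COUNT(DISTINCT ...)` from an accidental `COUNT(...)`. The helper names are made up; the first array uses the "Hello" values from the test data in the diff, the second adds duplicates.

```java
import java.util.HashSet;
import java.util.Set;

class DistinctVsPlainCountDemo {
    // Plain COUNT: every value counts.
    static long count(int[] values) {
        return values.length;
    }

    // COUNT(DISTINCT): each value counts once.
    static long countDistinct(int[] values) {
        Set<Integer> seen = new HashSet<>();
        for (int v : values) {
            seen.add(v);
        }
        return seen.size();
    }

    public static void main(String[] args) {
        int[] allDistinct = {1, 2, 8, 4, 16};
        int[] withDuplicates = {2, 2, 2, 3, 1, 16};
        // Same result for both aggregates: the test data cannot distinguish them.
        System.out.println(count(allDistinct) + " vs " + countDistinct(allDistinct));       // 5 vs 5
        // Different results: a wrong implementation would be detected.
        System.out.println(count(withDuplicates) + " vs " + countDistinct(withDuplicates)); // 6 vs 4
    }
}
```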
[GitHub] flink pull request #5836: [FLINK-9141][datastream] Fail early when using bot...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5836#discussion_r185942219 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +import org.junit.Test; + +import java.util.Collections; + +/** + * Tests that verify correct behavior when applying split/getSideOutput operations on one {@link DataStream}. 
+ */ +public class SplitSideOutputTest { + + private static final OutputTag outputTag = new OutputTag("outputTag") {}; + + @Test + public void testSideOutputAfterSelectIsForbidden() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SingleOutputStreamOperator processInput = env.fromElements("foo") + .process(new DummyProcessFunction()); + + processInput.split(Collections::singleton); + + try { + processInput.getSideOutput(outputTag); + } catch (UnsupportedOperationException expected){ + // expected + } + } + + @Test + public void testSelectAfterSideOutputIsForbidden() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SingleOutputStreamOperator processInput = env.fromElements("foo") + .process(new DummyProcessFunction()); + + processInput.getSideOutput(outputTag); + + try { + processInput.split(Collections::singleton); --- End diff -- same as above ---
[GitHub] flink pull request #5836: [FLINK-9141][datastream] Fail early when using bot...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5836#discussion_r185942147 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/api/datastream/SplitSideOutputTest.java --- @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.datastream; + +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.ProcessFunction; +import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; + +import org.junit.Test; + +import java.util.Collections; + +/** + * Tests that verify correct behavior when applying split/getSideOutput operations on one {@link DataStream}. 
+ */ +public class SplitSideOutputTest { + + private static final OutputTag outputTag = new OutputTag("outputTag") {}; + + @Test + public void testSideOutputAfterSelectIsForbidden() { + StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + SingleOutputStreamOperator processInput = env.fromElements("foo") + .process(new DummyProcessFunction()); + + processInput.split(Collections::singleton); + + try { + processInput.getSideOutput(outputTag); --- End diff -- add `Assert.fail();` after `processInput.getSideOutput(outputTag);` to ensure that the test fails if no exception is thrown. ---
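The effect of adding a failure right after the call that is expected to throw can be shown without JUnit. In the sketch below, `throw new AssertionError(...)` plays the role of `Assert.fail()` (which also raises an `AssertionError`); the method names and the two sample operations are invented for illustration.

```java
class ExpectExceptionDemo {
    static void forbidden() {
        throw new UnsupportedOperationException("not allowed");
    }

    static void allowed() {
        // does nothing and throws nothing
    }

    // Pattern from the diff: passes even if the operation throws nothing at all.
    static boolean weakCheck(Runnable operation) {
        try {
            operation.run();
        } catch (UnsupportedOperationException expected) {
            // expected
        }
        return true;
    }

    // Pattern with an explicit failure: reaching the line after the call
    // means no exception was thrown, so the check fails loudly.
    static boolean strictCheck(Runnable operation) {
        try {
            operation.run();
            throw new AssertionError("expected an UnsupportedOperationException");
        } catch (UnsupportedOperationException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(weakCheck(ExpectExceptionDemo::allowed));     // true, although nothing was thrown
        System.out.println(strictCheck(ExpectExceptionDemo::forbidden)); // true
        try {
            strictCheck(ExpectExceptionDemo::allowed);
        } catch (AssertionError e) {
            System.out.println("strict check caught the missing exception");
        }
    }
}
```

The `AssertionError` is not an `UnsupportedOperationException`, so the `catch` block does not swallow it.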
[GitHub] flink issue #5927: [FLINK-8237] [BucketingSink] Better error message added
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5927 Thanks for the PR @pavel-shvetsov-git! I've left a suggestion to improve the error message. Afterwards the PR should be good to merge. ---
[GitHub] flink pull request #5927: [FLINK-8237] [BucketingSink] Better error message ...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5927#discussion_r185932952 --- Diff: flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java --- @@ -550,6 +550,9 @@ private void openNewPartFile(Path bucketPath, BucketState bucketState) throws Path inProgressPath = getInProgressPathFor(partPath); if (bucketState.writer == null) { bucketState.writer = writerTemplate.duplicate(); + if (bucketState.writer == null) { + throw new RuntimeException("Could not duplicate writer."); --- End diff -- I would add the class name of the `writerTemplate` object and that the class needs to implement the `Writer.duplicate()` method. ---
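One way the more informative message might look, sketched in plain Java. The `Writer` interface and `BrokenWriter` class below are simplified, hypothetical stand-ins (only the `duplicate()` method name comes from the diff), and the exact wording of the message is a suggestion rather than what was eventually merged.

```java
class DuplicateWriterDemo {
    // Simplified stand-in for the sink's writer interface.
    interface Writer {
        Writer duplicate();
    }

    // A writer that forgot to implement duplicate() properly.
    static final class BrokenWriter implements Writer {
        @Override
        public Writer duplicate() {
            return null;
        }
    }

    // Duplicates the template and names the offending class if that fails.
    static Writer duplicateOrFail(Writer writerTemplate) {
        Writer copy = writerTemplate.duplicate();
        if (copy == null) {
            throw new RuntimeException(
                "Could not duplicate writer. Class '" + writerTemplate.getClass().getName()
                    + "' must implement the 'Writer.duplicate()' method.");
        }
        return copy;
    }

    public static void main(String[] args) {
        try {
            duplicateOrFail(new BrokenWriter());
        } catch (RuntimeException e) {
            System.out.println(e.getMessage());
        }
    }
}
```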
[GitHub] flink issue #5909: [FLINK-8726][docs] Fix and normalize code-highlighting
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5909 Thanks for cleaning up the syntax highlighting @zentol! +1 to merge ---
[GitHub] flink issue #5860: [FLINK-9138][filesystem-connectors] Implement time based ...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5860 Thanks for the update @glaksh100! The changes look good to me. What do you think @aljoscha? ---
[GitHub] flink issue #5887: [FLINK-6719] [docs] Add details about fault-tolerance of ...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5887 Thanks for the update @bowenli86. I'll merge the PR later. ---
[GitHub] flink issue #5935: [FLINK-9119] example code error in Concepts & Common API
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5935 Thanks for the fix @yanghua! +1 to merge ---
[GitHub] flink issue #5899: Klink
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/5899 Hi @bitsilly, as @sihuazhou said, please describe the purpose of this PR. Also please translate the comments to English. Thanks, Fabian ---
[GitHub] flink issue #3783: [FLINK-6388] Add support for DISTINCT into Code Generated...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3783 Hi @huawei-flink, I'll close this PR later today when merging #. # adds runtime support for distinct aggregation and follows a similar approach as this PR. However, it leverages the `MapView` feature and is therefore a bit more generic. Thanks for working on this. This PR led us in the right direction. Best, Fabian ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184442107 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " COLLECT(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,{1=1}", + "2,2,2,{2=1}", + "2,3,1,{1=1, 2=1}", + "3,2,2,{2=1}", + "3,2,2,{2=1}", + "3,5,2,{2=1, 3=1}", + "4,2,2,{2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "5,1,1,{1=1}", + "5,4,1,{1=1, 3=1}", + "5,4,1,{1=1, 3=1}", + "5,6,1,{1=1, 2=1, 3=1}", + "5,5,2,{2=1, 3=1}") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = 
StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " COUNT(e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,1", + "2,1,2,2", + "2,2,3,1", + "3,1,2,2", + "3,2,2,2", + "3,3,5,2", + "4,1,2,2", + "4,2,3,1", + "4,3,3,1", + "4,4,3,1", + "5,1,1,1", + "5,2,4,1", + "5,3,4,1", + "5,4,6,1", + "5,5,6,1") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = { +// use out-of-order data to test distinct accumulator remove +val data = Seq( + Left((2L, (2L, 2, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((20L, (20L, 20, "Hello World"))), // early row + Right(3L), + Left((2L, (2L, 2, "Hello"))), // late row + Left((3L, (3L, 3, "Hello"))), + Left((4L, (4L, 4, "Hello"))), + Left((5L, (5L, 5, "Hello"))), + Left((6L, (6L, 6, "Hello"))), + Left((7L, (7L, 7, "Hello World"))), + Right(7L), + Left((9L, (9L, 9, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Right(20L)) + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +env.setParallelism(1) --- End diff -- By default, the LocalExecEnv picks the parallelism based on the number of CPU cores. Are you running the tests in some kind of container? ---
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184419816 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/functions/aggfunctions/DistinctAccumulator.scala --- @@ -47,60 +51,51 @@ class DistinctAccumulator[E, ACC](var realAcc: ACC, var mapView: MapView[E, JLon override def equals(that: Any): Boolean = that match { case that: DistinctAccumulator[E, ACC] => that.canEqual(this) && -this.mapView == that.mapView +this.distinctValueMap == that.distinctValueMap case _ => false } def add(element: E): Boolean = { -if (element != null) { - val currentVal = mapView.get(element) - if (currentVal != null) { -mapView.put(element, currentVal + 1L) -false - } else { -mapView.put(element, 1L) -true - } -} else { +val wrappedElement = Row.of(element) --- End diff -- I think we should remove the `E` type parameter and directly pass the `Row` as an argument. That will also make the extension to multiple arguments very easy. Actually, I think I'll do that before merging ---
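The suggestion above (key the distinct map on a whole `Row` instead of a single element of type `E`) can be illustrated with a minimal sketch. This is not Flink's `DistinctAccumulator`: a plain `mutable.Map` stands in for `MapView`, a `Seq[Any]` stands in for `Row`, and the names `DistinctCounter` and `DistinctCounterDemo` are hypothetical.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a distinct accumulator. The key is the full
// argument list (Seq[Any] plays the role of Row), so one-argument and
// multi-argument aggregates share the same code path.
class DistinctCounter {
  // Number of occurrences currently held for each distinct key.
  private val counts = mutable.Map.empty[Seq[Any], Long]

  // Returns true if the key was not present before, i.e. the wrapped
  // aggregate should accumulate it.
  def add(key: Seq[Any]): Boolean = {
    val previous = counts.getOrElse(key, 0L)
    counts(key) = previous + 1L
    previous == 0L
  }

  // Returns true if the last occurrence was removed, i.e. the wrapped
  // aggregate should retract it. Removing an unknown key is a no-op.
  def remove(key: Seq[Any]): Boolean = counts.get(key) match {
    case Some(1L) => counts.remove(key); true
    case Some(n)  => counts(key) = n - 1L; false
    case None     => false
  }
}

object DistinctCounterDemo {
  def main(args: Array[String]): Unit = {
    val acc = new DistinctCounter
    println(acc.add(Seq(1, "a")))    // true: first occurrence
    println(acc.add(Seq(1, "a")))    // false: duplicate
    println(acc.add(Seq(1, "b")))    // true: differs in the second argument
    println(acc.remove(Seq(1, "a"))) // false: one occurrence is left
    println(acc.remove(Seq(1, "a"))) // true: last occurrence is gone
  }
}
```

Because the key is already a sequence of arguments, going from one DISTINCT argument to several only changes how the key is built, not the accumulator itself.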
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184405910 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " COLLECT(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,{1=1}", + "2,2,2,{2=1}", + "2,3,1,{1=1, 2=1}", + "3,2,2,{2=1}", + "3,2,2,{2=1}", + "3,5,2,{2=1, 3=1}", + "4,2,2,{2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "5,1,1,{1=1}", + "5,4,1,{1=1, 3=1}", + "5,4,1,{1=1, 3=1}", + "5,6,1,{1=1, 2=1, 3=1}", + "5,5,2,{2=1, 3=1}") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = 
StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " COUNT(e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,1", + "2,1,2,2", + "2,2,3,1", + "3,1,2,2", + "3,2,2,2", + "3,3,5,2", + "4,1,2,2", + "4,2,3,1", + "4,3,3,1", + "4,4,3,1", + "5,1,1,1", + "5,2,4,1", + "5,3,4,1", + "5,4,6,1", + "5,5,6,1") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = { +// use out-of-order data to test distinct accumulator remove +val data = Seq( + Left((2L, (2L, 2, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((20L, (20L, 20, "Hello World"))), // early row + Right(3L), + Left((2L, (2L, 2, "Hello"))), // late row + Left((3L, (3L, 3, "Hello"))), + Left((4L, (4L, 4, "Hello"))), + Left((5L, (5L, 5, "Hello"))), + Left((6L, (6L, 6, "Hello"))), + Left((7L, (7L, 7, "Hello World"))), + Right(7L), + Left((9L, (9L, 9, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Right(20L)) + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +env.setParallelism(1) --- End diff -- Also, the event-time test is not required. We already test the retract case with BOUNDED OVER windows (rows that fall out of the window are retracted). ---
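To make the retraction argument concrete, here is a small standalone sketch of `SUM(DISTINCT x)` over `ROWS BETWEEN n PRECEDING AND CURRENT ROW`. It uses plain Scala collections rather than Flink's runtime; the object name `BoundedDistinctSum` and the input values are assumptions chosen for illustration. Each new row is accumulated, and the row that slides out of the window is retracted, which is the code path a bounded OVER window exercises.

```scala
import scala.collection.mutable

object BoundedDistinctSum {
  // SUM(DISTINCT x) over ROWS BETWEEN `preceding` PRECEDING AND CURRENT ROW,
  // evaluated for every row of a single partition.
  def distinctSums(values: Seq[Long], preceding: Int): Seq[Long] = {
    val counts = mutable.Map.empty[Long, Int] // occurrences inside the window
    var sum = 0L                              // sum over the distinct values
    values.indices.map { i =>
      // Accumulate the current row; only a first occurrence changes the sum.
      val v = values(i)
      val seen = counts.getOrElse(v, 0)
      if (seen == 0) sum += v
      counts(v) = seen + 1

      // Retract the row that just fell out of the window, if there is one.
      val expired = i - preceding - 1
      if (expired >= 0) {
        val old = values(expired)
        val remaining = counts(old) - 1
        if (remaining == 0) { counts.remove(old); sum -= old }
        else counts(old) = remaining
      }
      sum
    }
  }

  def main(args: Array[String]): Unit = {
    // Prints 1,4,4,6,5
    println(distinctSums(Seq(1L, 3L, 3L, 2L, 2L), preceding = 3).mkString(","))
  }
}
```

With this assumed input the sums are 1, 4, 4, 6, 5, the same sequence the quoted bounded test expects for partition `a = 5`. The last step drops from 6 to 5 only because the value 1 leaves the window, so the result depends on the retract path.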
[GitHub] flink pull request #5555: [FLINK-8689][table]Add runtime support of distinct...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/#discussion_r184405329 --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/stream/sql/OverWindowITCase.scala --- @@ -50,6 +50,155 @@ class OverWindowITCase extends StreamingWithStateTestBase { (8L, 8, "Hello World"), (20L, 20, "Hello World")) + @Test + def testProcTimeDistinctBoundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW), " + + " COLLECT(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime ROWS BETWEEN 3 PRECEDING AND CURRENT ROW) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,{1=1}", + "2,2,2,{2=1}", + "2,3,1,{1=1, 2=1}", + "3,2,2,{2=1}", + "3,2,2,{2=1}", + "3,5,2,{2=1, 3=1}", + "4,2,2,{2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "4,3,1,{1=1, 2=1}", + "5,1,1,{1=1}", + "5,4,1,{1=1, 3=1}", + "5,4,1,{1=1, 3=1}", + "5,6,1,{1=1, 2=1, 3=1}", + "5,5,2,{2=1, 3=1}") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testProcTimeDistinctUnboundedPartitionedRowsOver(): Unit = { + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStateBackend(getStateBackend) +val tEnv = TableEnvironment.getTableEnvironment(env) +env.setParallelism(1) +StreamITCase.clear + +val t = 
StreamTestData.get5TupleDataStream(env) + .toTable(tEnv, 'a, 'b, 'c, 'd, 'e, 'proctime.proctime) +tEnv.registerTable("MyTable", t) + +val sqlQuery = "SELECT a, " + + " COUNT(e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " SUM(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding), " + + " MIN(DISTINCT e) OVER (" + + "PARTITION BY a ORDER BY proctime RANGE UNBOUNDED preceding) " + + "FROM MyTable" + +val result = tEnv.sqlQuery(sqlQuery).toAppendStream[Row] +result.addSink(new StreamITCase.StringSink[Row]) +env.execute() + +val expected = List( + "1,1,1,1", + "2,1,2,2", + "2,2,3,1", + "3,1,2,2", + "3,2,2,2", + "3,3,5,2", + "4,1,2,2", + "4,2,3,1", + "4,3,3,1", + "4,4,3,1", + "5,1,1,1", + "5,2,4,1", + "5,3,4,1", + "5,4,6,1", + "5,5,6,1") +assertEquals(expected, StreamITCase.testResults) + } + + @Test + def testRowTimeDistinctBoundedNonPartitionedRowsOver(): Unit = { +// use out-of-order data to test distinct accumulator remove +val data = Seq( + Left((2L, (2L, 2, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((2L, (2L, 2, "Hello"))), + Left((1L, (1L, 1, "Hello"))), + Left((20L, (20L, 20, "Hello World"))), // early row + Right(3L), + Left((2L, (2L, 2, "Hello"))), // late row + Left((3L, (3L, 3, "Hello"))), + Left((4L, (4L, 4, "Hello"))), + Left((5L, (5L, 5, "Hello"))), + Left((6L, (6L, 6, "Hello"))), + Left((7L, (7L, 7, "Hello World"))), + Right(7L), + Left((9L, (9L, 9, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Left((8L, (8L, 8, "Hello World"))), + Right(20L)) + +val env = StreamExecutionEnvironment.getExecutionEnvironment +env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) +env.setStateBackend(getStateBackend) +env.setParallelism(1) --- End diff -- Actually, I was wrong on this one. Late elements are not handled deterministically if p > 1. I will change it back. ---
[GitHub] flink pull request #5867: [FLINK-8686] [sql-client] Improve basic embedded S...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/5867#discussion_r184099775 --- Diff: flink-dist/src/main/resources/flink-conf.yaml --- @@ -229,17 +229,26 @@ web.port: 8081 # Directory to upload completed jobs to. Add this directory to the list of # monitored directories of the HistoryServer as well (see below). -#jobmanager.archive.fs.dir: hdfs:///completed-jobs/ +# jobmanager.archive.fs.dir: hdfs:///completed-jobs/ # The address under which the web-based HistoryServer listens. -#historyserver.web.address: 0.0.0.0 +# historyserver.web.address: 0.0.0.0 # The port under which the web-based HistoryServer listens. -#historyserver.web.port: 8082 +# historyserver.web.port: 8082 # Comma separated list of directories to monitor for completed jobs. -#historyserver.archive.fs.dir: hdfs:///completed-jobs/ +# historyserver.archive.fs.dir: hdfs:///completed-jobs/ # Interval in milliseconds for refreshing the monitored directories. -#historyserver.archive.fs.refresh-interval: 1 +# historyserver.archive.fs.refresh-interval: 1 + +#== +# SQL Client +#== + +# The SQL Client CLI can be started via bin/sql-client.sh embedded + +# The heap size for the SQL Client CLI JVM +# sqlclient.cli.heap.mb: 1024 --- End diff -- OK. But there's another discussion thread about whether it is useful to share properties across JM/TM JVMs and client processes. ---