[jira] [Commented] (FLINK-6259) Fix a small spelling error
[ https://issues.apache.org/jira/browse/FLINK-6259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954425#comment-15954425 ]

ASF GitHub Bot commented on FLINK-6259:
---------------------------------------

GitHub user sunjincheng121 opened a pull request:

    https://github.com/apache/flink/pull/3667

    Fix a small spelling error: har-with-dependencies -> jar-with-dependencies

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions, please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-6259] Fix a small spelling error")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sunjincheng121/flink FLINK-6259-PR

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3667.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3667

----
commit ca7740c8df2bde5301c73be2d9931259a5c4b5ad
Author: sunjincheng121
Date:   2017-04-04T00:55:57Z

    Fix a small spelling error: har-with-dependencies -> jar-with-dependencies

> Fix a small spelling error
> --------------------------
>
> Key: FLINK-6259
> URL: https://issues.apache.org/jira/browse/FLINK-6259
> Project: Flink
> Issue Type: Bug
> Components: Gelly
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> flink-gelly-scala/pom.xml: {{har-with-dependencies}} -> {{jar-with-dependencies}}

--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
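For context: {{jar-with-dependencies}} is the name of a built-in descriptor of the Maven Assembly Plugin, so the misspelled {{har-with-dependencies}} does not match any known descriptor and breaks the fat-jar build. A hedged sketch of the relevant pom.xml fragment (illustrative only; the real flink-gelly-scala/pom.xml may bind the plugin differently):

```xml
<!-- Illustrative fragment, not copied from flink-gelly-scala/pom.xml. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <!-- Must be a built-in descriptor name; "har-with-dependencies" is not one. -->
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
</plugin>
```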
[jira] [Updated] (FLINK-6259) Fix a small spelling error
[ https://issues.apache.org/jira/browse/FLINK-6259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sunjincheng updated FLINK-6259:
-------------------------------
    Description:
        flink-gelly-scala/pom.xml {{har-with-dependencies}} -> {{jar-with-dependencies}}
    (was: flink-gelly-scala/pom.xml {{har-with-dependencies}} -> {{ jar-with-dependencies}})

> Fix a small spelling error
> --------------------------
>
> Key: FLINK-6259
> URL: https://issues.apache.org/jira/browse/FLINK-6259
> Project: Flink
> Issue Type: Bug
> Components: Gelly
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> flink-gelly-scala/pom.xml {{har-with-dependencies}} -> {{jar-with-dependencies}}
[jira] [Created] (FLINK-6259) Fix a small spelling error
sunjincheng created FLINK-6259:
-------------------------------

       Summary: Fix a small spelling error
           Key: FLINK-6259
           URL: https://issues.apache.org/jira/browse/FLINK-6259
       Project: Flink
    Issue Type: Bug
    Components: Gelly
      Reporter: sunjincheng
      Assignee: sunjincheng

flink-gelly-scala/pom.xml {{har-with-dependencies}} -> {{ jar-with-dependencies}}
[jira] [Commented] (FLINK-6247) Build a jar-with-dependencies for flink-table and put it into ./opt
[ https://issues.apache.org/jira/browse/FLINK-6247?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954418#comment-15954418 ]

ASF GitHub Bot commented on FLINK-6247:
---------------------------------------

GitHub user sunjincheng121 opened a pull request:

    https://github.com/apache/flink/pull/3666

    [FLINK-6247][table] Build a jar-with-dependencies for flink-table and put it into ./opt

    Thanks for contributing to Apache Flink. Before you open your pull request, please take the following checklist into consideration. If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions, please refer to the [How To Contribute guide](http://flink.apache.org/how-to-contribute.html). In addition to going through the list, please provide a meaningful description of your changes.

    - [x] General
      - The pull request references the related JIRA issue ("[FLINK-6247] Build a jar-with-dependencies for flink-table and put it into ./opt")
      - The pull request addresses only one issue
      - Each commit in the PR has a meaningful commit message (including the JIRA id)
    - [ ] Documentation
      - Documentation has been added for new functionality
      - Old documentation affected by the pull request has been updated
      - JavaDoc for public methods has been added
    - [ ] Tests & Build
      - Functionality added by the pull request is covered by tests
      - `mvn clean verify` has been executed successfully locally or a Travis build has passed

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/sunjincheng121/flink FLINK-6247-PR

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3666.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3666

----
commit 5c2ddf580d23e7745499c0d5f639b18d766e6d24
Author: sunjincheng121
Date:   2017-03-31T08:31:18Z

    [FLINK-6247][table] Build a jar-with-dependencies for flink-table and put it into ./opt

> Build a jar-with-dependencies for flink-table and put it into ./opt
> -------------------------------------------------------------------
>
> Key: FLINK-6247
> URL: https://issues.apache.org/jira/browse/FLINK-6247
> Project: Flink
> Issue Type: Improvement
> Components: Build System, Table API & SQL
> Affects Versions: 1.3.0
> Reporter: Fabian Hueske
> Assignee: sunjincheng
>
> Due to a problem with Calcite and the unloading of classes, user-code classloaders that include Calcite cannot be garbage collected. This is a problem for long-running clusters that execute multiple Table API / SQL programs with fat JARs that include the flink-table dependency. Each executed program comes with its own user-code classloader that cannot be cleaned up later.
> As a workaround, we recommend copying the flink-table dependency into the ./lib folder. However, we do not have a jar file with all required transitive dependencies (Calcite, Janino, etc.). Hence, users would need to build this jar file themselves or copy all jars into ./lib.
> This issue is about creating a jar-with-dependencies and adding it to the ./opt folder. Users can then copy the jar file from ./opt to ./lib to include the Table API in the classpath of Flink.
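The workaround the issue describes, once the fat jar ships in ./opt, is a single copy step: moving the jar into ./lib puts flink-table on the framework classpath, so Calcite is loaded once by the parent classloader instead of inside every job's user-code classloader. A sketch of that step, using a mock distribution layout because the real paths and the jar's name (Scala suffix, version) depend on the Flink release:

```shell
# Mock layout standing in for a real Flink distribution; the jar name is hypothetical.
FLINK_HOME="$(mktemp -d)"
mkdir -p "$FLINK_HOME/opt" "$FLINK_HOME/lib"
: > "$FLINK_HOME/opt/flink-table_2.11-1.3.0.jar"   # stand-in for the shipped fat jar

# The actual workaround: promote the jar from opt/ (shipped but inactive)
# to lib/ (loaded by the framework classloader at startup).
cp "$FLINK_HOME"/opt/flink-table_*.jar "$FLINK_HOME/lib/"
ls "$FLINK_HOME/lib"
```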
[jira] [Commented] (FLINK-4953) Allow access to "time" in ProcessWindowFunction.Context
[ https://issues.apache.org/jira/browse/FLINK-4953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954344#comment-15954344 ]

ASF GitHub Bot commented on FLINK-4953:
---------------------------------------

Github user manuzhang commented on the issue:

    https://github.com/apache/flink/pull/3661

    Okay. Shall we open a JIRA to deprecate `WindowOperatorTest`? I found a minor bug in `WindowOperatorTest#Tuple3ResultSortComparator` (same as in `WindowOperatorMigrationTest`). Do we want to fix it?

> Allow access to "time" in ProcessWindowFunction.Context
> -------------------------------------------------------
>
> Key: FLINK-4953
> URL: https://issues.apache.org/jira/browse/FLINK-4953
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Manu Zhang
> Assignee: Manu Zhang
>
> The recently added {{ProcessWindowFunction}} has a {{Context}} object that allows querying some additional information about the window firing that we are processing. Right now, this is only the window for which the firing is happening. We should extend this with methods that allow querying the current processing time and the current watermark.
> Original text by the issue creator: This is similar to FLINK-3674, but exposes time information in window functions. Currently when a timer is fired, all states in a window will be returned to users, including those after the timer. This change will allow users to filter out states after the timer based on time info.
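The use case behind FLINK-4953, filtering out window contents whose timestamps fall after the firing timer, reduces to a comparison against the time the Context would expose. A plain-Java sketch of that idea (the `Timestamped` record and `upTo` helper are hypothetical illustrations, not the actual `ProcessWindowFunction.Context` API):

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: given access to the firing time (what the Context
// addition enables), a window function can drop elements that arrived
// with timestamps later than the timer that triggered the firing.
class TimeFilter {
    record Timestamped(long ts, String value) {}

    // Keep only the elements at or before the timer that fired the window.
    static List<Timestamped> upTo(List<Timestamped> window, long firingTime) {
        return window.stream()
                     .filter(e -> e.ts() <= firingTime)
                     .collect(Collectors.toList());
    }
}
```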
[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs
[ https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954326#comment-15954326 ]

ASF GitHub Bot commented on FLINK-5991:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3508#discussion_r109547924

--- Diff: docs/dev/stream/state.md ---
@@ -233,45 +229,44 @@ val counts: DataStream[(String, Int)] = stream

 ## Using Managed Operator State

-A stateful function can implement either the more general `CheckpointedFunction`
+To use managed operator state, a stateful function can implement either the more general `CheckpointedFunction`
 interface, or the `ListCheckpointed` interface.

-In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other,
-thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
-non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink`
-contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0,
-while `(test2, 2)` will go to task 1.
-
-# ListCheckpointed + CheckpointedFunction
-The `ListCheckpointed` interface requires the implementation of two methods:
-
-{% highlight java %}
-List snapshotState(long checkpointId, long timestamp) throws Exception;
-
-void restoreState(List state) throws Exception;
-{% endhighlight %}
-
-On `snapshotState()` the operator should return a list of objects to checkpoint and
-`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always
-return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`.
-
-# CheckpointedFunction
-
-The `CheckpointedFunction` interface also requires the implementation of two methods:
+The `CheckpointedFunction` interface provides access to non-keyed state with different
+redistribution schemes. It requires the implementation of two methods:

 {% highlight java %}
 void snapshotState(FunctionSnapshotContext context) throws Exception;

 void initializeState(FunctionInitializationContext context) throws Exception;
 {% endhighlight %}

-Whenever a checkpoint has to be performed `snapshotState()` is called. The counterpart, `initializeState()`, is called every time the user-defined function is initialized, be that when the function is first initialized
-or be that when actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
+Whenever a checkpoint has to be performed, `snapshotState()` is called. The counterpart, `initializeState()`,
+is called every time the user-defined function is initialized, be that when the function is first initialized
+or be that when the function is actually recovering from an earlier checkpoint. Given this, `initializeState()` is not
 only the place where different types of state are initialized, but also where state recovery logic is included.

-This is an example of a function that uses `CheckpointedFunction`, a stateful `SinkFunction` that
-uses state to buffer elements before sending them to the outside world:
+Currently, list-style managed operator state is supported. The state
+is expected to be a `List` of *serializable* objects, independent from each other,
+thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which
+non-keyed state can be redistributed. Depending on the state accessing method,
+the following redistribution schemes are defined:
+
+ - **Even-split redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of
--- End diff --

    "Even-split" --> Not really sure what would be the best wording here ... Any ideas?

> Expose Broadcast Operator State through public APIs
> ---------------------------------------------------
>
> Key: FLINK-5991
> URL: https://issues.apache.org/jira/browse/FLINK-5991
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API, State Backends, Checkpointing
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
> The broadcast operator state functionality was added in FLINK-5265; it just hasn't been exposed through any public APIs yet.
> Currently, we have 2 streaming connector features for 1.3 that are pending on broadcast state: rescalable Kinesis / Kafka consumers with shard / partition discovery (FLINK-4821 & FLINK-4022). We
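The "even-split" scheme the docs diff is trying to name can be sketched outside Flink as: concatenate every subtask's element list, then divide the concatenation as evenly as possible across the new parallelism. A plain-Java illustration of that contract (not Flink's actual implementation; round-robin is just one way to split evenly):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of even-split redistribution: the checkpointed state
// is the logical concatenation of all subtasks' lists, and on rescaling
// that concatenation is re-divided evenly across the new subtasks.
class EvenSplit {
    static <T> List<List<T>> redistribute(List<List<T>> oldState, int newParallelism) {
        List<T> all = new ArrayList<>();
        oldState.forEach(all::addAll);             // logical concatenation
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>());
        }
        for (int i = 0; i < all.size(); i++) {     // one even division: round-robin
            result.get(i % newParallelism).add(all.get(i));
        }
        return result;
    }
}
```

This reproduces the docs' `BufferingSink` example: two elements checkpointed at parallelism 1 end up one per subtask at parallelism 2.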
[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs
[ https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954323#comment-15954323 ]

ASF GitHub Bot commented on FLINK-5991:
---------------------------------------

Github user tzulitai commented on the issue:

    https://github.com/apache/flink/pull/3508

    Updated the managed operator state docs. @aljoscha @StefanRRichter could you have a final look at it? If there are no other problems, I'll proceed to merge this. Thanks :)

> Expose Broadcast Operator State through public APIs
> ---------------------------------------------------
>
> Key: FLINK-5991
> URL: https://issues.apache.org/jira/browse/FLINK-5991
> Project: Flink
> Issue Type: New Feature
> Components: DataStream API, State Backends, Checkpointing
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Fix For: 1.3.0
>
> The broadcast operator state functionality was added in FLINK-5265; it just hasn't been exposed through any public APIs yet.
> Currently, we have 2 streaming connector features for 1.3 that are pending on broadcast state: rescalable Kinesis / Kafka consumers with shard / partition discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast state for the 1.3 release also.
> This JIRA also serves the purpose of discussing how we want to expose it. To initiate the discussion, I propose:
> 1. For the more powerful {{CheckpointedFunction}}, add the following to the {{OperatorStateStore}} interface:
> {code}
> ListState getBroadcastOperatorState(ListStateDescriptor stateDescriptor);
>
> ListState getBroadcastSerializableListState(String stateName);
> {code}
> 2. For a simpler {{ListCheckpointed}} variant, we probably should have a separate {{BroadcastListCheckpointed}} interface.
> Extending {{ListCheckpointed}} to let the user define a list state type of either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if we can rely on a contract that the value doesn't change. Or we expose a defining method (e.g. {{getListStateType()}}) that is called only once in the operator. This would break user code, but can be considered because it is marked as {{PublicEvolving}}.
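Broadcast redistribution, the scheme this issue wants to expose, contrasts with even-split: on restore, every parallel subtask receives the complete state rather than a slice of it. A plain-Java sketch of that contract (illustrative only, not the proposed `OperatorStateStore` API):

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of broadcast redistribution: each of the new subtasks
// gets the full union of all previously checkpointed elements, whereas
// even-split would divide the union up among them.
class BroadcastRedistribute {
    static <T> List<List<T>> redistribute(List<List<T>> oldState, int newParallelism) {
        List<T> union = new ArrayList<>();
        oldState.forEach(union::addAll);           // union of all subtask lists
        List<List<T>> result = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            result.add(new ArrayList<>(union));    // every subtask gets a full copy
        }
        return result;
    }
}
```

This is why broadcast state suits the shard/partition-discovery use case: every consumer subtask needs the full picture of discovered shards after a rescale.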
[jira] [Commented] (FLINK-6249) Distinct Aggregates for OVER window
[ https://issues.apache.org/jira/browse/FLINK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954190#comment-15954190 ]

Fabian Hueske commented on FLINK-6249:
--------------------------------------

Hi [~rtudoran], thanks for creating the JIRA.
I think the plan to implement the deduplication in the ProcessFunction is good. It gives more flexibility and might also allow the use of user-defined aggregation functions with DISTINCT.
For now, I would not implement DISTINCT for unbounded OVER windows.
I think the discussion of the non-OVER-window DISTINCT cases is a bit confusing in the context of this issue. Can you move those parts into separate JIRAs to keep the discussion focused on DISTINCT with OVER windows?

> Distinct Aggregates for OVER window
> -----------------------------------
>
> Key: FLINK-6249
> URL: https://issues.apache.org/jira/browse/FLINK-6249
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: radu
> Labels: features, patch
>
> Time target: ProcTime/EventTime
>
> SQL targeted query examples:
>
> Q1. Boundaries are expressed in windows and meant for the elements to be aggregated:
> Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.2. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.3. `SELECT SUM( DISTINCT b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
> Q1.4. `SELECT SUM( DISTINCT b) OVER (ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1`
>
> General comments:
> - The DISTINCT operation makes sense only within the context of windows or some bounded, defined structures. Otherwise the operation would keep an infinite amount of data to ensure uniqueness and would not trigger for certain functions (e.g. aggregates).
> - We can consider as a sub-JIRA issue the implementation of DISTINCT for UNBOUNDED sliding windows. However, there would be no control over the data structure that keeps the seen data (to check that it is not re-processed). -> It needs to be decided if we want to support this (and to create the appropriate JIRA issues).
> => We will open sub-JIRA issues to extend the current functionality of aggregates to the DISTINCT case (Q1.{1-4}). (This is the main target of this JIRA.)
> => Aggregations over distinct elements without any boundary (i.e. within the SELECT clause) do not make sense, just as aggregations do not make sense without groupings or windows.
>
> Other similar query support:
>
> Q2. Boundaries are expressed in the GROUP BY clause and distinct is applied to the elements of the aggregate(s):
> `SELECT MIN( DISTINCT rowtime), prodID FROM stream1 GROUP BY FLOOR(procTime() TO HOUR)`
> => We need to decide if we aim to support distinct aggregates for the GROUP BY case (Q2) in this release. If so, sub-JIRA issues need to be created; we can follow the same design/implementation.
> => We can consider as a sub-JIRA issue the implementation of DISTINCT for SELECT clauses. However, there is no control over the growing size of the data structure and it will unavoidably crash the memory.
>
> Q3. Distinct is applied to the collection of outputs to be selected:
> `SELECT STREAM DISTINCT procTime(), prodId FROM stream1 GROUP BY FLOOR(procTime() TO DAY)`
>
> Description:
>
> The DISTINCT operator requires processing the elements to ensure uniqueness. Whether the operation is used to SELECT ALL distinct elements or to apply typical aggregation functions over a set of elements, a collection of elements must first be formed. This brings the need of using windows or grouping methods; therefore the distinct function will be implemented within windows. Depending on the type of window definition there are several options:
> - Main scope: If distinct is applied as in the Q1 examples for window aggregations, then we either extend the implementation with distinct aggregates (less preferred) or extend the sliding-window aggregate implementation in the ProcessFunction with support for identifying distinct elements (preferred). The latter option is preferred because a query can carry multiple aggregates, including multiple aggregates that have the DISTINCT keyword set. Implementing the distinction between elements in the process function avoids the need to multiply the data structure that marks what was seen across the multiple aggregates. It also makes the implementation more robust and resilient, as we can keep the data structure for marking the seen elements in a state (MapState).
> - If distinct is applied as in Q2 example on
[jira] [Commented] (FLINK-6011) Support TUMBLE, HOP, SESSION window in streaming SQL
[ https://issues.apache.org/jira/browse/FLINK-6011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954189#comment-15954189 ]

ASF GitHub Bot commented on FLINK-6011:
---------------------------------------

GitHub user haohui opened a pull request:

    https://github.com/apache/flink/pull/3665

    [FLINK-6011] Support TUMBLE, HOP, SESSION window in streaming SQL.

    This PR adds support for the `TUMBLE`, `HOP`, and `SESSION` windows in Flink. The work of supporting the WindowStart and WindowEnd expressions will be deferred to FLINK-6012.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/haohui/flink FLINK-6011

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/3665.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #3665

----
commit ef54aea409ea95feb04e651a0fde2fd4b620d888
Author: Haohui Mai
Date:   2017-04-03T21:32:09Z

    [FLINK-6011] Support TUMBLE, HOP, SESSION window in streaming SQL.

> Support TUMBLE, HOP, SESSION window in streaming SQL
> ----------------------------------------------------
>
> Key: FLINK-6011
> URL: https://issues.apache.org/jira/browse/FLINK-6011
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: Haohui Mai
> Assignee: Haohui Mai
>
> CALCITE-1603 and CALCITE-1615 introduce support for the {{TUMBLE}} / {{HOP}} / {{SESSION}} windows in the parser.
> This JIRA tracks the effort of adding the corresponding support to the planners / optimizers in Flink.
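As a sketch of the kind of query this support enables (the table and column names are hypothetical; the `TUMBLE` group function follows the syntax introduced by CALCITE-1603):

```sql
-- Hypothetical table `Orders` with a time attribute `rowtime`:
-- sum the order amounts per product over 1-hour tumbling windows.
SELECT
  product,
  SUM(amount)
FROM Orders
GROUP BY
  TUMBLE(rowtime, INTERVAL '1' HOUR),
  product
```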
[GitHub] flink issue #3508: [FLINK-5991] [state-backend, streaming] Expose Broadcast ...
Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3508 I've opened a separate JIRA to deprecate `ListCheckpointed`. Let's keep this PR self-contained in just refining the `OperatorStateStore` interface. I think this PR is still lacking an update to https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#using-managed-keyed-state before it's good to go. Adding this ;)
[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs
[ https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15954123#comment-15954123 ] ASF GitHub Bot commented on FLINK-5991: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3508 I've opened a separate JIRA to deprecate `ListCheckpointed`. Let's keep this PR self-contained in just refining the `OperatorStateStore` interface. I think this PR is still lacking an update to https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#using-managed-keyed-state before it's good to go. Adding this ;) > Expose Broadcast Operator State through public APIs > --- > > Key: FLINK-5991 > URL: https://issues.apache.org/jira/browse/FLINK-5991 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.3.0 > > > The broadcast operator state functionality was added in FLINK-5265; it just > hasn't been exposed through any public APIs yet. > Currently, we have 2 streaming connector features for 1.3 that are pending on > broadcast state: rescalable Kinesis / Kafka consumers with shard / partition > discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast > state for the 1.3 release also. > This JIRA also serves the purpose to discuss how we want to expose it. > To initiate the discussion, I propose: > 1. For the more powerful {{CheckpointedFunction}}, add the following to the > {{OperatorStateStore}} interface: > {code} > ListState getBroadcastOperatorState(ListStateDescriptor > stateDescriptor); > ListState > getBroadcastSerializableListState(String stateName); > {code} > 2. For a simpler {{ListCheckpointed}} variant, we probably should have a > separate {{BroadcastListCheckpointed}} interface. 
> Extending {{ListCheckpointed}} to let the user define the list state type as > either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if > we can rely on a contract that the value doesn't change. Or we expose a > defining method (e.g. {{getListStateType()}}) that is called only once in the > operator. This would break user code, but can be considered because it is > marked as {{PublicEvolving}}.
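The difference between the two list-state types discussed above comes down to how checkpointed elements are handed back to tasks on restore or rescale. A minimal Python sketch of the two redistribution modes (illustrative only; the function names are hypothetical, not Flink's API):

```python
# PARTITIONABLE vs BROADCAST operator list state on restore/rescale.

def redistribute_partitionable(state, parallelism):
    """Split the checkpointed list round-robin across the new tasks;
    each element ends up on exactly one task."""
    tasks = [[] for _ in range(parallelism)]
    for i, element in enumerate(state):
        tasks[i % parallelism].append(element)
    return tasks

def redistribute_broadcast(state, parallelism):
    """Every task receives the complete checkpointed list."""
    return [list(state) for _ in range(parallelism)]
```

The broadcast mode is what a rescalable partition-discovering consumer needs: every subtask sees the full partition list and can then filter for the partitions it owns.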
[jira] [Created] (FLINK-6258) Deprecate ListCheckpointed interface for managed operator state
Tzu-Li (Gordon) Tai created FLINK-6258: -- Summary: Deprecate ListCheckpointed interface for managed operator state Key: FLINK-6258 URL: https://issues.apache.org/jira/browse/FLINK-6258 Project: Flink Issue Type: Improvement Components: State Backends, Checkpointing Reporter: Tzu-Li (Gordon) Tai Per discussion in https://github.com/apache/flink/pull/3508, we consider deprecating the `ListCheckpointed` interface to discourage Java serialization shortcuts for state registrations (towards this, the Java serialization shortcuts provided by the `OperatorStateStore` interface have already been deprecated in https://github.com/apache/flink/pull/3508). We should also remember to update https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#using-managed-keyed-state if we decide to do this.
[jira] [Commented] (FLINK-6209) StreamPlanEnvironment always has a parallelism of 1
[ https://issues.apache.org/jira/browse/FLINK-6209?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953916#comment-15953916 ] Aljoscha Krettek commented on FLINK-6209: - [~wheat9] The snippet was removed because it wasn't obvious that this would break something. I reverted the changes for FLINK-5808 on release-1.2 and will work on finding proper fixes. > StreamPlanEnvironment always has a parallelism of 1 > --- > > Key: FLINK-6209 > URL: https://issues.apache.org/jira/browse/FLINK-6209 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.1 >Reporter: Haohui Mai >Assignee: Haohui Mai > > Thanks [~bill.liu8904] for triaging the issue. > After FLINK-5808 we saw that the Flink jobs that are uploaded through the UI > always have a parallelism of 1, even when the parallelism is explicitly set in > the UI.
[jira] [Commented] (FLINK-6217) ContaineredTaskManagerParameters sets off heap memory size incorrectly
[ https://issues.apache.org/jira/browse/FLINK-6217?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953865#comment-15953865 ] ASF GitHub Bot commented on FLINK-6217: --- Github user haohui commented on the issue: https://github.com/apache/flink/pull/3648 The test failure seems unrelated. Cannot reproduce locally. > ContaineredTaskManagerParameters sets off heap memory size incorrectly > -- > > Key: FLINK-6217 > URL: https://issues.apache.org/jira/browse/FLINK-6217 > Project: Flink > Issue Type: Bug > Components: YARN >Reporter: Haohui Mai >Assignee: Haohui Mai > > Thanks [~bill.liu8904] for triaging the issue. > When {{taskmanager.memory.off-heap}} is disabled, we observed that the total > memory that Flink allocates exceeds the total memory of the container: > For an 8G container the JobManager starts the container with the following > parameter: > {noformat} > $JAVA_HOME/bin/java -Xms6072m -Xmx6072m -XX:MaxDirectMemorySize=6072m ... > {noformat} > The total amount of heap memory plus the off-heap memory exceeds the total > amount of memory of the container. As a result YARN occasionally kills the > container.
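The arithmetic behind the bug report above can be sketched in a few lines: if the heap size is derived from the container size and `MaxDirectMemorySize` is then set to that same value, the two budgets together exceed the container. The 6072 MB figure is taken from the issue's 8G example; the 2120 MB cutoff is back-derived from it and, like the fixed variant, is an assumption for illustration only, not Flink's actual sizing logic.

```python
# Container memory split: buggy vs corrected derivation (hypothetical cutoff).

def derive_sizes_buggy(container_mb, cutoff_mb):
    """Heap = container minus cutoff; direct memory budget mirrors the heap,
    so heap + direct can exceed the container (the reported behavior)."""
    heap_mb = container_mb - cutoff_mb
    direct_mb = heap_mb
    return heap_mb, direct_mb

def derive_sizes_fixed(container_mb, cutoff_mb, off_heap_mb):
    """Reserve an explicit off-heap budget so heap + off-heap + cutoff
    stays within the container."""
    heap_mb = container_mb - cutoff_mb - off_heap_mb
    return heap_mb, off_heap_mb

heap, direct = derive_sizes_buggy(8192, 2120)  # -> 6072 and 6072, as in the issue
assert heap + direct > 8192  # exceeds the 8G container -> YARN may kill it
```

With the corrected split, the sum of heap, off-heap, and cutoff equals the container size by construction.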
[GitHub] flink issue #3648: [FLINK-6217] ContaineredTaskManagerParameters sets off-he...
Github user haohui commented on the issue: https://github.com/apache/flink/pull/3648 The test failure seems unrelated. Cannot reproduce locally.
[jira] [Commented] (FLINK-2814) DeltaIteration: DualInputPlanNode cannot be cast to SingleInputPlanNode
[ https://issues.apache.org/jira/browse/FLINK-2814?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953831#comment-15953831 ] ASF GitHub Bot commented on FLINK-2814: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3563 I think cherry-picking this into the `release-1.2` branch would be nice > DeltaIteration: DualInputPlanNode cannot be cast to SingleInputPlanNode > --- > > Key: FLINK-2814 > URL: https://issues.apache.org/jira/browse/FLINK-2814 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 0.10.0 >Reporter: Greg Hogan >Assignee: Rekha Joshi > Fix For: 1.3.0 > > > A delta iteration that closes with a solution set which is a {{JoinOperator}} > throws the following exception: > {noformat} > org.apache.flink.client.program.ProgramInvocationException: The main method > caused an error. > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:444) > at > org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:345) > at org.apache.flink.client.program.Client.runBlocking(Client.java:289) > at > org.apache.flink.client.CliFrontend.executeProgramBlocking(CliFrontend.java:669) > at org.apache.flink.client.CliFrontend.run(CliFrontend.java:324) > at > org.apache.flink.client.CliFrontend.parseParameters(CliFrontend.java:969) > at org.apache.flink.client.CliFrontend.main(CliFrontend.java:1019) > Caused by: java.lang.ClassCastException: > org.apache.flink.optimizer.plan.DualInputPlanNode cannot be cast to > org.apache.flink.optimizer.plan.SingleInputPlanNode > at > org.apache.flink.optimizer.dag.WorksetIterationNode.instantiate(WorksetIterationNode.java:432) > at > org.apache.flink.optimizer.dag.TwoInputNode.addLocalCandidates(TwoInputNode.java:557) > at > org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:478) > at > 
org.apache.flink.optimizer.dag.DataSinkNode.getAlternativePlans(DataSinkNode.java:204) > at > org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:309) > at > org.apache.flink.optimizer.dag.TwoInputNode.getAlternativePlans(TwoInputNode.java:308) > at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:500) > at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402) > at > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:271) > at > org.apache.flink.client.program.Client.getOptimizedPlan(Client.java:543) > at org.apache.flink.client.program.Client.runBlocking(Client.java:350) > at > org.apache.flink.client.program.ContextEnvironment.execute(ContextEnvironment.java:64) > at > org.apache.flink.api.java.ExecutionEnvironment.execute(ExecutionEnvironment.java:796) > at org.apache.flink.api.java.DataSet.collect(DataSet.java:424) > at org.apache.flink.api.java.DataSet.print(DataSet.java:1365) > at Driver.main(Driver.java:366) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at > org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:429) > ... 6 more > {noformat} > Temporary fix is to attach an identity mapper.
[GitHub] flink issue #3563: [FLINK-2814] [optimizer] DualInputPlanNode cannot be cast...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/3563 I think cherry-picking this into the `release-1.2` branch would be nice
[jira] [Commented] (FLINK-6243) Continuous Joins: True Sliding Window Joins
[ https://issues.apache.org/jira/browse/FLINK-6243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953798#comment-15953798 ] Elias Levy commented on FLINK-6243: --- Thanks for pointing out FLINK-5725. I did review the documentation of the Table API and noticed the lack of join support for streaming sources. I would suggest reviewing the functionality of Kafka Streams in this area. They have already implemented joins across both {{KStreams}} and {{KTables}}, and support outer joins. The implementation is not too complex and works largely as I described above. Elements from each stream are buffered in an ordered key-value store (RocksDB), inserting the timestamp into the key to order elements for the same key by time. Elements are only kept for the chosen time window. The store is structured to efficiently delete old elements (the store is segmented by time and expired segments are dropped). To perform the join on an incoming element, a range scan for the key is performed against the elements buffered in the other stream's store. > Continuous Joins: True Sliding Window Joins > > > Key: FLINK-6243 > URL: https://issues.apache.org/jira/browse/FLINK-6243 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Affects Versions: 1.1.4 >Reporter: Elias Levy > > Flink defines sliding window joins as the join of elements of two streams > that share a window of time, where the windows are defined by advancing them > forward some amount of time that is less than the window time span. More > generally, such windows are just overlapping hopping windows. > Other systems, such as Kafka Streams, support a different notion of sliding > window joins. In these systems, two elements of a stream are joined if the > absolute time difference between them is less than or equal to the time window > length. > This alternate notion of sliding window joins has some advantages in some > applications over the current implementation. 
> Elements to be joined may both fall within multiple overlapping sliding > windows, leading them to be joined multiple times, when we only wish them to > be joined once. > The implementation need not instantiate window objects to keep track of > stream elements, which becomes problematic in the current implementation if > the window size is very large and the slide is very small. > It allows for asymmetric time joins. E.g. join if elements from stream A are > no more than X time behind and Y time ahead of an element from stream B. > It is currently possible to implement a join with these semantics using > {{CoProcessFunction}}, but the capability should be a first class feature, > as it is in Kafka Streams. > To perform the join, elements of each stream must be buffered for at least > the window time length. To allow for large window sizes and a high volume of > elements, the state, possibly optionally, should be buffered so that it can > spill to disk (e.g. by using RocksDB). > The same stream may be joined multiple times in a complex topology. As an > optimization, it may be wise to reuse any element buffer among colocated join > operators. Otherwise, there may be write amplification and increased state that > must be snapshotted.
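The "absolute time difference" join semantics requested above, including the asymmetric variant, can be sketched compactly. This is an illustration of the semantics only (a naive nested-loop over in-memory lists), not the buffered RocksDB-backed implementation the issue describes:

```python
# Interval-join semantics: pair (a, b) joins when
#   ts_a - behind <= ts_b <= ts_a + ahead.
# The symmetric sliding-window join is the special case behind == ahead.

def interval_join(stream_a, stream_b, behind, ahead):
    """Each stream is a list of (timestamp, value) pairs; every qualifying
    pair is emitted exactly once, unlike overlapping hopping windows."""
    joined = []
    for ts_a, val_a in stream_a:
        for ts_b, val_b in stream_b:
            if ts_a - behind <= ts_b <= ts_a + ahead:
                joined.append((val_a, val_b))
    return joined
```

A real implementation would replace the inner loop with a range scan over a time-ordered buffer of the other stream, as the comment describes for Kafka Streams.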
[GitHub] flink pull request #3664: [FLINK-5808] Revert changes
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/3664 [FLINK-5808] Revert changes You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink revert-5808-release12 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3664.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3664 commit 1fc36aca9eb611890c5ca14819c785924a6a66ae Author: Aljoscha Krettek Date: 2017-04-03T16:39:43Z Revert "[FLINK-5808] Add proper checks in setParallelism()/setMaxParallelism()" This reverts commit 99fb80be773499907d379553010dd999214f64fb. This fix was causing more problems than it was solving. commit af43ee29e1f3d0c8e888d0b75af2673694f22c8b Author: Aljoscha Krettek Date: 2017-04-03T16:40:05Z Revert "[FLINK-5808] Move max keygroup constants to ExecutionConfig" This reverts commit d3b275f4b7d49b67013e26d1f29a065d3131c664. This fix was causing more problems than it was solving. commit 8adb8898fa4242a8be7011d4a55d26bf0c83f75f Author: Aljoscha Krettek Date: 2017-04-03T16:40:14Z Revert "[FLINK-5808] Move default parallelism to StreamingJobGraphGenerator" This reverts commit b563f0ae2e7b7233e29e03fbb2cf18b0d853c0ca. This fix was causing more problems than it was solving.
[jira] [Commented] (FLINK-5808) Missing verification for setParallelism and setMaxParallelism
[ https://issues.apache.org/jira/browse/FLINK-5808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953775#comment-15953775 ] ASF GitHub Bot commented on FLINK-5808: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/3664 [FLINK-5808] Revert changes You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink revert-5808-release12 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3664.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3664 commit 1fc36aca9eb611890c5ca14819c785924a6a66ae Author: Aljoscha Krettek Date: 2017-04-03T16:39:43Z Revert "[FLINK-5808] Add proper checks in setParallelism()/setMaxParallelism()" This reverts commit 99fb80be773499907d379553010dd999214f64fb. This fix was causing more problems than it was solving. commit af43ee29e1f3d0c8e888d0b75af2673694f22c8b Author: Aljoscha Krettek Date: 2017-04-03T16:40:05Z Revert "[FLINK-5808] Move max keygroup constants to ExecutionConfig" This reverts commit d3b275f4b7d49b67013e26d1f29a065d3131c664. This fix was causing more problems than it was solving. commit 8adb8898fa4242a8be7011d4a55d26bf0c83f75f Author: Aljoscha Krettek Date: 2017-04-03T16:40:14Z Revert "[FLINK-5808] Move default parallelism to StreamingJobGraphGenerator" This reverts commit b563f0ae2e7b7233e29e03fbb2cf18b0d853c0ca. This fix was causing more problems than it was solving. 
> Missing verification for setParallelism and setMaxParallelism > - > > Key: FLINK-5808 > URL: https://issues.apache.org/jira/browse/FLINK-5808 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.0, 1.2.1 > > > When {{setParallelism()}} is called we don't verify that it is <= the max > parallelism. Likewise, when {{setMaxParallelism()}} is called we don't check > that the new value doesn't clash with a previously set parallelism.
[jira] [Reopened] (FLINK-5808) Missing verification for setParallelism and setMaxParallelism
[ https://issues.apache.org/jira/browse/FLINK-5808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reopened FLINK-5808: - Assignee: Aljoscha Krettek Reopening because this is not properly fixed and we discovered follow-up problems: - FLINK-6188: Some setParallelism() methods can't cope with default parallelism - FLINK-6209: StreamPlanEnvironment always has a parallelism of 1 > Missing verification for setParallelism and setMaxParallelism > - > > Key: FLINK-5808 > URL: https://issues.apache.org/jira/browse/FLINK-5808 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.0, 1.2.1 > > > When {{setParallelism()}} is called we don't verify that it is <= the max > parallelism. Likewise, when {{setMaxParallelism()}} is called we don't check > that the new value doesn't clash with a previously set parallelism.
[jira] [Commented] (FLINK-3089) State API Should Support Data Expiration
[ https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953761#comment-15953761 ] Elias Levy commented on FLINK-3089: --- Thanks for the clarification. It is curious that the {{TimerService}} interface allows for the creation of a timer, but not the removal or overriding of a previous timer for the same stream key. > State API Should Support Data Expiration > > > Key: FLINK-3089 > URL: https://issues.apache.org/jira/browse/FLINK-3089 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Niels Basjes > > In some use cases (web analytics) there is a need to have a state per visitor > on a website (i.e. keyBy(sessionid) ). > At some point the visitor simply leaves and no longer creates new events (so > a special 'end of session' event will not occur). > The only way to determine that a visitor has left is by choosing a timeout, > like "After 30 minutes of no events we consider the visitor 'gone'". > Only after this (chosen) timeout has expired should we discard this state. > In the Trigger part of Windows we can set a timer and close/discard this kind > of information. But that introduces the buffering effect of the window (which > in some scenarios is unwanted). > What I would like is to be able to set a timeout on a specific OperatorState > value which I can update afterwards. > This makes it possible to create a map function that assigns the right value > and that discards the state automatically. 
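The pattern the issue asks for — per-key state whose expiration timer is pushed forward by every new event — can be modelled in a few lines. This is an illustration of the expiry logic only; the class and method names are hypothetical and this is not the TimerService API:

```python
# Per-key state with a resettable inactivity timeout. Each update for a key
# effectively "overrides" the previous timer by overwriting the key's
# expiration timestamp; a periodic timer callback drops expired keys.

class ExpiringKeyedState:
    def __init__(self, timeout):
        self.timeout = timeout
        self.state = {}   # key -> value
        self.expiry = {}  # key -> expiration timestamp

    def update(self, key, value, now):
        """Store the value and push the key's expiration to now + timeout."""
        self.state[key] = value
        self.expiry[key] = now + self.timeout

    def on_timer(self, now):
        """Discard state for every key whose timeout has elapsed."""
        for key in [k for k, t in self.expiry.items() if t <= now]:
            del self.state[key]
            del self.expiry[key]
```

With a 30-minute timeout, a visitor who keeps producing events keeps sliding their expiration forward; once they go quiet for 30 minutes, the next timer firing removes their session state.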
[jira] [Commented] (FLINK-6124) support max/min aggregations for string type
[ https://issues.apache.org/jira/browse/FLINK-6124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953740#comment-15953740 ] ASF GitHub Bot commented on FLINK-6124: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3593 > support max/min aggregations for string type > > > Key: FLINK-6124 > URL: https://issues.apache.org/jira/browse/FLINK-6124 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Zhenghua Gao >Assignee: Zhenghua Gao > Fix For: 1.3.0 > > > Recently, when porting some queries to Flink SQL, I found that min/max > aggregations on string types are currently not supported and should be added. > When min/max aggregations are used on a string column, they return the min/max > value in lexicographic order. 
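The semantics the issue proposes are exactly lexicographic comparison, the same ordering Python's built-in comparison gives for `str`. A minimal sketch of what MIN/MAX over a string column returns (illustrative only, not the Flink aggregation code):

```python
# MIN/MAX over a string column compares values lexicographically,
# i.e. element by element by code point, so "Apple" < "banana" < "cherry".

def min_max_string(column):
    """Return (MIN, MAX) of a non-empty string column in lexicographic order."""
    return min(column), max(column)
```

Note that plain code-point ordering places all uppercase ASCII letters before lowercase ones, which is worth keeping in mind when comparing against a SQL engine's collation.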
[GitHub] flink pull request #3593: [FLINK-6124] [table] Add min/max string aggregatio...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3593
[jira] [Commented] (FLINK-6124) support max/min aggregations for string type
[ https://issues.apache.org/jira/browse/FLINK-6124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953738#comment-15953738 ] Timo Walther commented on FLINK-6124: - Added retraction support in: c5282cbcf898b99593ca753fce7557bae1ae09aa > support max/min aggregations for string type > > > Key: FLINK-6124 > URL: https://issues.apache.org/jira/browse/FLINK-6124 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Zhenghua Gao >Assignee: Zhenghua Gao > Fix For: 1.3.0 > > > Recently, when porting some queries to Flink SQL, I found that min/max > aggregations on string types are currently not supported and should be added. > When min/max aggregations are used on a string column, they return the min/max > value in lexicographic order. 
[GitHub] flink issue #3593: [FLINK-6124] [table] Add min/max string aggregation with ...
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/3593 Thanks for the hint @shaoxuan-wang. I added the tests and will merge this.
[jira] [Commented] (FLINK-6124) support max/min aggregations for string type
[ https://issues.apache.org/jira/browse/FLINK-6124?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953729#comment-15953729 ] ASF GitHub Bot commented on FLINK-6124: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/3593 Thanks for the hint @shaoxuan-wang. I added the tests and will merge this. > support max/min aggregations for string type > > > Key: FLINK-6124 > URL: https://issues.apache.org/jira/browse/FLINK-6124 > Project: Flink > Issue Type: Improvement > Components: Table API & SQL >Reporter: Zhenghua Gao >Assignee: Zhenghua Gao > Fix For: 1.3.0 > > > Recently, when porting some queries to Flink SQL, I found that min/max > aggregations on string types are currently not supported and should be added. > When min/max aggregations are used on a string column, they return the min/max > value in lexicographic order. 
[jira] [Updated] (FLINK-6257) Post-pass OVER windows
[ https://issues.apache.org/jira/browse/FLINK-6257?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske updated FLINK-6257: - Issue Type: Sub-task (was: Improvement) Parent: FLINK-4557 > Post-pass OVER windows > -- > > Key: FLINK-6257 > URL: https://issues.apache.org/jira/browse/FLINK-6257 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Priority: Critical > > The OVER windows have been implemented by several contributors. > We should do a post pass over the contributed code and: > * Functionality > ** currently every time attribute is allowed as an ORDER BY attribute. We must > check that it is actually a time indicator ({{procTime()}}, {{rowTime()}}) > and that the order is ASCENDING. > * Documentation > ** Add documentation for OVER windows > * Code style > ** Consistent naming of {{ProcessFunctions}} and methods > * Tests > ** Move the OVER window tests out of SqlITCase into a dedicated class > ** Move the OVER window tests out of WindowAggregateTest into a dedicated > class > ** Add tests based on the test harness for all {{ProcessFunctions}} similar > to {{BoundedProcessingOverRangeProcessFunction}}. The tests should include > exact boundary checks for range windows and check for proper parallelization > with multiple keys.
[jira] [Created] (FLINK-6257) Post-pass OVER windows
Fabian Hueske created FLINK-6257: Summary: Post-pass OVER windows Key: FLINK-6257 URL: https://issues.apache.org/jira/browse/FLINK-6257 Project: Flink Issue Type: Improvement Components: Table API & SQL Affects Versions: 1.3.0 Reporter: Fabian Hueske Priority: Critical The OVER windows have been implemented by several contributors. We should do a post pass over the contributed code and: * Functionality ** currently every time attribute is allowed as an ORDER BY attribute. We must check that it is actually a time indicator ({{procTime()}}, {{rowTime()}}) and that the order is ASCENDING. * Documentation ** Add documentation for OVER windows * Code style ** Consistent naming of {{ProcessFunctions}} and methods * Tests ** Move the OVER window tests out of SqlITCase into a dedicated class ** Move the OVER window tests out of WindowAggregateTest into a dedicated class ** Add tests based on the test harness for all {{ProcessFunctions}} similar to {{BoundedProcessingOverRangeProcessFunction}}. The tests should include exact boundary checks for range windows and check for proper parallelization with multiple keys.
[GitHub] flink pull request #3660: [FLINK-6237] [table] support RAND and RAND_INTEGER...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3660#discussion_r109453119 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -1106,6 +1106,31 @@ class CodeGenerator( requireArray(array) generateArrayElement(this, array) + // RAND([seed]) + case RAND if isNumeric(resultType) && operands.size <= 1 => --- End diff -- We don't need these changes if the method is implemented using a `CallGenerator` in `FunctionGenerator`.
[jira] [Commented] (FLINK-6237) support RAND and RAND_INTEGER on SQL
[ https://issues.apache.org/jira/browse/FLINK-6237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953697#comment-15953697 ] ASF GitHub Bot commented on FLINK-6237: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3660#discussion_r109453119 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala --- @@ -1106,6 +1106,31 @@ class CodeGenerator( requireArray(array) generateArrayElement(this, array) + // RAND([seed]) + case RAND if isNumeric(resultType) && operands.size <= 1 => --- End diff -- We don't need these changes if the method is implemented using a `CallGenerator` in `FunctionGenerator`. > support RAND and RAND_INTEGER on SQL > > > Key: FLINK-6237 > URL: https://issues.apache.org/jira/browse/FLINK-6237 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: godfrey he >Assignee: godfrey he > > support RAND and RAND_INTEGER with and without seed on SQL. > like: > RAND([seed]), > RAND_INTEGER([seed, ] bound) 
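The intended semantics of `RAND([seed])` and `RAND_INTEGER([seed, ] bound)` can be sketched with Python's `random` module standing in for `java.util.Random`, which the reviewed code generation uses. Illustrative only; this is not the Flink implementation, and a stand-in RNG will not reproduce `java.util.Random`'s exact sequence:

```python
# RAND([seed]) / RAND_INTEGER([seed,] bound) semantics:
# without a seed the value is nondeterministic; with a seed it is reproducible.

import random

def rand(seed=None):
    """RAND([seed]): a double in [0, 1)."""
    rng = random.Random(seed)
    return rng.random()

def rand_integer(bound, seed=None):
    """RAND_INTEGER([seed,] bound): an int in [0, bound)."""
    rng = random.Random(seed)
    return rng.randrange(bound)
```

The seeded forms are what make the functions testable in SQL: two calls with the same seed yield the same value.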
[GitHub] flink pull request #3660: [FLINK-6237] [table] support RAND and RAND_INTEGER...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3660#discussion_r109453682 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala --- @@ -911,6 +911,64 @@ object ScalarOperators { } } + def generateRand( +randField: String, +seedExpr: GeneratedExpression, +resultType: TypeInformation[_]) + : GeneratedExpression = { +val resultTerm = newName("result") +val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType) +val randCode = if (seedExpr != null) { + s""" + |if ($randField == null) { + | ${seedExpr.code} + | $randField = new java.util.Random(${seedExpr.resultTerm}); --- End diff -- What happens if `seedExpr.nullTerm` is true? We need a case distinction for both null check enabled and disabled.
[jira] [Commented] (FLINK-6237) support RAND and RAND_INTEGER on SQL
[ https://issues.apache.org/jira/browse/FLINK-6237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953695#comment-15953695 ] ASF GitHub Bot commented on FLINK-6237: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3660#discussion_r109453682 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala --- @@ -911,6 +911,64 @@ object ScalarOperators { } } + def generateRand( +randField: String, +seedExpr: GeneratedExpression, +resultType: TypeInformation[_]) + : GeneratedExpression = { +val resultTerm = newName("result") +val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType) +val randCode = if (seedExpr != null) { + s""" + |if ($randField == null) { + | ${seedExpr.code} + | $randField = new java.util.Random(${seedExpr.resultTerm}); --- End diff -- What happens if `seedExpr.nullTerm` is true? We need a case distinction for both null check enabled and disabled. > support RAND and RAND_INTEGER on SQL > > > Key: FLINK-6237 > URL: https://issues.apache.org/jira/browse/FLINK-6237 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: godfrey he >Assignee: godfrey he > > support RAND and RAND_INTEGER with and without seed on SQL. > like: > RAND([seed]), > RAND_INTEGER([seed, ] bound)
[jira] [Commented] (FLINK-6237) support RAND and RAND_INTEGER on SQL
[ https://issues.apache.org/jira/browse/FLINK-6237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953696#comment-15953696 ] ASF GitHub Bot commented on FLINK-6237: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3660#discussion_r109451391 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala --- @@ -911,6 +911,64 @@ object ScalarOperators { } } + def generateRand( --- End diff -- The ScalarOperators class is intended for basic operations such as plus, minus, equals etc. Can you add the functions to `FunctionGenerator` class? > support RAND and RAND_INTEGER on SQL > > > Key: FLINK-6237 > URL: https://issues.apache.org/jira/browse/FLINK-6237 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: godfrey he >Assignee: godfrey he > > support RAND and RAND_INTEGER with and without seed on SQL. > like: > RAND([seed]), > RAND_INTEGER([seed, ] bound) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3660: [FLINK-6237] [table] support RAND and RAND_INTEGER...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3660#discussion_r109451391 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala --- @@ -911,6 +911,64 @@ object ScalarOperators { } } + def generateRand( --- End diff -- The ScalarOperators class is intended for basic operations such as plus, minus, equals etc. Can you add the functions to `FunctionGenerator` class? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6219) Add a state backend which supports sorting
[ https://issues.apache.org/jira/browse/FLINK-6219?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953687#comment-15953687 ] Fabian Hueske commented on FLINK-6219: -- If we add such a {{TimeSortedQueue}}, the user code could rely on the sorted access. For RocksDB this would mean we could just iterate (as long as RocksDB provides this behavior). For the other state backends, we would need to add a sort, but in the State implementation, not in the user code. > Add a state backend which supports sorting > -- > > Key: FLINK-6219 > URL: https://issues.apache.org/jira/browse/FLINK-6219 > Project: Flink > Issue Type: New Feature > Components: State Backends, Checkpointing, Table API & SQL >Reporter: sunjincheng > > When we implemented the OVER window of > [FLIP11|https://cwiki.apache.org/confluence/display/FLINK/FLIP-11%3A+Table+API+Stream+Aggregations] > we noticed that we need a state backend which supports sorting and allows for > efficient insertion, in-order traversal, and removal from the head. > For example: in an event-time OVER window, we need to sort by time. If the data is > as follows: > {code} > (1L, 1, Hello) > (2L, 2, Hello) > (5L, 5, Hello) > (4L, 4, Hello) > {code} > we insert the data in random order, like: > {code} > put((2L, 2, Hello)),put((1L, 1, Hello)),put((5L, 5, Hello)),put((4L, 4, > Hello)), > {code} > but we deal with the elements in time order: > {code} > process((1L, 1, Hello)),process((2L, 2, Hello)),process((4L, 4, > Hello)),process((5L, 5, Hello)) > {code} > Feedback is welcome. What do you think? [~xiaogang.shi] > [~aljoscha] [~fhueske] -- This message was sent by Atlassian JIRA (v6.3.15#6346)
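The access pattern the ticket asks for — random-order insertion, in-order traversal, removal from the head — can be sketched with a stdlib `TreeMap` keyed by timestamp. This is only an illustration of the proposed contract (the class name `TimeSortedBuffer` is made up here), not Flink's state backend API; for RocksDB the sorted iteration would come from the key order, and for heap backends the sort would live inside the State implementation, as the comment notes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Minimal sketch of a time-sorted state contract: elements can be put in any
// order, but are handed out smallest-timestamp-first.
public class TimeSortedBuffer<T> {
    private final TreeMap<Long, List<T>> byTime = new TreeMap<>();

    public void put(long timestamp, T element) {
        byTime.computeIfAbsent(timestamp, k -> new ArrayList<>()).add(element);
    }

    /** Removes and returns all elements with the smallest timestamp. */
    public List<T> pollHead() {
        Map.Entry<Long, List<T>> head = byTime.pollFirstEntry();
        return head == null ? new ArrayList<>() : head.getValue();
    }

    public boolean isEmpty() {
        return byTime.isEmpty();
    }

    public static void main(String[] args) {
        TimeSortedBuffer<String> buf = new TimeSortedBuffer<>();
        // Insert in random time order, as in the JIRA example.
        buf.put(2L, "b"); buf.put(1L, "a"); buf.put(5L, "d"); buf.put(4L, "c");
        StringBuilder order = new StringBuilder();
        while (!buf.isEmpty()) {
            for (String s : buf.pollHead()) order.append(s);
        }
        System.out.println(order); // prints "abcd"
    }
}
```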
[jira] [Commented] (FLINK-5994) Add Janino to flink-table JAR file
[ https://issues.apache.org/jira/browse/FLINK-5994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953682#comment-15953682 ] Fabian Hueske commented on FLINK-5994: -- I don't think we need to include Janino into the default {{flink-table}} artifact. If the {{flink-table}} dependency is added to a project (pom.xml), Maven (or any other build tool) will automatically fetch the required dependencies including Janino. The default jar is not the right file to add to the ./lib folder. I created FLINK-6247 to create a jar-with-dependencies for {{flink-table}} which should be put to the {{./opt}} folder. From there users can copy it to {{./lib}} to load the Table API into the default class loader. I propose to close this issue. > Add Janino to flink-table JAR file > -- > > Key: FLINK-5994 > URL: https://issues.apache.org/jira/browse/FLINK-5994 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Timo Walther >Assignee: sunjincheng > > It seems that Janino is not part of the flink-table JAR file although it is a > dependency in pom.xml. Users adding flink-table to Flink's lib folder because > of FLINK-5227 cannot run table program due to the missing Janino dependency. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
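The jar-with-dependencies mentioned for FLINK-6247 is typically produced with the maven-assembly-plugin's predefined descriptor. A sketch of the usual configuration — the actual flink-table build setup may differ:

```xml
<!-- Standard way to build a jar-with-dependencies; whether flink-table
     uses exactly this plugin configuration is an assumption here. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
  <executions>
    <execution>
      <phase>package</phase>
      <goals><goal>single</goal></goals>
    </execution>
  </executions>
</plugin>
```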
[jira] [Commented] (FLINK-6112) Support new numerical functions when calcite release 1.12
[ https://issues.apache.org/jira/browse/FLINK-6112?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953678#comment-15953678 ] Timo Walther commented on FLINK-6112: - [~mtunqiue] We bumped up the Calcite version. We can now implement this issue. > Support new numerical functions when calcite release 1.12 > - > > Key: FLINK-6112 > URL: https://issues.apache.org/jira/browse/FLINK-6112 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Tao Meng >Assignee: Tao Meng > > CALCITE-1557 introduces the support of some missing numerical functions. > We should add the functions. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5435) Cleanup the rules introduced by FLINK-5144 when calcite releases 1.12
[ https://issues.apache.org/jira/browse/FLINK-5435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953675#comment-15953675 ] Timo Walther commented on FLINK-5435: - [~ykt836] We updated the Calcite version. Would be great if you could open a PR for this issue or unassign it. > Cleanup the rules introduced by FLINK-5144 when calcite releases 1.12 > - > > Key: FLINK-5435 > URL: https://issues.apache.org/jira/browse/FLINK-5435 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Kurt Young >Assignee: Kurt Young >Priority: Minor > > When fixing https://issues.apache.org/jira/browse/FLINK-5144, we actually > copied some classes from Calcite and did a quick fix in Flink. The fix has > actually been merged by Calcite and will be included in version 1.12, so we should > update the Calcite version and remove the classes we copied. > The classes we copied: > 1. FlinkAggregateJoinTransposeRule > 2. RelDecorrelator -> FlinkRelDecorrelator -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5545) Remove FlinkAggregateExpandDistinctAggregatesRule when upgrading to Calcite 1.12
[ https://issues.apache.org/jira/browse/FLINK-5545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953674#comment-15953674 ] Timo Walther commented on FLINK-5545: - [~docete] We updated the Calcite version. Would be great if you could open a PR for this issue or unassign it. > Remove FlinkAggregateExpandDistinctAggregatesRule when upgrading to Calcite > 1.12 > > > Key: FLINK-5545 > URL: https://issues.apache.org/jira/browse/FLINK-5545 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: Zhenghua Gao >Assignee: Zhenghua Gao >Priority: Minor > > We copied Calcite's AggregateExpandDistinctAggregatesRule to the Flink project and > did a quick fix to avoid some bad cases mentioned in CALCITE-1558. > We should drop it and use Calcite's AggregateExpandDistinctAggregatesRule when > we upgrade to Calcite 1.12 (or above) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6256) Fix documentation of ProcessFunction.
Kostas Kloudas created FLINK-6256: - Summary: Fix documentation of ProcessFunction. Key: FLINK-6256 URL: https://issues.apache.org/jira/browse/FLINK-6256 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.3.0 Reporter: Kostas Kloudas Priority: Blocker Fix For: 1.3.0 In the code example on how to define an {{OutputTag}} and how to use it to extract the side-output stream, the name of the defined output tag and that of the one used in the {{getSideOutput()}} differ. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-5829) Bump Calcite version to 1.12 once available
[ https://issues.apache.org/jira/browse/FLINK-5829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953667#comment-15953667 ] Timo Walther commented on FLINK-5829: - Fixed in 1.3.0: 05ceec0ace45135bc1cc17934a0b8721f4f85a03 > Bump Calcite version to 1.12 once available > --- > > Key: FLINK-5829 > URL: https://issues.apache.org/jira/browse/FLINK-5829 > Project: Flink > Issue Type: Task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Haohui Mai > > Once Calcite 1.12 is released we should update to remove some copied classes. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6254) Consolidate late data methods on PatternStream and WindowedStream
[ https://issues.apache.org/jira/browse/FLINK-6254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas reassigned FLINK-6254: - Assignee: Kostas Kloudas > Consolidate late data methods on PatternStream and WindowedStream > - > > Key: FLINK-6254 > URL: https://issues.apache.org/jira/browse/FLINK-6254 > Project: Flink > Issue Type: Improvement > Components: CEP >Reporter: Aljoscha Krettek >Assignee: Kostas Kloudas >Priority: Blocker > Fix For: 1.3.0 > > > {{WindowedStream}} has {{sideOutputLateData(OutputTag outputTag)}} while > {{PatternStream}} has {{withLateDataOutputTag(OutputTag outputTag)}}. > {{WindowedStream}} had the method first so we should stick to that naming > scheme. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3613: [FLINK-5829] Bump Calcite version to 1.12 once ava...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3613 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5829) Bump Calcite version to 1.12 once available
[ https://issues.apache.org/jira/browse/FLINK-5829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953666#comment-15953666 ] ASF GitHub Bot commented on FLINK-5829: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3613 > Bump Calcite version to 1.12 once available > --- > > Key: FLINK-5829 > URL: https://issues.apache.org/jira/browse/FLINK-5829 > Project: Flink > Issue Type: Task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Haohui Mai > > Once Calcite 1.12 is released we should update to remove some copied classes. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6255) Remove PatternStream.getSideOutput()
Aljoscha Krettek created FLINK-6255: --- Summary: Remove PatternStream.getSideOutput() Key: FLINK-6255 URL: https://issues.apache.org/jira/browse/FLINK-6255 Project: Flink Issue Type: Improvement Components: CEP Reporter: Aljoscha Krettek We cannot currently use the result of {{select()/flatSelect()}} to get the side output stream because the operator that emits the side output is not the same operator that is returned from the {{select()}} method. There is always a map or flatMap after the actual CEP operator. We first have to change the CEP operator(s) to directly execute the map/flatMap function inside the operator. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6254) Consolidate late data methods on PatternStream and WindowedStream
Aljoscha Krettek created FLINK-6254: --- Summary: Consolidate late data methods on PatternStream and WindowedStream Key: FLINK-6254 URL: https://issues.apache.org/jira/browse/FLINK-6254 Project: Flink Issue Type: Improvement Components: CEP Reporter: Aljoscha Krettek Priority: Blocker Fix For: 1.3.0 {{WindowedStream}} has {{sideOutputLateData(OutputTag outputTag)}} while {{PatternStream}} has {{withLateDataOutputTag(OutputTag outputTag)}}. {{WindowedStream}} had the method first so we should stick to that naming scheme. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6215) Make the StatefulSequenceSource scalable.
[ https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas reassigned FLINK-6215: - Assignee: Kostas Kloudas > Make the StatefulSequenceSource scalable. > - > > Key: FLINK-6215 > URL: https://issues.apache.org/jira/browse/FLINK-6215 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.3.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.3.0 > > > Currently the {{StatefulSequenceSource}} instantiates all the elements to > emit up front and keeps them in memory. This is not scalable, as for large > sequences of elements it can lead to out-of-memory exceptions. > To solve this, we can pre-partition the sequence of elements based on the > {{maxParallelism}} parameter, and just keep state (to checkpoint) per such > partition. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
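The pre-partitioning idea in the ticket can be sketched without any Flink code: split the inclusive range [start, end] into `maxParallelism` contiguous partitions whose sizes differ by at most one, so each subtask only checkpoints a small cursor per partition instead of materializing every element. The class and method names below are illustrative, not the eventual Flink implementation.

```java
// Splits the inclusive range [start, end] into maxParallelism contiguous
// partitions; the first (total % maxParallelism) partitions get one extra
// element, so sizes differ by at most one.
public class SequencePartitioner {
    public static long[] partitionBounds(long start, long end, int maxParallelism, int partition) {
        long total = end - start + 1;
        long base = total / maxParallelism;
        long remainder = total % maxParallelism;
        long pStart = start + partition * base + Math.min(partition, remainder);
        long size = base + (partition < remainder ? 1 : 0);
        return new long[] {pStart, pStart + size - 1}; // inclusive bounds
    }

    public static void main(String[] args) {
        // Range [0, 9] over 4 partitions -> [0,2], [3,5], [6,7], [8,9].
        for (int p = 0; p < 4; p++) {
            long[] b = partitionBounds(0, 9, 4, p);
            System.out.println("partition " + p + ": [" + b[0] + ", " + b[1] + "]");
        }
    }
}
```

A subtask would then only need to remember, per partition it owns, the next value to emit — a constant amount of state regardless of the sequence length.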
[jira] [Commented] (FLINK-6249) Distinct Aggregates for OVER window
[ https://issues.apache.org/jira/browse/FLINK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953646#comment-15953646 ] radu commented on FLINK-6249: - [~fhueske] [~shijinkui] [~Yuhong_kyo] [~sunjincheng121] [~twalthr] [~stefano.bortoli] I have opened a JIRA design to extend the current window aggregate implementation to support DISTINCT. So far I have created subtasks for the bounded OVER windows. Depending on your opinion, we can potentially open further subtasks for the GROUP BY aggregates as well as for the unbounded OVER windows > Distinct Aggregates for OVER window > --- > > Key: FLINK-6249 > URL: https://issues.apache.org/jira/browse/FLINK-6249 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: radu > Labels: features, patch > > Time target: ProcTime/EventTime > SQL targeted query examples: > > Q1. Boundaries are expressed in windows and are meant for the elements to be > aggregated > Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 > PRECEDING AND CURRENT ROW) FROM stream1` > Q1.2. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() RANGE BETWEEN > INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1` > Q1.3. `SELECT SUM( DISTINCT b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 > PRECEDING AND CURRENT ROW) FROM stream1` > Q1.4. `SELECT SUM( DISTINCT b) OVER (ORDER BY rowTime() RANGE BETWEEN > INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1` > General comments: > - The DISTINCT operation makes sense only within the context of windows or > some bounded defined structures. Otherwise the operation would keep > an infinite amount of data to ensure uniqueness and would not > trigger for certain functions (e.g. aggregates) > - We can consider as a sub-JIRA issue the implementation of DISTINCT > for UNBOUNDED sliding windows. However, there would be no control over > the data structure that keeps the seen data (to check it is not re-processed). 
-> > It needs to be decided whether we want to support this (and to create appropriate JIRA > issues) > => We will open sub-JIRA issues to extend the current functionality of > aggregates for the DISTINCT case (Q1.{1-4}). (This is the main target of > this JIRA) > => Aggregations over distinct elements without any boundary (i.e. > within the SELECT clause) do not make sense, just as aggregations do not > make sense without groupings or windows. > Other similar query support > > Q2. Boundaries are expressed in the GROUP BY clause and distinct is applied to > the elements of the aggregate(s) > `SELECT MIN( DISTINCT rowtime), prodID FROM stream1 GROUP BY FLOOR(procTime() > TO HOUR)` > => We need to decide if we aim to support distinct > aggregates for the GROUP BY (Q2) in this release. If so, sub-JIRA issues need to be created. > We can follow the same design/implementation. > => We can consider as a sub-JIRA issue the implementation of DISTINCT > for SELECT clauses. However, there is no control over the growing > size of the data structure and it will unavoidably exhaust the memory. > Q3. Distinct is applied to the collection of outputs to be selected. > `SELECT STREAM DISTINCT procTime(), prodId FROM stream1 GROUP BY > FLOOR(procTime() TO DAY)` > Description: > > The DISTINCT operator requires processing the elements to ensure > uniqueness. Whether the operation is used to SELECT ALL distinct > elements or to apply typical aggregation functions over a set of > elements, a collection of elements must first be formed. > This brings the need of using windows or grouping methods. Therefore the > distinct function will be implemented within windows. 
> Depending on the > type of window definition there are several options: > - Main Scope: If distinct is applied as in the Q1 example for window > aggregations, then we either extend the implementation with distinct > aggregates (less preferred) or extend the sliding window aggregate > implementation in the processFunction with distinct-value identification > support (preferred). The latter option is preferred because a query can carry > multiple aggregates, including multiple aggregates that have the distinct key > word set. Implementing the distinction between elements in the process > function avoids the need to multiply the data structure that marks what was > seen across multiple aggregates. It also makes the implementation more robust > and resilient, as we can keep the data structure for marking the seen elements > in a state (mapstate). > - If distinct is applied as in the Q2 example on group elements, then > either we define a new implementation if the selection is general or > extend the current implementation of
[jira] [Created] (FLINK-6253) Distinct rowTime with time range boundaries
radu created FLINK-6253: --- Summary: Distinct rowTime with time range boundaries Key: FLINK-6253 URL: https://issues.apache.org/jira/browse/FLINK-6253 Project: Flink Issue Type: Sub-task Reporter: radu Support distinct aggregates with rowtime order and time range boundaries Q1.4. `SELECT SUM( DISTINCT b) OVER (ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1` Q1.4. `SELECT COUNT(b), SUM( DISTINCT b) OVER (ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6252) Distinct rowTime with Rows boundaries
radu created FLINK-6252: --- Summary: Distinct rowTime with Rows boundaries Key: FLINK-6252 URL: https://issues.apache.org/jira/browse/FLINK-6252 Project: Flink Issue Type: Sub-task Reporter: radu Support distinct aggregates over row time order with rows boundaries Q1.3. `SELECT SUM( DISTINCT b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` Q1.3. `SELECT COUNT(b), SUM( DISTINCT b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3427) Add watermark monitoring to JobManager web frontend
[ https://issues.apache.org/jira/browse/FLINK-3427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953644#comment-15953644 ] Chesnay Schepler commented on FLINK-3427: - [~vpernin] The backpressure monitoring should still be done through the REST API or the backpressure tab in the web-frontend. The "status" of the REST response is "deprecated" if this query caused the backpressure sampling to be started. Since sampling takes a while, the last (and as such outdated) backpressure information will be returned. > Add watermark monitoring to JobManager web frontend > --- > > Key: FLINK-3427 > URL: https://issues.apache.org/jira/browse/FLINK-3427 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Webfrontend >Reporter: Robert Metzger > Fix For: 1.3.0 > > > Currently, it's quite hard to figure out issues with the watermarks. > I think we can improve the situation by reporting the following information > through the metrics system: > - Report the current low watermark for each operator (this way, you can see > if one operator is preventing the watermarks from rising) > - Report the number of events that arrived after the low watermark (users can see > how accurate the watermarks are) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6251) Distinct procTime with time range boundaries
[ https://issues.apache.org/jira/browse/FLINK-6251?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] radu reassigned FLINK-6251: --- Assignee: radu > Distinct procTime with time range boundaries > > > Key: FLINK-6251 > URL: https://issues.apache.org/jira/browse/FLINK-6251 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: radu >Assignee: radu > > Support proctime distinct aggregates with time boundaries > Q1.2. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() RANGE BETWEEN > INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1` > Q1.2. `SELECT COUNT(b), SUM( DISTINCT b) OVER (ORDER BY procTime() RANGE > BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6251) Distinct procTime with time range boundaries
radu created FLINK-6251: --- Summary: Distinct procTime with time range boundaries Key: FLINK-6251 URL: https://issues.apache.org/jira/browse/FLINK-6251 Project: Flink Issue Type: Sub-task Reporter: radu Support proctime distinct aggregates with time boundaries Q1.2. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1` Q1.2. `SELECT COUNT(b), SUM( DISTINCT b) OVER (ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Assigned] (FLINK-6250) Distinct procTime with Rows boundaries
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] radu reassigned FLINK-6250: --- Assignee: Stefano Bortoli Description: Support proctime with rows boundaries Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` Q1.1. `SELECT COUNT(b), SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` was: Support proctime with rows boundaries Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` > Distinct procTime with Rows boundaries > -- > > Key: FLINK-6250 > URL: https://issues.apache.org/jira/browse/FLINK-6250 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: radu >Assignee: Stefano Bortoli > > Support proctime with rows boundaries > Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 > PRECEDING AND CURRENT ROW) FROM stream1` > Q1.1. `SELECT COUNT(b), SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS > BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-3031) Consistent Shutdown of Streaming Jobs
[ https://issues.apache.org/jira/browse/FLINK-3031?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ufuk Celebi closed FLINK-3031. -- Resolution: Invalid Yes > Consistent Shutdown of Streaming Jobs > - > > Key: FLINK-3031 > URL: https://issues.apache.org/jira/browse/FLINK-3031 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Matthias J. Sax > > Depends on FLINK-2111 > When a streaming job is shut down cleanly via "stop", a last consistent > snapshot should be collected. This snapshot could be used to resume a job > later on. > See mail archive for more details of the discussion: > https://mail-archives.apache.org/mod_mbox/flink-dev/201511.mbox/%3CCA%2Bfaj9xDFAUG_zi%3D%3DE2H8s-8R4cn8ZBDON_hf%2B1Rud5pJqvZ4A%40mail.gmail.com%3E -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6250) Distinct procTime with Rows boundaries
radu created FLINK-6250: --- Summary: Distinct procTime with Rows boundaries Key: FLINK-6250 URL: https://issues.apache.org/jira/browse/FLINK-6250 Project: Flink Issue Type: Sub-task Reporter: radu Support proctime with rows boundaries Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
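The "preferred" design discussed in FLINK-6249 — tracking seen elements in the process function's state rather than in each aggregate — can be illustrated with a reference count per value: a value contributes to `SUM(DISTINCT b)` only on its first appearance in the window, and is subtracted only when its last occurrence retires. A self-contained plain-Java sketch (a `HashMap` stands in for the MapState the JIRA proposes; this is not the actual Flink implementation):

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Sketch of SUM(DISTINCT b) OVER (... ROWS BETWEEN 2 PRECEDING AND CURRENT ROW):
// windowSize = 3 covers the current row plus 2 preceding rows.
public class DistinctRowsSum {
    private final int windowSize;
    private final Deque<Long> window = new ArrayDeque<>();
    private final Map<Long, Integer> seenCount = new HashMap<>();
    private long distinctSum = 0;

    public DistinctRowsSum(int windowSize) { this.windowSize = windowSize; }

    /** Adds one row's value of b and returns the current SUM(DISTINCT b). */
    public long accumulate(long b) {
        window.addLast(b);
        if (seenCount.merge(b, 1, Integer::sum) == 1) {
            distinctSum += b; // first occurrence inside the window
        }
        if (window.size() > windowSize) {
            long retired = window.removeFirst();
            if (seenCount.merge(retired, -1, Integer::sum) == 0) {
                seenCount.remove(retired);
                distinctSum -= retired; // last occurrence left the window
            }
        }
        return distinctSum;
    }

    public static void main(String[] args) {
        DistinctRowsSum sum = new DistinctRowsSum(3);
        for (long v : new long[] {5, 5, 7, 5}) {
            System.out.println(sum.accumulate(v));
        }
        // windows: [5] -> 5, [5,5] -> 5, [5,5,7] -> 12, [5,7,5] -> 12
    }
}
```

Because the reference counts live in one shared structure, a query carrying several DISTINCT aggregates does not need to duplicate the "seen" bookkeeping per aggregate, which is exactly the argument made for doing this in the process function.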
[jira] [Updated] (FLINK-6249) Distinct Aggregates for OVER window
[ https://issues.apache.org/jira/browse/FLINK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] radu updated FLINK-6249: Description: Time target: ProcTime/EventTime SQL targeted query examples: Q1. Boundaries are expressed in windows and are meant for the elements to be aggregated Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` Q1.2. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1` Q1.3. `SELECT SUM( DISTINCT b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` Q1.4. `SELECT SUM( DISTINCT b) OVER (ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1` General comments: - The DISTINCT operation makes sense only within the context of windows or some bounded defined structures. Otherwise the operation would keep an infinite amount of data to ensure uniqueness and would not trigger for certain functions (e.g. aggregates) - We can consider as a sub-JIRA issue the implementation of DISTINCT for UNBOUNDED sliding windows. However, there would be no control over the data structure that keeps the seen data (to check it is not re-processed). -> It needs to be decided whether we want to support this (and to create appropriate JIRA issues) => We will open sub-JIRA issues to extend the current functionality of aggregates for the DISTINCT case (Q1.{1-4}). (This is the main target of this JIRA) => Aggregations over distinct elements without any boundary (i.e. within the SELECT clause) do not make sense, just as aggregations do not make sense without groupings or windows. Other similar query support Q2. 
Boundaries are expressed in the GROUP BY clause and distinct is applied to the elements of the aggregate(s) `SELECT MIN( DISTINCT rowtime), prodID FROM stream1 GROUP BY FLOOR(procTime() TO HOUR)` => We need to decide if we aim to support distinct aggregates for the GROUP BY (Q2) in this release. If so, sub-JIRA issues need to be created. We can follow the same design/implementation. => We can consider as a sub-JIRA issue the implementation of DISTINCT for SELECT clauses. However, there is no control over the growing size of the data structure and it will unavoidably exhaust the memory. Q3. Distinct is applied to the collection of outputs to be selected. `SELECT STREAM DISTINCT procTime(), prodId FROM stream1 GROUP BY FLOOR(procTime() TO DAY)` Description: The DISTINCT operator requires processing the elements to ensure uniqueness. Whether the operation is used to SELECT ALL distinct elements or to apply typical aggregation functions over a set of elements, a collection of elements must first be formed. This brings the need of using windows or grouping methods. Therefore the distinct function will be implemented within windows. Depending on the type of window definition there are several options: - Main Scope: If distinct is applied as in the Q1 example for window aggregations, then we either extend the implementation with distinct aggregates (less preferred) or extend the sliding window aggregate implementation in the processFunction with distinct-value identification support (preferred). The latter option is preferred because a query can carry multiple aggregates, including multiple aggregates that have the distinct key word set. Implementing the distinction between elements in the process function avoids the need to multiply the data structure that marks what was seen across multiple aggregates. It also makes the implementation more robust and resilient, as we can keep the data structure for marking the seen elements in a state (mapstate). 
- If distinct is applied as in the Q2 example on group elements, then either we define a new implementation if the selection is general or extend the current implementation of grouped aggregates with distinct group aggregates - If distinct is applied as in the Q3 example to select all elements, then a new implementation needs to be defined. This would work over a specific window, and within the window function the uniqueness of the results to be processed will be enforced. Functionality example - We exemplify below the functionality of the DISTINCT queries when working with streams. `Q1: SELECT STREAM DISTINCT b FROM stream1 GROUP BY FLOOR(PROCTIME TO HOUR) ` `Q2: SELECT COUNT(DISTINCT b) FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR) ` `Q3: SELECT sum(DISTINCT a) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` ||Proctime||IngestionTime(Event)||Stream1||Q1||Q2||Q3|| ||10:00:01| (ab,1)| | | 1 | ||10:05:00| (aa,2)| | | 3 | ||11:00:00| | ab,aa | 2 | | ||11:03:00| (aa,2)| |
[jira] [Commented] (FLINK-5829) Bump Calcite version to 1.12 once available
[ https://issues.apache.org/jira/browse/FLINK-5829?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953634#comment-15953634 ] ASF GitHub Bot commented on FLINK-5829: --- Github user twalthr commented on the issue: https://github.com/apache/flink/pull/3613 Thanks @haohui. I will merge this. > Bump Calcite version to 1.12 once available > --- > > Key: FLINK-5829 > URL: https://issues.apache.org/jira/browse/FLINK-5829 > Project: Flink > Issue Type: Task > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Haohui Mai > > Once Calcite 1.12 is released we should update to remove some copied classes. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3613: [FLINK-5829] Bump Calcite version to 1.12 once available.
Github user twalthr commented on the issue: https://github.com/apache/flink/pull/3613 Thanks @haohui. I will merge this. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6244) Emit timeouted Patterns as Side Output
[ https://issues.apache.org/jira/browse/FLINK-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953621#comment-15953621 ] Dawid Wysakowicz commented on FLINK-6244: - I think it is a good way to proceed. Another option I was thinking of is to maintain two versions (with the current one deprecated), which shouldn't be too much of an overhead. > Emit timeouted Patterns as Side Output > -- > > Key: FLINK-6244 > URL: https://issues.apache.org/jira/browse/FLINK-6244 > Project: Flink > Issue Type: Improvement > Components: CEP >Affects Versions: 1.3.0 >Reporter: Dawid Wysakowicz > Fix For: 1.3.0 > > > Now that we have SideOutputs I think timeouted patterns should be emitted into > them rather than producing a stream of `Either` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3427) Add watermark monitoring to JobManager web frontend
[ https://issues.apache.org/jira/browse/FLINK-3427?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953612#comment-15953612 ] Vladislav Pernin commented on FLINK-3427: - What is the official way of monitoring backpressure? The REST API is not documented any more and sends a "deprecated" response. Shouldn't it be part of the metrics system? > Add watermark monitoring to JobManager web frontend > --- > > Key: FLINK-3427 > URL: https://issues.apache.org/jira/browse/FLINK-3427 > Project: Flink > Issue Type: Improvement > Components: DataStream API, Webfrontend >Reporter: Robert Metzger > Fix For: 1.3.0 > > > Currently, it's quite hard to figure out issues with the watermarks. > I think we can improve the situation by reporting the following information > through the metrics system: > - Report the current low watermark for each operator (this way, you can see > if one operator is preventing the watermarks from rising) > - Report the number of events that arrived after the low watermark (users can see > how accurate the watermarks are)
[jira] [Created] (FLINK-6249) Distinct Aggregates for OVER window
radu created FLINK-6249: --- Summary: Distinct Aggregates for OVER window Key: FLINK-6249 URL: https://issues.apache.org/jira/browse/FLINK-6249 Project: Flink Issue Type: New Feature Components: Table API & SQL Affects Versions: 1.3.0 Reporter: radu Time target: ProcTime/EventTime SQL targeted query examples: Q1. Boundaries are expressed in windows and meant for the elements to be aggregated Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` Q1.2. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1` Q1.3. `SELECT SUM( DISTINCT b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` Q1.4. `SELECT SUM( DISTINCT b) OVER (ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1` General comments: - The DISTINCT operation makes sense only within the context of windows or some bounded, well-defined structures. Otherwise the operation would keep an infinite amount of data to ensure uniqueness and would not trigger for certain functions (e.g. aggregates) - We can consider as a sub-JIRA issue the implementation of DISTINCT for UNBOUNDED sliding windows. However, there would be no control over the data structure that keeps seen data (to check that data is not re-processed). -> We need to decide whether we want to support this (and create appropriate JIRA issues) => We will open sub-JIRA issues to extend the current functionality of aggregates for the DISTINCT case (Q1.{1-4}). (This is the main target of this JIRA) => Aggregations over distinct elements without any boundary (i.e. within the SELECT clause) do not make sense, just as aggregations do not make sense without groupings or windows. Other similar query support: Q2.
Boundaries are expressed in the GROUP BY clause and distinct is applied to the elements of the aggregate(s) `SELECT MIN( DISTINCT rowtime), prodID FROM stream1 GROUP BY FLOOR(procTime() TO HOUR)` => We need to decide if we aim to support distinct aggregates for the GROUP BY (Q2) in this release. If so, sub-JIRA issues need to be created. We can follow the same design/implementation. => We can consider as a sub-JIRA issue the implementation of DISTINCT for select clauses. However, there is no control over the growing size of the data structure and it will unavoidably exhaust memory. Q3. Distinct is applied to the collection of outputs to be selected. `SELECT STREAM DISTINCT procTime(), prodId FROM stream1 GROUP BY FLOOR(procTime() TO DAY)` Description: The DISTINCT operator requires processing the elements to ensure uniqueness. Whether the operation is used to SELECT ALL distinct elements or to apply typical aggregation functions over a set of elements, there is first a need to form a collection of elements. This brings the need for windows or grouping methods. Therefore the distinct function will be implemented within windows. Depending on the type of window definition there are several options: - Main Scope: If distinct is applied as in the Q1 example for window aggregations, then either we extend the implementation with distinct aggregates (less preferred) or extend the sliding window aggregates implementation in the processFunction with distinct-value identification support (preferred). The latter option is preferred because a query can carry multiple aggregates, including multiple aggregates that have the DISTINCT keyword set. Implementing the distinction between elements in the process function avoids the need to multiply the data structure that marks what was seen across multiple aggregates. It also makes the implementation more robust and resilient, as we can keep the data structure for marking the seen elements in a state (MapState).
- If distinct is applied as in the Q2 example on group elements, then either we define a new implementation (if the selection is general) or extend the current implementation of grouped aggregates with distinct group aggregates - If distinct is applied as in the Q3 example to select all elements, then a new implementation needs to be defined. This would work over a specific window, and within the window function the uniqueness of the results will be enforced. Functionality example - We exemplify below the functionality of DISTINCT when working with streams. `Q1: SELECT STREAM DISTINCT b FROM stream1 GROUP BY FLOOR(PROCTIME TO HOUR) ` `Q2: SELECT COUNT(DISTINCT b) FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR) ` `Q3: SELECT sum(DISTINCT a) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
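The "preferred" option above — tracking seen elements in the process function so that DISTINCT aggregates only count the first occurrence of a value in the window — can be sketched in plain Java. This is a conceptual mock, not Flink code: in the actual implementation the `seenCounts` map would live in a `MapState`, and the window would be driven by the OVER clause. The window size of 3 mirrors `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW`.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/**
 * Sketch of SUM(DISTINCT b) over a row-based sliding window.
 * A value contributes to the sum only while its occurrence count in the
 * window is non-zero; eviction decrements the count rather than deleting it,
 * so duplicates inside the window are handled correctly.
 */
public class DistinctOverWindow {
    private final Deque<Long> window = new ArrayDeque<>();
    private final Map<Long, Integer> seenCounts = new HashMap<>(); // role of the MapState
    private long distinctSum = 0L;
    private final int maxRows;

    public DistinctOverWindow(int maxRows) {
        this.maxRows = maxRows;
    }

    /** Adds one row and returns SUM(DISTINCT value) over the last maxRows rows. */
    public long add(long value) {
        window.addLast(value);
        int count = seenCounts.merge(value, 1, Integer::sum);
        if (count == 1) {               // first occurrence inside the window
            distinctSum += value;
        }
        if (window.size() > maxRows) {  // evict the oldest row
            long evicted = window.removeFirst();
            int left = seenCounts.merge(evicted, -1, Integer::sum);
            if (left == 0) {            // last occurrence left the window
                seenCounts.remove(evicted);
                distinctSum -= evicted;
            }
        }
        return distinctSum;
    }
}
```

Because the count map is keyed by value, several distinct aggregates in one query could share it, which is the argument made above for implementing the distinction in the process function rather than in each aggregate.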
[jira] [Created] (FLINK-6248) Make the optional() available to all offered patterns.
Kostas Kloudas created FLINK-6248: - Summary: Make the optional() available to all offered patterns. Key: FLINK-6248 URL: https://issues.apache.org/jira/browse/FLINK-6248 Project: Flink Issue Type: Improvement Components: CEP Affects Versions: 1.3.0 Reporter: Kostas Kloudas Fix For: 1.3.0 Currently the {{optional()}} quantifier is available as a separate pattern. This issue proposes to make it available as a flag to all patterns. This implies that: 1) a singleton pattern with {{optional=true}} will become the current {{OPTIONAL}}, 2) a {{oneToMany}} will become {{zeroToMany}}, 3) the {{zeroToMany}} will not exist as a direct option in the {{Pattern}} class, and 4) the {{times()}} will require some changes in the {{NFACompiler}}.
[jira] [Commented] (FLINK-5994) Add Janino to flink-table JAR file
[ https://issues.apache.org/jira/browse/FLINK-5994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953588#comment-15953588 ] ASF GitHub Bot commented on FLINK-5994: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3656 Here's the JIRA for this PR: https://issues.apache.org/jira/browse/FLINK-6247 > Add Janino to flink-table JAR file > -- > > Key: FLINK-5994 > URL: https://issues.apache.org/jira/browse/FLINK-5994 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Timo Walther >Assignee: sunjincheng > > It seems that Janino is not part of the flink-table JAR file although it is a > dependency in pom.xml. Users adding flink-table to Flink's lib folder because > of FLINK-5227 cannot run table programs due to the missing Janino dependency.
[jira] [Commented] (FLINK-6244) Emit timeouted Patterns as Side Output
[ https://issues.apache.org/jira/browse/FLINK-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953590#comment-15953590 ] Kostas Kloudas commented on FLINK-6244: --- The only problem I can find, and one we have to think about how to handle, is backwards compatibility. The already existing {{CEP}} programs (if any) would expect the timed-out patterns in the old format ({{Either}}). I would suggest that the best way to see how to proceed is to open a discussion in the dev mailing list with the proposed change. This will allow us to see 1) if anybody uses the CEP library, and 2) if users that already have a running job are ok with the change. What do you think?
[GitHub] flink issue #3656: [FLINK-5994][table] Add Janino to flink-dist JAR file
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3656 Here's the JIRA for this PR: https://issues.apache.org/jira/browse/FLINK-6247
[jira] [Created] (FLINK-6247) Build a jar-with-dependencies for flink-table and put it into ./opt
Fabian Hueske created FLINK-6247: Summary: Build a jar-with-dependencies for flink-table and put it into ./opt Key: FLINK-6247 URL: https://issues.apache.org/jira/browse/FLINK-6247 Project: Flink Issue Type: Improvement Components: Build System, Table API & SQL Affects Versions: 1.3.0 Reporter: Fabian Hueske Assignee: sunjincheng Due to a problem with Calcite and the unloading of classes, user-code classloaders that include Calcite cannot be garbage collected. This is a problem for long-running clusters that execute multiple Table API / SQL programs with fat JARs that include the flink-table dependency. Each executed program comes with its own user-code classloader that cannot be cleaned up later. As a workaround, we recommend copying the flink-table dependency into the ./lib folder. However, we do not have a jar file with all required transitive dependencies (Calcite, Janino, etc.). Hence, users would need to build this jar file themselves or copy all jars into ./lib. This issue is about creating a jar-with-dependencies and adding it to the ./opt folder. Users can then copy the jar file from ./opt to ./lib to include the Table API in the classpath of Flink.
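For reference, a jar-with-dependencies is conventionally produced with the Maven Assembly Plugin's built-in `jar-with-dependencies` descriptor. A sketch of the kind of `pom.xml` fragment involved follows; the execution id and phase are illustrative assumptions, not Flink's actual build configuration, and exclusions of classes already shipped in flink-dist.jar would still need to be worked out:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-assembly-plugin</artifactId>
  <configuration>
    <descriptorRefs>
      <!-- produces flink-table-<version>-jar-with-dependencies.jar -->
      <descriptorRef>jar-with-dependencies</descriptorRef>
    </descriptorRefs>
  </configuration>
  <executions>
    <execution>
      <id>make-assembly</id>
      <phase>package</phase>
      <goals>
        <goal>single</goal>
      </goals>
    </execution>
  </executions>
</plugin>
```

The resulting artifact is what would land in ./opt, from where users copy it into ./lib.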
[jira] [Updated] (FLINK-6209) StreamPlanEnvironment always has a parallelism of 1
[ https://issues.apache.org/jira/browse/FLINK-6209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-6209: Affects Version/s: 1.2.1 > StreamPlanEnvironment always has a parallelism of 1 > --- > > Key: FLINK-6209 > URL: https://issues.apache.org/jira/browse/FLINK-6209 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.1 >Reporter: Haohui Mai >Assignee: Haohui Mai > > Thanks [~bill.liu8904] for triaging the issue. > After FLINK-5808 we saw that the Flink jobs that are uploaded through the UI > always have a parallelism of 1, even when the parallelism is explicitly set in > the UI.
[jira] [Updated] (FLINK-6244) Emit timeouted Patterns as Side Output
[ https://issues.apache.org/jira/browse/FLINK-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas updated FLINK-6244: -- Fix Version/s: 1.3.0
[jira] [Updated] (FLINK-6244) Emit timeouted Patterns as Side Output
[ https://issues.apache.org/jira/browse/FLINK-6244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas updated FLINK-6244: -- Affects Version/s: 1.3.0
[jira] [Commented] (FLINK-5994) Add Janino to flink-table JAR file
[ https://issues.apache.org/jira/browse/FLINK-5994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953578#comment-15953578 ] ASF GitHub Bot commented on FLINK-5994: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3656 I will create another JIRA for this issue and assign it to you.
[GitHub] flink issue #3656: [FLINK-5994][table] Add Janino to flink-dist JAR file
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3656 I will create another JIRA for this issue and assign it to you.
[jira] [Commented] (FLINK-5994) Add Janino to flink-table JAR file
[ https://issues.apache.org/jira/browse/FLINK-5994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953574#comment-15953574 ] ASF GitHub Bot commented on FLINK-5994: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3656 The jar-with-dependencies should include Janino. This jar needs to include everything that is required to run flink-table, but nothing that is already included in flink-dist.jar. If I understand correctly, FLINK-5994 is about adding Janino to the default jar artifact which is deployed to Maven. Actually, I'm not sure if that should be done because Maven would load that dependency automatically.
[GitHub] flink issue #3656: [FLINK-5994][table] Add Janino to flink-dist JAR file
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3656 The jar-with-dependencies should include Janino. This jar needs to include everything that is required to run flink-table, but nothing that is already included in flink-dist.jar. If I understand correctly, FLINK-5994 is about adding Janino to the default jar artifact which is deployed to Maven. Actually, I'm not sure if that should be done because Maven would load that dependency automatically.
[jira] [Commented] (FLINK-3871) Add Kafka TableSource with Avro serialization
[ https://issues.apache.org/jira/browse/FLINK-3871?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953573#comment-15953573 ] ASF GitHub Bot commented on FLINK-3871: --- GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/3663 [FLINK-3871] [table] Add Kafka TableSource with Avro serialization This PR adds KafkaAvroTableSource. It serializes/deserializes (nested) Avro records to (nested) Flink rows. Avro Utf8 strings are converted to regular Java strings. You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink FLINK-3871 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3663.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3663 commit 589e45c5c50c328783f71d219c6606e972f42f34 Author: twalthr Date: 2017-04-03T12:44:46Z [FLINK-3871] [table] Add Kafka TableSource with Avro serialization > Add Kafka TableSource with Avro serialization > - > > Key: FLINK-3871 > URL: https://issues.apache.org/jira/browse/FLINK-3871 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Fabian Hueske >Assignee: Ivan Mushketyk > > Add a Kafka TableSource which supports Avro serialized data. > The KafkaAvroTableSource should support two modes: > # SpecificRecord Mode: In this case the user specifies a class which was > code-generated by Avro depending on a schema. Flink treats these classes as > regular POJOs. Hence, they are also natively supported by the Table API and > SQL. Classes generated by Avro contain their Schema in a static field. The > schema should be used to automatically derive field names and types. Hence, > there is no additional information required than the name of the class. > # GenericRecord Mode: In this case the user specifies an Avro Schema.
The > schema is used to deserialize the data into a GenericRecord which must be > translated into a possibly nested {{Row}} based on the schema information. > Again, the Avro Schema is used to automatically derive the field names and > types. This mode is less efficient than the SpecificRecord mode because the > {{GenericRecord}} needs to be converted into {{Row}}. > This feature depends on FLINK-5280, i.e., support for nested data in > {{TableSource}}.
[GitHub] flink pull request #3663: [FLINK-3871] [table] Add Kafka TableSource with Av...
GitHub user twalthr opened a pull request: https://github.com/apache/flink/pull/3663 [FLINK-3871] [table] Add Kafka TableSource with Avro serialization This PR adds KafkaAvroTableSource. It serializes/deserializes (nested) Avro records to (nested) Flink rows. Avro Utf8 strings are converted to regular Java strings. You can merge this pull request into a Git repository by running: $ git pull https://github.com/twalthr/flink FLINK-3871 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3663.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3663 commit 589e45c5c50c328783f71d219c6606e972f42f34 Author: twalthr Date: 2017-04-03T12:44:46Z [FLINK-3871] [table] Add Kafka TableSource with Avro serialization
[jira] [Commented] (FLINK-6246) Fix generic type of OutputTag in operator Output
[ https://issues.apache.org/jira/browse/FLINK-6246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953559#comment-15953559 ] ASF GitHub Bot commented on FLINK-6246: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/3662#discussion_r109428611 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -619,20 +619,20 @@ public void collect(StreamRecord<T> record) { } @Override - public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) { + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { for (int i = 0; i < outputs.length - 1; i++) { Output<StreamRecord<T>> output = outputs[i]; // due to side outputs, StreamRecords of varying types can pass through the broadcasting // collector so we need to cast @SuppressWarnings({"unchecked", "rawtypes"}) - StreamRecord<X> shallowCopy = (StreamRecord) record.copy(record.getValue()); + StreamRecord<X> shallowCopy = record.copy(record.getValue()); output.collect(outputTag, shallowCopy); } // don't copy for the last output @SuppressWarnings({"unchecked", "rawtypes"}) --- End diff -- Remove suppression. > Fix generic type of OutputTag in operator Output > > > Key: FLINK-6246 > URL: https://issues.apache.org/jira/browse/FLINK-6246 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Aljoscha Krettek > > The current signature is > {code} > <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) > {code} > which can be improved to > {code} > <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) > {code} > This is probably leftover from an intermediate stage of development.
[jira] [Commented] (FLINK-6246) Fix generic type of OutputTag in operator Output
[ https://issues.apache.org/jira/browse/FLINK-6246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953560#comment-15953560 ] ASF GitHub Bot commented on FLINK-6246: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/3662#discussion_r109428703 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java --- @@ -107,7 +107,8 @@ public static void main(String[] args) throws Exception { } // execute the program - env.execute("Streaming Iteration Example"); + System.out.println(env.getExecutionPlan()); --- End diff -- I think this change should be reverted.
[jira] [Commented] (FLINK-6246) Fix generic type of OutputTag in operator Output
[ https://issues.apache.org/jira/browse/FLINK-6246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953558#comment-15953558 ] ASF GitHub Bot commented on FLINK-6246: --- Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/3662#discussion_r109428555 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -619,20 +619,20 @@ public void collect(StreamRecord<T> record) { } @Override - public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) { + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { for (int i = 0; i < outputs.length - 1; i++) { Output<StreamRecord<T>> output = outputs[i]; // due to side outputs, StreamRecords of varying types can pass through the broadcasting // collector so we need to cast @SuppressWarnings({"unchecked", "rawtypes"}) --- End diff -- Suppression not needed anymore. Also the comment is not adequate, as we do not cast anything.
[GitHub] flink pull request #3662: [FLINK-6246] Fix generic type of OutputTag in oper...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/3662#discussion_r109428703 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java --- @@ -107,7 +107,8 @@ public static void main(String[] args) throws Exception { } // execute the program - env.execute("Streaming Iteration Example"); + System.out.println(env.getExecutionPlan()); --- End diff -- I think this change should be reverted.
[GitHub] flink pull request #3662: [FLINK-6246] Fix generic type of OutputTag in oper...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/3662#discussion_r109428555 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -619,20 +619,20 @@ public void collect(StreamRecord<T> record) { } @Override - public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) { + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { for (int i = 0; i < outputs.length - 1; i++) { Output<StreamRecord<T>> output = outputs[i]; // due to side outputs, StreamRecords of varying types can pass through the broadcasting // collector so we need to cast @SuppressWarnings({"unchecked", "rawtypes"}) --- End diff -- Suppression not needed anymore. Also the comment is not adequate, as we do not cast anything.
[GitHub] flink pull request #3662: [FLINK-6246] Fix generic type of OutputTag in oper...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/3662#discussion_r109428611 --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java --- @@ -619,20 +619,20 @@ public void collect(StreamRecord<T> record) { } @Override - public <X> void collect(OutputTag<?> outputTag, StreamRecord<X> record) { + public <X> void collect(OutputTag<X> outputTag, StreamRecord<X> record) { for (int i = 0; i < outputs.length - 1; i++) { Output<StreamRecord<T>> output = outputs[i]; // due to side outputs, StreamRecords of varying types can pass through the broadcasting // collector so we need to cast @SuppressWarnings({"unchecked", "rawtypes"}) - StreamRecord<X> shallowCopy = (StreamRecord) record.copy(record.getValue()); + StreamRecord<X> shallowCopy = record.copy(record.getValue()); output.collect(outputTag, shallowCopy); } // don't copy for the last output @SuppressWarnings({"unchecked", "rawtypes"}) --- End diff -- Remove suppression.
[jira] [Commented] (FLINK-6246) Fix generic type of OutputTag in operator Output
[ https://issues.apache.org/jira/browse/FLINK-6246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953547#comment-15953547 ] ASF GitHub Bot commented on FLINK-6246: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/3662 [FLINK-6246] Fix generic type of OutputTag in operator Output R: @dawidwys You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-6246-fix-output-tag-param Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3662.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3662 commit 3cfc5b18c27312de0e3e95a56cd78b46f80cf928 Author: Aljoscha Krettek Date: 2017-04-03T14:10:14Z [FLINK-6246] Fix generic type of OutputTag in operator Output
[GitHub] flink pull request #3662: [FLINK-6246] Fix generic type of OutputTag in oper...
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/3662 [FLINK-6246] Fix generic type of OutputTag in operator Output R: @dawidwys You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-6246-fix-output-tag-param Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3662.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3662 commit 3cfc5b18c27312de0e3e95a56cd78b46f80cf928 Author: Aljoscha Krettek Date: 2017-04-03T14:10:14Z [FLINK-6246] Fix generic type of OutputTag in operator Output
[jira] [Commented] (FLINK-5994) Add Janino to flink-table JAR file
[ https://issues.apache.org/jira/browse/FLINK-5994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953543#comment-15953543 ] ASF GitHub Bot commented on FLINK-5994: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3656 Hi @fhueske In the ML: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Flink-1-2-Proper-Packaging-of-flink-table-with-SBT-td12096.html users want `org/codehaus/commons/compiler/CompileException` included in flink-table.xxx.jar. But on my side, when we build the flink-table.xx.jar, we can see that `janino` has been excluded. So, I made those changes in this PR. Then users can add the `jar-with-dependencies.jar` to the application class path. Do you think `janino` should be included in flink-table.xxx.jar? Or is there something wrong on my side? Thanks, SunJincheng
[jira] [Created] (FLINK-6246) Fix generic type of OutputTag in operator Output
Aljoscha Krettek created FLINK-6246: --- Summary: Fix generic type of OutputTag in operator Output Key: FLINK-6246 URL: https://issues.apache.org/jira/browse/FLINK-6246 Project: Flink Issue Type: Bug Components: DataStream API Reporter: Aljoscha Krettek

The current signature is
{code}
<X> void collect(OutputTag<?> outputTag, StreamRecord<X> record)
{code}
which can be improved to
{code}
<X> void collect(OutputTag<X> outputTag, StreamRecord<X> record)
{code}
This is probably leftover from an intermediate stage of development.
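The effect of the fix can be illustrated with a simplified, self-contained analogue. `Tag` and `Record` below are hypothetical stand-ins for `OutputTag` and `StreamRecord`, not Flink's actual classes: once the tag's type parameter is bound to the record's, the compiler rejects mismatched tag/record pairs.

```java
// Simplified analogue of the FLINK-6246 signature fix (hypothetical classes,
// not Flink's OutputTag/StreamRecord).
public class OutputTagDemo {
    static class Tag<T> {
        final String id;
        Tag(String id) { this.id = id; }
    }

    static class Record<T> {
        final T value;
        Record(T value) { this.value = value; }
    }

    // With Tag<X> (rather than Tag<?>) the tag's and the record's element
    // types must agree, so mismatched pairs fail at compile time.
    static <X> String collect(Tag<X> tag, Record<X> record) {
        return tag.id + "=" + record.value;
    }

    public static void main(String[] args) {
        System.out.println(collect(new Tag<Integer>("late-data"), new Record<>(42)));
        // collect(new Tag<String>("late-data"), new Record<>(42)); // would not compile
    }
}
```

With the wildcard version, a `Tag<String>` could be paired with a `Record<Integer>` and the mismatch would only surface at runtime, if at all.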
[jira] [Commented] (FLINK-6237) support RAND and RAND_INTEGER on SQL
[ https://issues.apache.org/jira/browse/FLINK-6237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953531#comment-15953531 ] ASF GitHub Bot commented on FLINK-6237: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3660#discussion_r109424349

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala ---
@@ -168,6 +168,18 @@ class SqlExpressionTest extends ExpressionTestBase {
     testSqlApi("ELEMENT(ARRAY['HELLO WORLD'])", "HELLO WORLD")
   }

+  @Test
+  def testRand(): Unit = {
+    val random = new java.util.Random(1)
+    testSqlApi("RAND(1)", random.nextDouble().toString)
+  }
+
+  @Test
+  def testRandInteger(): Unit = {
+    val random = new java.util.Random(1)
+    testSqlApi("RAND_INTEGER(1, 10)", random.nextInt(10).toString)
--- End diff --

Check a second time.

> support RAND and RAND_INTEGER on SQL
> --
> Key: FLINK-6237
> URL: https://issues.apache.org/jira/browse/FLINK-6237
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: godfrey he
> Assignee: godfrey he
>
> support RAND and RAND_INTEGER with and without seed on SQL,
> like:
> RAND([seed]),
> RAND_INTEGER([seed, ] bound)
[jira] [Commented] (FLINK-6237) support RAND and RAND_INTEGER on SQL
[ https://issues.apache.org/jira/browse/FLINK-6237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953532#comment-15953532 ] ASF GitHub Bot commented on FLINK-6237: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3660#discussion_r109423941

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/CodeGenerator.scala ---
@@ -1494,6 +1519,22 @@ class CodeGenerator(
   }

   /**
+    * Adds a reusable [[java.util.Random]] to the member area of the generated [[Function]].
+    *
+    * @return member variable term
+    */
+  def addReusableRandom(): String = {
+    val fieldTerm = newName("random")
+
+    val field =
+      s"""
+        |transient java.util.Random $fieldTerm = null;
--- End diff --

`Random` implements `Serializable`. I think we can initialize `Random` here already, remove `transient`, and add `final`.
[jira] [Commented] (FLINK-6237) support RAND and RAND_INTEGER on SQL
[ https://issues.apache.org/jira/browse/FLINK-6237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953534#comment-15953534 ] ASF GitHub Bot commented on FLINK-6237: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3660#discussion_r109424063

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
@@ -911,6 +911,64 @@ object ScalarOperators {
     }
   }

+  def generateRand(
+      randField: String,
+      seedExpr: GeneratedExpression,
+      resultType: TypeInformation[_])
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+    val randCode = if (seedExpr != null) {
+      s"""
+        |if ($randField == null) {
--- End diff --

If we initialize `Random` before, we do not need this condition.
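A minimal sketch of what the reviewer is suggesting. This is a hypothetical helper, not Flink's actual `CodeGenerator` API: if the `Random` member is emitted with eager initialization (`final`, no `transient`), the generated per-record code needs no null check.

```java
import java.util.Optional;

// Hypothetical sketch of the review suggestion: build the member declaration
// with eager initialization so the per-record code path has no null check.
// `fieldTerm` and `seedTerm` stand in for the code generator's name/seed terms.
public class RandomFieldSketch {
    static String randomFieldDeclaration(String fieldTerm, Optional<String> seedTerm) {
        String ctor = seedTerm
            .map(seed -> "new java.util.Random(" + seed + ")")
            .orElse("new java.util.Random()");
        return "final java.util.Random " + fieldTerm + " = " + ctor + ";";
    }

    public static void main(String[] args) {
        System.out.println(randomFieldDeclaration("random$1", Optional.of("1L")));
        // final java.util.Random random$1 = new java.util.Random(1L);
    }
}
```

Since the seed, when present, is a constant expression, nothing is lost by constructing the `Random` once at member declaration instead of lazily on the first call.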
[jira] [Commented] (FLINK-6237) support RAND and RAND_INTEGER on SQL
[ https://issues.apache.org/jira/browse/FLINK-6237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953533#comment-15953533 ] ASF GitHub Bot commented on FLINK-6237: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3660#discussion_r109424139

--- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/codegen/calls/ScalarOperators.scala ---
@@ -911,6 +911,64 @@ object ScalarOperators {
     }
   }

+  def generateRand(
+      randField: String,
+      seedExpr: GeneratedExpression,
+      resultType: TypeInformation[_])
+    : GeneratedExpression = {
+    val resultTerm = newName("result")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+    val randCode = if (seedExpr != null) {
+      s"""
+        |if ($randField == null) {
+        |  ${seedExpr.code}
+        |  $randField = new java.util.Random(${seedExpr.resultTerm});
+        |}
+        |$resultTypeTerm $resultTerm = $randField.nextDouble();
+      """.stripMargin
+    } else {
+      s"""
+        |if ($randField == null) {
+        |  $randField = new java.util.Random();
+        |}
+        |$resultTypeTerm $resultTerm = $randField.nextDouble();
+      """.stripMargin
+    }
+
+    GeneratedExpression(resultTerm, GeneratedExpression.NEVER_NULL, randCode, resultType)
+  }
+
+  def generateRandInteger(
+      randField: String,
+      seedExpr: GeneratedExpression,
+      boundExpr: GeneratedExpression,
+      resultType: TypeInformation[_])
+    : GeneratedExpression = {
+    assert(boundExpr != null)
+    val resultTerm = newName("result")
+    val resultTypeTerm = primitiveTypeTermForTypeInfo(resultType)
+    val randCode = if (seedExpr != null) {
+      s"""
+        |if ($randField == null) {
--- End diff --

If we initialize `Random` before, we do not need this condition.
[jira] [Commented] (FLINK-6237) support RAND and RAND_INTEGER on SQL
[ https://issues.apache.org/jira/browse/FLINK-6237?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15953535#comment-15953535 ] ASF GitHub Bot commented on FLINK-6237: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/3660#discussion_r109424315

--- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/expressions/SqlExpressionTest.scala ---
@@ -168,6 +168,18 @@ class SqlExpressionTest extends ExpressionTestBase {
     testSqlApi("ELEMENT(ARRAY['HELLO WORLD'])", "HELLO WORLD")
   }

+  @Test
+  def testRand(): Unit = {
+    val random = new java.util.Random(1)
+    testSqlApi("RAND(1)", random.nextDouble().toString)
--- End diff --

Check a second time that the next random number is different.
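The tests can predict the value of `RAND(1)` because `java.util.Random` is deterministic for a fixed seed, and checking a second value verifies that the generated code actually advances the sequence. A plain-JDK illustration (no Flink dependencies):

```java
import java.util.Random;

// Illustrates why a seeded RAND is testable: same seed, same sequence;
// consecutive draws from one generator differ.
public class SeededRandomDemo {
    public static void main(String[] args) {
        Random r1 = new Random(1);
        Random r2 = new Random(1);

        // Same seed yields identical sequences, so the expected value is known.
        double first1 = r1.nextDouble();
        double first2 = r2.nextDouble();
        System.out.println(first1 == first2); // true

        // A second draw differs from the first, which is what the review
        // comment asks the test to verify.
        double second1 = r1.nextDouble();
        System.out.println(second1 != first1); // true

        // RAND_INTEGER(seed, bound) is exercised via nextInt(bound),
        // which returns a value in [0, bound).
        int bounded = new Random(1).nextInt(10);
        System.out.println(bounded >= 0 && bounded < 10); // true
    }
}
```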