[GitHub] flink issue #3667: Fix a small spelling error har-with-dependencies -> jar-w...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3667 +1 to merge --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #3668: [FLINK-6254] [cep] Same method name for late data ...
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/3668 [FLINK-6254] [cep] Same method name for late data outputs on PatternStream and windowing. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink cep-late-consolidation Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3668.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3668 commit f24b0eba9d82ea91f5d44e747e8a64e94cb71cd5 Author: kl0u Date: 2017-04-03T15:39:02Z [FLINK-6254] [cep] Same method name for late data outputs on PatternStream and WindowedStream
[jira] [Commented] (FLINK-6254) Consolidate late data methods on PatternStream and WindowedStream
[ https://issues.apache.org/jira/browse/FLINK-6254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954706#comment-15954706 ] ASF GitHub Bot commented on FLINK-6254: GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/3668 [FLINK-6254] [cep] Same method name for late data outputs on PatternStream and windowing. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink cep-late-consolidation Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3668.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3668 commit f24b0eba9d82ea91f5d44e747e8a64e94cb71cd5 Author: kl0u Date: 2017-04-03T15:39:02Z [FLINK-6254] [cep] Same method name for late data outputs on PatternStream and WindowedStream

> Consolidate late data methods on PatternStream and WindowedStream
> Key: FLINK-6254
> URL: https://issues.apache.org/jira/browse/FLINK-6254
> Project: Flink
> Issue Type: Improvement
> Components: CEP
> Reporter: Aljoscha Krettek
> Assignee: Kostas Kloudas
> Priority: Blocker
> Fix For: 1.3.0
>
> {{WindowedStream}} has {{sideOutputLateData(OutputTag outputTag)}} while {{PatternStream}} has {{withLateDataOutputTag(OutputTag outputTag)}}. {{WindowedStream}} had the method first, so we should stick to that naming scheme.

-- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6249) Distinct Aggregates for OVER window
[ https://issues.apache.org/jira/browse/FLINK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] radu updated FLINK-6249:

Description:

Time target: ProcTime/EventTime

SQL targeted query examples:

Q1. Boundaries are expressed in windows and apply to the elements to be aggregated.
Q1.1. `SELECT SUM(DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
Q1.2. `SELECT SUM(DISTINCT b) OVER (ORDER BY procTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1`
Q1.3. `SELECT SUM(DISTINCT b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`
Q1.4. `SELECT SUM(DISTINCT b) OVER (ORDER BY rowTime() RANGE BETWEEN INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1`

General comments:
- The DISTINCT operation makes sense only within the context of windows or other bounded structures. Otherwise the operation would have to keep an infinite amount of data to ensure uniqueness and would never trigger for certain functions (e.g. aggregates).
- We can consider the implementation of DISTINCT for UNBOUNDED sliding windows as a sub-JIRA issue. However, there would be no control over the data structure that tracks already-seen data (to avoid re-processing it). -> Whether we want to support this needs to be decided (and appropriate JIRA issues created).
=> We will open sub-JIRA issues to extend the current aggregate functionality for the DISTINCT case (Q1.{1-4}). (This is the main target of this JIRA.)
=> Aggregations over distinct elements without any boundary (i.e. within the SELECT clause alone) do not make sense, just as aggregations do not make sense without groupings or windows.

Description:

The DISTINCT operator requires processing the elements to ensure uniqueness. Whether the operation is used to SELECT ALL distinct elements or to apply typical aggregation functions over a set of elements, a collection of elements must first be formed. This brings the need for windows or grouping methods. Therefore the distinct function will be implemented within windows. Depending on the type of window definition there are several options:
- Main Scope: If distinct is applied as in the Q1 examples for window aggregations, we can either extend the implementation with distinct aggregates (less preferred) or extend the sliding window aggregate implementation in the processFunction with support for identifying duplicates (preferred). The latter option is preferred because a query can carry multiple aggregates, including multiple aggregates that have the distinct keyword set. Implementing the distinction between elements in the process function avoids the need to duplicate the data structure that marks what was seen across multiple aggregates. It also makes the implementation more robust and resilient, as we can keep the data structure marking the seen elements in state (MapState).

Functionality example
-
We exemplify below the functionality of DISTINCT when working with streams.

`Query: SELECT sum(DISTINCT a) OVER (ORDER BY procTime() ROWS BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1`

||Proctime||IngestionTime(Event)||Stream1||Q3||
|10:00:01|(ab,1)|1|
|10:05:00|(aa,2)|3|
|11:03:00|(aa,2)|3|
|11:09:00|(aa,2)|2|
|...|

Implementation option
-
Considering that the behavior depends on the over window behavior, the implementation will reuse the existing implementation of the over window functions, which is based on processFunction. As mentioned in the description section, there are 2 options to consider:
1) Using distinct within the aggregates implementation, by extending the current aggregates in Flink with distinct aggregate implementations. For this we define additional JIRA issues for each implementation to support the distinct keyword.
2) Using distinct for selection within the process logic when calling the aggregates. This requires a new implementation of the processFunction used for computing the aggregates. The processFunction will also carry the logic of taking each element only once. For this, 2 options are possible. Option 1 (to be used within the ProcessFunction) trades memory: it requires a hashmap (e.g. MapState) with binary values to mark whether an event was seen before. This would be created once per window and reused across multiple distinct aggregates. Option 2 trades computation: it requires sorting the window contents and eliminating identical elements. The sorting can be done based on hash values if the events are non-numeric or composite, or do not possess an id to mark uniqueness. Option 2 is not preferred for incremental aggregates and should be considered only if certain aggregates would require a window implementation
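The behavior in the example table, combined with the Option 1 seen-count map, can be sketched outside of Flink. The following is a minimal Python simulation, not the actual Flink operator; the function name and structure are illustrative. A plain binary flag is not enough once rows leave the window, so the sketch keeps an occurrence count per value (the stand-in for MapState): a value contributes to the aggregate only when its count goes 0 -> 1 and is retracted when it drops back to 0.

```python
from collections import deque

def sum_distinct_over_rows(values, preceding=2):
    """Simulate SUM(DISTINCT b) OVER (ROWS BETWEEN <preceding> PRECEDING
    AND CURRENT ROW) with a seen-count map shared by the aggregate."""
    window = deque()   # rows currently inside the OVER window
    seen = {}          # value -> occurrence count (stand-in for MapState)
    acc = 0            # running sum over distinct values
    out = []
    for v in values:
        window.append(v)
        seen[v] = seen.get(v, 0) + 1
        if seen[v] == 1:                 # first occurrence in window: add
            acc += v
        if len(window) > preceding + 1:  # evict the oldest row
            old = window.popleft()
            seen[old] -= 1
            if seen[old] == 0:           # last occurrence left: retract
                acc -= old
                del seen[old]
        out.append(acc)
    return out

# Field b of the example events (ab,1), (aa,2), (aa,2), (aa,2):
print(sum_distinct_over_rows([1, 2, 2, 2]))  # [1, 3, 3, 2]
```

The output matches the Q3 column of the table above: the fourth row holds only (aa,2) triples, so the distinct sum drops to 2.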
[jira] [Commented] (FLINK-6249) Distinct Aggregates for OVER window
[ https://issues.apache.org/jira/browse/FLINK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954717#comment-15954717 ] radu commented on FLINK-6249: [~fhueske] Thanks for the feedback! I cleaned the JIRA as you suggested. I will move the group by cases into a separate JIRA. We can discuss that after we finish this (that is, if someone does not want to take it in the meantime).

> Distinct Aggregates for OVER window
> Key: FLINK-6249
> URL: https://issues.apache.org/jira/browse/FLINK-6249
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: radu
> Labels: features, patch
[jira] [Comment Edited] (FLINK-6249) Distinct Aggregates for OVER window
[ https://issues.apache.org/jira/browse/FLINK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15953646#comment-15953646 ] radu edited comment on FLINK-6249 at 4/4/17 7:48 AM: [~fhueske] [~shijinkui] [~Yuhong_kyo] [~sunjincheng121] [~twalthr] [~stefano.bortoli] I have opened a JIRA design to extend the current window aggregates implementation to support DISTINCT. So far I have created subtasks for the bounded over windows. Depending on your opinion we can potentially open other subtasks for the group by aggregates as well as for the unbounded over windows.

> Distinct Aggregates for OVER window
> Key: FLINK-6249
> URL: https://issues.apache.org/jira/browse/FLINK-6249
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Affects Versions: 1.3.0
> Reporter: radu
> Labels: features, patch
[jira] [Created] (FLINK-6260) Distinct Aggregates for Group By Windows
radu created FLINK-6260: --- Summary: Distinct Aggregates for Group By Windows Key: FLINK-6260 URL: https://issues.apache.org/jira/browse/FLINK-6260 Project: Flink Issue Type: New Feature Components: Table API & SQL Reporter: radu Time target: ProcTime/EventTime SQL targeted query examples: Q1. Boundaries are expressed in GROUP BY clause and distinct is applied for the elements of the aggregate(s) `SELECT MIN( DISTINCT rowtime), prodID FROM stream1 GROUP BY FLOOR(procTime() TO HOUR)` Q2. Distinct is applied to the collection of outputs to be selected. `SELECT STREAM DISTINCT procTime(), prodId FROM stream1 GROUP BY FLOOR(procTime() TO DAY)` => DISTINCT operation makes sense only within the context of windows or some bounded defined structures. Otherwise the operation would keep an infinite amount of data to ensure uniqueness and would not trigger for certain functions (e.g. aggregates) => We can follow the same design/implementation as for JIRA FLINK-6249 (supporting Distinct Aggregates for OVER Windows) => We can consider as a sub-JIRA issue the implementation of DISTINCT for select clauses. => Aggregations over distinct elements without any boundary (i.e. within SELECT clause) do not make sense just as aggregations do not make sense without groupings or windows. If distinct is applied as in Q1 example on group elements than either we define a new implementation if selection is general or extend the current implementation of grouped aggregates with distinct group aggregates If distinct is applied as in Q2 example for the select all elements, then a new implementation needs to be defined. This would work over a specific window / processFunction and within the processing function the uniqueness of the results to be processed will be done. This will happen for each partition. 
The data structure used to trace distinct elements will be reset with each new window (i.e., group by scope).

Examples

`Q1: SELECT STREAM DISTINCT b FROM stream1 GROUP BY FLOOR(PROCTIME TO HOUR)`
`Q2: SELECT COUNT(DISTINCT b) FROM stream1 GROUP BY FLOOR(PROCTIME() TO HOUR)`
||Proctime||IngestionTime(Event)||Stream1||Q1||Q2||
||10:00:01| (ab,1)| | |
||10:05:00| (aa,2)| | |
||11:00:00| | ab,aa | 2 |
||11:03:00| (aa,2)| | |
||11:09:00| (aa,2)| | |
||12:00:00| | aa | 1 |
|...|

Implementation option
-
Considering that the behavior is similar to the one implemented for over windows (with the difference that the distinction is reset for each group scope), the implementation will be done by reusing the existing implementation of the over window functions. Distinction can be achieved either within the aggregate itself or within the window/processFunction logic that computes the aggregates. As multiple aggregates that require distinction can be computed at the same time, the preferred option is to implement the distinction within the process logic. For the case of selecting distinct outputs (i.e., not aggregates) we can follow the same implementation design: support distinction in the aggregation and then emit only one output per seen element (instead of calling the aggregate method of the aggregates).

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
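The per-window reset described above can be sketched in plain Java (an illustrative simulation of the intended semantics, not the actual Flink implementation; all names are hypothetical): the seen-set is keyed by the window and discarded when the window fires, which is exactly the "reset with each new window" behavior.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: COUNT(DISTINCT b) GROUP BY FLOOR(time TO HOUR).
class DistinctGroupWindow {
    // window key (hour bucket) -> distinct values seen in that window
    private final Map<Long, Set<String>> seenPerWindow = new HashMap<>();

    /** Record element b for the tumbling hour window containing tsMillis. */
    void add(long tsMillis, String b) {
        long key = tsMillis / 3_600_000L;   // FLOOR(time TO HOUR)
        seenPerWindow.computeIfAbsent(key, k -> new HashSet<>()).add(b);
    }

    /** Fire the window: emit COUNT(DISTINCT b) and drop its state (the reset). */
    int fire(long tsMillis) {
        Set<String> seen = seenPerWindow.remove(tsMillis / 3_600_000L);
        return seen == null ? 0 : seen.size();
    }
}
```

Replaying the table above, the 10:00 hour sees ab and aa and fires 2 at 11:00, while the 11:00 hour sees only aa (twice) and fires 1 at 12:00.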
[jira] [Commented] (FLINK-6260) Distinct Aggregates for Group By Windows
[ https://issues.apache.org/jira/browse/FLINK-6260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954739#comment-15954739 ] radu commented on FLINK-6260: - [~fhueske] [~shijinkui] [~Yuhong_kyo] [~sunjincheng121] [~twalthr] [~stefano.bortoli] As per [~fhueske]'s suggestion in FLINK-6249, I have created a separate JIRA for the case of supporting distinct over group by. > Distinct Aggregates for Group By Windows -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6241) codeGen dataStream aggregates that use ProcessFunction
[ https://issues.apache.org/jira/browse/FLINK-6241?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954746#comment-15954746 ] radu commented on FLINK-6241: - [~shaoxuan] i realized i did not notify you :) > codeGen dataStream aggregates that use ProcessFunction > -- > > Key: FLINK-6241 > URL: https://issues.apache.org/jira/browse/FLINK-6241 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Shaoxuan Wang >Assignee: Shaoxuan Wang > -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3665: [FLINK-6011] Support TUMBLE, HOP, SESSION window in strea...
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3665 Thanks for the PR @haohui! +1 to merge. I've been waiting for this feature :-) --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6011) Support TUMBLE, HOP, SESSION window in streaming SQL
[ https://issues.apache.org/jira/browse/FLINK-6011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954799#comment-15954799 ] ASF GitHub Bot commented on FLINK-6011: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/3665 Thanks for the PR @haohui! +1 to merge. I've been waiting for this feature :-) > Support TUMBLE, HOP, SESSION window in streaming SQL > > > Key: FLINK-6011 > URL: https://issues.apache.org/jira/browse/FLINK-6011 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Haohui Mai >Assignee: Haohui Mai > > CALCITE-1603 and CALCITE-1615 introduces the support of the {{TUMBLE}} / > {{HOP}} / {{SESSION}} windows in the parser. > This jira tracks the efforts of adding the corresponding supports on the > planners / optimizers in Flink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3665: [FLINK-6011] Support TUMBLE, HOP, SESSION window in strea...
Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3665 So we finally got those supported by Calcite 1.12? Really excited to see those features supported in flinkSQL. Thanks @haohui. ---
[jira] [Updated] (FLINK-6188) Some setParallelism() methods can't cope with default parallelism
[ https://issues.apache.org/jira/browse/FLINK-6188?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-6188: Fix Version/s: (was: 1.2.1) 1.3.0 1.2.2 > Some setParallelism() methods can't cope with default parallelism > - > > Key: FLINK-6188 > URL: https://issues.apache.org/jira/browse/FLINK-6188 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.1 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.0, 1.2.2 > > > Recent changes done for FLINK-5808 move default parallelism manifestation > from eager to lazy, that is, the parallelism of operations that don't have an > explicit parallelism is only set when generating the JobGraph. Some > {{setParallelism()}} calls, such as > {{SingleOutputStreamOperator.setParallelism()}} cannot deal with the fact > that the parallelism of an operation might be {{-1}} (which indicates that it > should take the default parallelism when generating the JobGraph). > We should either revert the changes that fixed another user-facing bug for > version 1.2.1 or fix the methods. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6011) Support TUMBLE, HOP, SESSION window in streaming SQL
[ https://issues.apache.org/jira/browse/FLINK-6011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954840#comment-15954840 ] ASF GitHub Bot commented on FLINK-6011: --- Github user shaoxuan-wang commented on the issue: https://github.com/apache/flink/pull/3665 So we finally got those supported by Calcite 1.12?Really excited to see those features supported in flinkSQL. Thanks @haohui. > Support TUMBLE, HOP, SESSION window in streaming SQL > > > Key: FLINK-6011 > URL: https://issues.apache.org/jira/browse/FLINK-6011 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Haohui Mai >Assignee: Haohui Mai > > CALCITE-1603 and CALCITE-1615 introduces the support of the {{TUMBLE}} / > {{HOP}} / {{SESSION}} windows in the parser. > This jira tracks the efforts of adding the corresponding supports on the > planners / optimizers in Flink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6209) StreamPlanEnvironment always has a parallelism of 1
[ https://issues.apache.org/jira/browse/FLINK-6209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-6209: Fix Version/s: 1.2.2 > StreamPlanEnvironment always has a parallelism of 1 > --- > > Key: FLINK-6209 > URL: https://issues.apache.org/jira/browse/FLINK-6209 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.1 >Reporter: Haohui Mai >Assignee: Haohui Mai > Fix For: 1.3.0, 1.2.2 > > > Thanks [~bill.liu8904] for triaging the issue. > After FLINK-5808 we saw that the Flink jobs that are uploaded through the UI > always have a parallelism of 1, even the parallelism is explicitly set via in > the UI. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6209) StreamPlanEnvironment always has a parallelism of 1
[ https://issues.apache.org/jira/browse/FLINK-6209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-6209: Fix Version/s: 1.3.0 > StreamPlanEnvironment always has a parallelism of 1 > --- > > Key: FLINK-6209 > URL: https://issues.apache.org/jira/browse/FLINK-6209 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.1 >Reporter: Haohui Mai >Assignee: Haohui Mai > Fix For: 1.3.0, 1.2.2 > > > Thanks [~bill.liu8904] for triaging the issue. > After FLINK-5808 we saw that the Flink jobs that are uploaded through the UI > always have a parallelism of 1, even the parallelism is explicitly set via in > the UI. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3662: [FLINK-6246] Fix generic type of OutputTag in oper...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3662#discussion_r109608264 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java --- @@ -107,7 +107,8 @@ public static void main(String[] args) throws Exception { } // execute the program - env.execute("Streaming Iteration Example"); + System.out.println(env.getExecutionPlan()); --- End diff -- Oh no, I was playing around with something. 😱 ---
[jira] [Commented] (FLINK-6246) Fix generic type of OutputTag in operator Output
[ https://issues.apache.org/jira/browse/FLINK-6246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954849#comment-15954849 ] ASF GitHub Bot commented on FLINK-6246: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3662#discussion_r109608264 --- Diff: flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/iteration/IterateExample.java --- @@ -107,7 +107,8 @@ public static void main(String[] args) throws Exception { } // execute the program - env.execute("Streaming Iteration Example"); + System.out.println(env.getExecutionPlan()); --- End diff -- Oh no, I was playing around with something. 😱 > Fix generic type of OutputTag in operator Output > > > Key: FLINK-6246 > URL: https://issues.apache.org/jira/browse/FLINK-6246 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Aljoscha Krettek > > The current signature is > {code} > void collect(OutputTag outputTag, StreamRecord record) > {code} > which can be improved to > {code} > void collect(OutputTag outputTag, StreamRecord record) > {code} > This is probably leftover from an intermediate stage of development. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3662: [FLINK-6246] Fix generic type of OutputTag in operator Ou...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3662 @dawidwys thanks for reviewing so quickly. I pushed a commit to address your comments. ---
[jira] [Commented] (FLINK-6246) Fix generic type of OutputTag in operator Output
[ https://issues.apache.org/jira/browse/FLINK-6246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954851#comment-15954851 ] ASF GitHub Bot commented on FLINK-6246: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3662 @dawidwys thanks for reviewing so quickly. I pushed a commit to address your comments. > Fix generic type of OutputTag in operator Output > > > Key: FLINK-6246 > URL: https://issues.apache.org/jira/browse/FLINK-6246 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Aljoscha Krettek > > The current signature is > {code} > void collect(OutputTag outputTag, StreamRecord record) > {code} > which can be improved to > {code} > void collect(OutputTag outputTag, StreamRecord record) > {code} > This is probably leftover from an intermediate stage of development. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3508: [FLINK-5991] [state-backend, streaming] Expose Bro...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3508#discussion_r109610444 --- Diff: docs/dev/stream/state.md --- @@ -233,45 +229,44 @@ val counts: DataStream[(String, Int)] = stream ## Using Managed Operator State -A stateful function can implement either the more general `CheckpointedFunction` +To use managed operator state, a stateful function can implement either the more general `CheckpointedFunction` interface, or the `ListCheckpointed` interface. -In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other, -thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which -non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink` -contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0, -while `(test2, 2)` will go to task 1. - -# ListCheckpointed + CheckpointedFunction -The `ListCheckpointed` interface requires the implementation of two methods: - -{% highlight java %} -List snapshotState(long checkpointId, long timestamp) throws Exception; - -void restoreState(List state) throws Exception; -{% endhighlight %} - -On `snapshotState()` the operator should return a list of objects to checkpoint and -`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always -return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`. - -# CheckpointedFunction - -The `CheckpointedFunction` interface also requires the implementation of two methods: +The `CheckpointedFunction` interface provides access to non-keyed state with different +redistribution schemes. 
It requires the implementation of two methods: {% highlight java %} void snapshotState(FunctionSnapshotContext context) throws Exception; void initializeState(FunctionInitializationContext context) throws Exception; {% endhighlight %} -Whenever a checkpoint has to be performed `snapshotState()` is called. The counterpart, `initializeState()`, is called every time the user-defined function is initialized, be that when the function is first initialized -or be that when actually recovering from an earlier checkpoint. Given this, `initializeState()` is not +Whenever a checkpoint has to be performed, `snapshotState()` is called. The counterpart, `initializeState()`, +is called every time the user-defined function is initialized, be that when the function is first initialized +or be that when the function is actually recovering from an earlier checkpoint. Given this, `initializeState()` is not only the place where different types of state are initialized, but also where state recovery logic is included. -This is an example of a function that uses `CheckpointedFunction`, a stateful `SinkFunction` that -uses state to buffer elements before sending them to the outside world: +Currently, list-style managed operator state is supported. The state +is expected to be a `List` of *serializable* objects, independent from each other, +thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which +non-keyed state can be redistributed. Depending on the state accessing method, +the following redistribution schemes are defined: + + - **Even-split redistribution:** Each operator returns a List of state elements. The whole state is logically a concatenation of --- End diff -- Could use "Round-Robin redistribution". Maybe... --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
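The even-split redistribution scheme discussed in the review comment above can be illustrated with a small standalone sketch. This only simulates the documented semantics (it is not Flink's internal code, and the class name is made up): the checkpointed lists of all parallel subtasks are logically concatenated and then split as evenly as possible across the subtasks of the new parallelism.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of even-split redistribution of list-style operator state.
class EvenSplit {
    /** Redistribute checkpointed operator-state lists to a new parallelism. */
    static <T> List<List<T>> redistribute(List<List<T>> checkpointed, int newParallelism) {
        List<T> all = new ArrayList<>();
        for (List<T> sub : checkpointed) {
            all.addAll(sub);                       // the state is logically a concatenation
        }
        List<List<T>> result = new ArrayList<>();
        int base = all.size() / newParallelism;
        int extra = all.size() % newParallelism;   // first 'extra' subtasks get one more element
        int pos = 0;
        for (int i = 0; i < newParallelism; i++) {
            int take = base + (i < extra ? 1 : 0);
            result.add(new ArrayList<>(all.subList(pos, pos + take)));
            pos += take;
        }
        return result;
    }
}
```

With the `BufferingSink` example from the docs, scaling from parallelism 1 holding `(test1, 2)` and `(test2, 2)` up to parallelism 2 hands each new subtask one element, matching the redistribution behavior described in the documentation change.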
[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs
[ https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954859#comment-15954859 ] ASF GitHub Bot commented on FLINK-5991: --- Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/3508#discussion_r109610444 --- Diff: docs/dev/stream/state.md --- @@ -233,45 +229,44 @@ val counts: DataStream[(String, Int)] = stream ## Using Managed Operator State -A stateful function can implement either the more general `CheckpointedFunction` +To use managed operator state, a stateful function can implement either the more general `CheckpointedFunction` interface, or the `ListCheckpointed` interface. -In both cases, the non-keyed state is expected to be a `List` of *serializable* objects, independent from each other, -thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which -non-keyed state can be repartitioned. As an example, if with parallelism 1 the checkpointed state of the `BufferingSink` -contains elements `(test1, 2)` and `(test2, 2)`, when increasing the parallelism to 2, `(test1, 2)` may end up in task 0, -while `(test2, 2)` will go to task 1. - -# ListCheckpointed + CheckpointedFunction -The `ListCheckpointed` interface requires the implementation of two methods: - -{% highlight java %} -List snapshotState(long checkpointId, long timestamp) throws Exception; - -void restoreState(List state) throws Exception; -{% endhighlight %} - -On `snapshotState()` the operator should return a list of objects to checkpoint and -`restoreState` has to handle such a list upon recovery. If the state is not re-partitionable, you can always -return a `Collections.singletonList(MY_STATE)` in the `snapshotState()`. 
- -# CheckpointedFunction - -The `CheckpointedFunction` interface also requires the implementation of two methods: +The `CheckpointedFunction` interface provides access to non-keyed state with different +redistribution schemes. It requires the implementation of two methods: {% highlight java %} void snapshotState(FunctionSnapshotContext context) throws Exception; void initializeState(FunctionInitializationContext context) throws Exception; {% endhighlight %} -Whenever a checkpoint has to be performed `snapshotState()` is called. The counterpart, `initializeState()`, is called every time the user-defined function is initialized, be that when the function is first initialized -or be that when actually recovering from an earlier checkpoint. Given this, `initializeState()` is not +Whenever a checkpoint has to be performed, `snapshotState()` is called. The counterpart, `initializeState()`, +is called every time the user-defined function is initialized, be that when the function is first initialized +or be that when the function is actually recovering from an earlier checkpoint. Given this, `initializeState()` is not only the place where different types of state are initialized, but also where state recovery logic is included. -This is an example of a function that uses `CheckpointedFunction`, a stateful `SinkFunction` that -uses state to buffer elements before sending them to the outside world: +Currently, list-style managed operator state is supported. The state +is expected to be a `List` of *serializable* objects, independent from each other, +thus eligible for redistribution upon rescaling. In other words, these objects are the finest granularity at which +non-keyed state can be redistributed. Depending on the state accessing method, +the following redistribution schemes are defined: + + - **Even-split redistribution:** Each operator returns a List of state elements. 
The whole state is logically a concatenation of --- End diff -- Could use "Round-Robin redistribution". Maybe... > Expose Broadcast Operator State through public APIs > --- > > Key: FLINK-5991 > URL: https://issues.apache.org/jira/browse/FLINK-5991 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.3.0 > > > The broadcast operator state functionality was added in FLINK-5265, it just > hasn't been exposed through any public APIs yet. > Currently, we have 2 streaming connector features for 1.3 that are pending on > broadcast state: rescalable Kinesis / Kafka consumers with shard / partition > discovery (FLINK-4821 & FLINK-4022). We should consider exposing
[GitHub] flink issue #3508: [FLINK-5991] [state-backend, streaming] Expose Broadcast ...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3508 Nice update to the docs! And yeah, a separate issue for deprecation is very good. 👍 ---
[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs
[ https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954860#comment-15954860 ] ASF GitHub Bot commented on FLINK-5991: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3508 Nice update to the docs! And yeah, a separate issue for deprecation is very good. 👍 > Expose Broadcast Operator State through public APIs > --- > > Key: FLINK-5991 > URL: https://issues.apache.org/jira/browse/FLINK-5991 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.3.0 > > > The broadcast operator state functionality was added in FLINK-5265, it just > hasn't been exposed through any public APIs yet. > Currently, we have 2 streaming connector features for 1.3 that are pending on > broadcast state: rescalable Kinesis / Kafka consumers with shard / partition > discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast > state for the 1.3 release also. > This JIRA also serves the purpose to discuss how we want to expose it. > To initiate the discussion, I propose: > 1. For the more powerful {{CheckpointedFunction}}, add the following to the > {{OperatorStateStore}} interface: > {code} > ListState getBroadcastOperatorState(ListStateDescriptor > stateDescriptor); > ListState > getBroadcastSerializableListState(String stateName); > {code} > 2. For a simpler {{ListCheckpointed}} variant, we probably should have a > separate {{BroadcastListCheckpointed}} interface. > Extending {{ListCheckpointed}} to let the user define either the list state > type of either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if > we can rely on a contract that the value doesn't change. Or we expose a > defining method (e.g. {{getListStateType()}}) that is called only once in the > operator. 
This would break user code, but can be considered because it is > marked as {{PublicEvolving}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3662: [FLINK-6246] Fix generic type of OutputTag in operator Ou...
Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/3662 Now, I think it is good to merge ;) ---
[GitHub] flink issue #3661: [FLINK-4953] Allow access to "time" in ProcessWindowFunct...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3661 I'd like to keep it for now, since it might help catch some bugs and doesn't take long to run. What was the bug in the comparator? ---
[jira] [Commented] (FLINK-6246) Fix generic type of OutputTag in operator Output
[ https://issues.apache.org/jira/browse/FLINK-6246?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954865#comment-15954865 ] ASF GitHub Bot commented on FLINK-6246: --- Github user dawidwys commented on the issue: https://github.com/apache/flink/pull/3662 Now, I think it is good to merge ;) > Fix generic type of OutputTag in operator Output > > > Key: FLINK-6246 > URL: https://issues.apache.org/jira/browse/FLINK-6246 > Project: Flink > Issue Type: Bug > Components: DataStream API >Reporter: Aljoscha Krettek > > The current signature is > {code} > void collect(OutputTag outputTag, StreamRecord record) > {code} > which can be improved to > {code} > void collect(OutputTag outputTag, StreamRecord record) > {code} > This is probably leftover from an intermediate stage of development. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-4953) Allow access to "time" in ProcessWindowFunction.Context
[ https://issues.apache.org/jira/browse/FLINK-4953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954866#comment-15954866 ] ASF GitHub Bot commented on FLINK-4953: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3661 I'd like to keep it for now, since it might help catch some bugs and doesn't take long to run. What was the bug in the comparator? > Allow access to "time" in ProcessWindowFunction.Context > --- > > Key: FLINK-4953 > URL: https://issues.apache.org/jira/browse/FLINK-4953 > Project: Flink > Issue Type: Improvement > Components: DataStream API >Reporter: Manu Zhang >Assignee: Manu Zhang > > The recently added {{ProcessWindowFunction}} has a {{Context}} object that > allows querying some additional information about the window firing that we > are processing. Right now, this is only the window for which the firing is > happening. We should extends this with methods that allow querying the > current processing time and the current watermark. > Original text by issue creator: This is similar to FLINK-3674 but exposing > time information in window functions. Currently when a timer is fired, all > states in a window will be returned to users, including those after the > timer. This change will allow users to filter out states after the timer > based on time info. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3508: [FLINK-5991] [state-backend, streaming] Expose Broadcast ...
Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3508 Very good work and thanks for improving the documentation. I like the update. From what I have seen in the past, some users have mistaken the list-nature of the operator state and simply dumped lots of small elements into the list that should not actually be the unit of repartitioning and sometimes even logically belonged together. I wonder if the different semantics of list state between the operator state and the keyed state can be confusing and error-prone for users, and what we could do about this? A method called `getListState` might be a step in the wrong direction. Besides this, +1 from me. ---
[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs
[ https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954881#comment-15954881 ] ASF GitHub Bot commented on FLINK-5991: --- Github user StefanRRichter commented on the issue: https://github.com/apache/flink/pull/3508 Very good work and thanks for improving the documentation. I like the update. From what I have seen in the past, some user have mistaken the list-nature of the operator state and simply dumped lots of small elements in the list, that should not actually be the unit of repartitioning and sometimes even logically belonged together. I wonder if the different semantics in list state between the operator state and the keyed state can be confusing and error-prone for users and what we could do about this? A method called `getListState` might be a step in the wrong direction. Besides this, +1 from me. > Expose Broadcast Operator State through public APIs > --- > > Key: FLINK-5991 > URL: https://issues.apache.org/jira/browse/FLINK-5991 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.3.0 > > > The broadcast operator state functionality was added in FLINK-5265, it just > hasn't been exposed through any public APIs yet. > Currently, we have 2 streaming connector features for 1.3 that are pending on > broadcast state: rescalable Kinesis / Kafka consumers with shard / partition > discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast > state for the 1.3 release also. > This JIRA also serves the purpose to discuss how we want to expose it. > To initiate the discussion, I propose: > 1. 
For the more powerful {{CheckpointedFunction}}, add the following to the > {{OperatorStateStore}} interface: > {code} > ListState getBroadcastOperatorState(ListStateDescriptor > stateDescriptor); > ListState > getBroadcastSerializableListState(String stateName); > {code} > 2. For a simpler {{ListCheckpointed}} variant, we probably should have a > separate {{BroadcastListCheckpointed}} interface. > Extending {{ListCheckpointed}} to let the user define either the list state > type of either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if > we can rely on a contract that the value doesn't change. Or we expose a > defining method (e.g. {{getListStateType()}}) that is called only once in the > operator. This would break user code, but can be considered because it is > marked as {{PublicEvolving}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
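The two redistribution modes under discussion differ in what each subtask receives on restore. As a plain-Java illustration (not Flink's actual API; `RedistributionDemo` and its method names are made up for this example): even-split hands each subtask a slice of the checkpointed list, while broadcast hands every subtask the full list.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Toy model of the two operator-state redistribution modes discussed above:
 * even-split (the default list-state behaviour) vs. broadcast (every subtask
 * receives the complete state). Illustrative only, not Flink code.
 */
class RedistributionDemo {

    /** Round-robin the checkpointed entries over the new parallelism. */
    static List<List<Integer>> evenSplit(List<Integer> checkpointed, int newParallelism) {
        List<List<Integer>> subtasks = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            subtasks.add(new ArrayList<>());
        }
        for (int i = 0; i < checkpointed.size(); i++) {
            subtasks.get(i % newParallelism).add(checkpointed.get(i));
        }
        return subtasks;
    }

    /** Broadcast: every subtask receives a full copy of the state. */
    static List<List<Integer>> broadcast(List<Integer> checkpointed, int newParallelism) {
        List<List<Integer>> subtasks = new ArrayList<>();
        for (int i = 0; i < newParallelism; i++) {
            subtasks.add(new ArrayList<>(checkpointed));
        }
        return subtasks;
    }
}
```

With broadcast semantics, each restored subtask can deduplicate or pick the entries relevant to it, which is what the partition-discovery connectors mentioned in the ticket need.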
[jira] [Assigned] (FLINK-6261) Add support for TUMBLE, HOP, SESSION to batch SQL
[ https://issues.apache.org/jira/browse/FLINK-6261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Fabian Hueske reassigned FLINK-6261: Assignee: Fabian Hueske > Add support for TUMBLE, HOP, SESSION to batch SQL > - > > Key: FLINK-6261 > URL: https://issues.apache.org/jira/browse/FLINK-6261 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > Add support for the TUMBLE, HOP, SESSION keywords for batch SQL. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6261) Add support for TUMBLE, HOP, SESSION to batch SQL
Fabian Hueske created FLINK-6261: Summary: Add support for TUMBLE, HOP, SESSION to batch SQL Key: FLINK-6261 URL: https://issues.apache.org/jira/browse/FLINK-6261 Project: Flink Issue Type: Sub-task Components: Table API & SQL Affects Versions: 1.3.0 Reporter: Fabian Hueske Add support for the TUMBLE, HOP, SESSION keywords for batch SQL. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3668: [FLINK-6254] [cep] Same method name for late data outputs...
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3668 LGTM! 👍 ---
[jira] [Commented] (FLINK-6254) Consolidate late data methods on PatternStream and WindowedStream
[ https://issues.apache.org/jira/browse/FLINK-6254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954888#comment-15954888 ] ASF GitHub Bot commented on FLINK-6254: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3668 LGTM! 👍 > Consolidate late data methods on PatternStream and WindowedStream > - > > Key: FLINK-6254 > URL: https://issues.apache.org/jira/browse/FLINK-6254 > Project: Flink > Issue Type: Improvement > Components: CEP >Reporter: Aljoscha Krettek >Assignee: Kostas Kloudas >Priority: Blocker > Fix For: 1.3.0 > > > {{WindowedStream}} has {{sideOutputLateData(OutputTag outputTag)}} while > {{PatternStream}} has {{withLateDataOutputTag(OutputTag outputTag)}}. > {{WindowedStream}} had the method first so we should stick to that naming > scheme. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-6258) Deprecate ListCheckpointed interface for managed operator state
[ https://issues.apache.org/jira/browse/FLINK-6258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-6258: Component/s: DataStream API > Deprecate ListCheckpointed interface for managed operator state > --- > > Key: FLINK-6258 > URL: https://issues.apache.org/jira/browse/FLINK-6258 > Project: Flink > Issue Type: Improvement > Components: DataStream API, State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai > > Per discussion in https://github.com/apache/flink/pull/3508, we consider > deprecating the `ListCheckpointed` interface to discourage Java serialization > shortcuts for state registrations (towards this, the Java serialization > shortcuts provided by the `OperatorStateStore` interface have already been > deprecated in https://github.com/apache/flink/pull/3508). > We should also remember to update > https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/stream/state.html#using-managed-keyed-state > if we decide to do this. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-3089) State API Should Support Data Expiration
[ https://issues.apache.org/jira/browse/FLINK-3089?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954901#comment-15954901 ] Aljoscha Krettek commented on FLINK-3089: - Yes, I admit that it is a bit of a shortcoming. This was a conscious decision because we didn't yet know (and still don't) how we want to store very large amounts of timers and whether a future solution might make overrides/deletions very costly. [~xiaogang.shi] do you have any insights into how costly timer removal is for the RocksDB timers? > State API Should Support Data Expiration > > > Key: FLINK-3089 > URL: https://issues.apache.org/jira/browse/FLINK-3089 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Niels Basjes > > In some usecases (webanalytics) there is a need to have a state per visitor > on a website (i.e. keyBy(sessionid) ). > At some point the visitor simply leaves and no longer creates new events (so > a special 'end of session' event will not occur). > The only way to determine that a visitor has left is by choosing a timeout, > like "After 30 minutes no events we consider the visitor 'gone'". > Only after this (chosen) timeout has expired should we discard this state. > In the Trigger part of Windows we can set a timer and close/discard this kind > of information. But that introduces the buffering effect of the window (which > in some scenarios is unwanted). > What I would like is to be able to set a timeout on a specific OperatorState > value which I can update afterwards. > This makes it possible to create a map function that assigns the right value > and that discards the state automatically. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
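The behaviour requested in this ticket, a value whose timeout is refreshed on every update and which disappears once the timeout elapses, can be sketched outside of Flink in plain Java (this is only an illustration; `ExpiringState` is a made-up name, not a Flink API):

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Minimal sketch of per-key state expiration: each value carries a deadline
 * that is pushed out on every update, and expired entries are dropped either
 * lazily on read or eagerly via expire() (as a timer callback would do).
 */
class ExpiringState<K, V> {
    private static class Entry<V> {
        V value;
        long deadline;
    }

    private final Map<K, Entry<V>> map = new HashMap<>();
    private final long ttlMillis;

    ExpiringState(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    /** Store or refresh: updating a key extends its lifetime by the TTL. */
    void update(K key, V value, long nowMillis) {
        Entry<V> e = map.computeIfAbsent(key, k -> new Entry<>());
        e.value = value;
        e.deadline = nowMillis + ttlMillis;
    }

    /** Returns null if the key is absent or its TTL has elapsed. */
    V get(K key, long nowMillis) {
        Entry<V> e = map.get(key);
        if (e == null || nowMillis >= e.deadline) {
            return null;
        }
        return e.value;
    }

    /** Eager cleanup of all expired entries. */
    void expire(long nowMillis) {
        map.values().removeIf(e -> nowMillis >= e.deadline);
    }
}
```

The design question raised in the comment above is exactly the cost of the eager path: doing this inside a state backend would require overriding or deleting a timer per key on every update.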
[GitHub] flink issue #3668: [FLINK-6254] [cep] Same method name for late data outputs...
Github user kl0u commented on the issue: https://github.com/apache/flink/pull/3668 Thanks @aljoscha ! Merging this. ---
[GitHub] flink pull request #3668: [FLINK-6254] [cep] Same method name for late data ...
Github user kl0u closed the pull request at: https://github.com/apache/flink/pull/3668 ---
[jira] [Commented] (FLINK-6254) Consolidate late data methods on PatternStream and WindowedStream
[ https://issues.apache.org/jira/browse/FLINK-6254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954907#comment-15954907 ] ASF GitHub Bot commented on FLINK-6254: --- Github user kl0u closed the pull request at: https://github.com/apache/flink/pull/3668 > Consolidate late data methods on PatternStream and WindowedStream > - > > Key: FLINK-6254 > URL: https://issues.apache.org/jira/browse/FLINK-6254 > Project: Flink > Issue Type: Improvement > Components: CEP >Reporter: Aljoscha Krettek >Assignee: Kostas Kloudas >Priority: Blocker > Fix For: 1.3.0 > > > {{WindowedStream}} has {{sideOutputLateData(OutputTag outputTag)}} while > {{PatternStream}} has {{withLateDataOutputTag(OutputTag outputTag)}}. > {{WindowedStream}} had the method first so we should stick to that naming > scheme. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6254) Consolidate late data methods on PatternStream and WindowedStream
[ https://issues.apache.org/jira/browse/FLINK-6254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954908#comment-15954908 ] Kostas Kloudas commented on FLINK-6254: --- Merged at 1b6baddca07bfba6093951e82ac9108cf4728f2a > Consolidate late data methods on PatternStream and WindowedStream > - > > Key: FLINK-6254 > URL: https://issues.apache.org/jira/browse/FLINK-6254 > Project: Flink > Issue Type: Improvement > Components: CEP >Reporter: Aljoscha Krettek >Assignee: Kostas Kloudas >Priority: Blocker > Fix For: 1.3.0 > > > {{WindowedStream}} has {{sideOutputLateData(OutputTag outputTag)}} while > {{PatternStream}} has {{withLateDataOutputTag(OutputTag outputTag)}}. > {{WindowedStream}} had the method first so we should stick to that naming > scheme. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6254) Consolidate late data methods on PatternStream and WindowedStream
[ https://issues.apache.org/jira/browse/FLINK-6254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954906#comment-15954906 ] ASF GitHub Bot commented on FLINK-6254: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/3668 Thanks @aljoscha ! Merging this. > Consolidate late data methods on PatternStream and WindowedStream > - > > Key: FLINK-6254 > URL: https://issues.apache.org/jira/browse/FLINK-6254 > Project: Flink > Issue Type: Improvement > Components: CEP >Reporter: Aljoscha Krettek >Assignee: Kostas Kloudas >Priority: Blocker > Fix For: 1.3.0 > > > {{WindowedStream}} has {{sideOutputLateData(OutputTag outputTag)}} while > {{PatternStream}} has {{withLateDataOutputTag(OutputTag outputTag)}}. > {{WindowedStream}} had the method first so we should stick to that naming > scheme. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Resolved] (FLINK-6254) Consolidate late data methods on PatternStream and WindowedStream
[ https://issues.apache.org/jira/browse/FLINK-6254?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas resolved FLINK-6254. --- Resolution: Fixed > Consolidate late data methods on PatternStream and WindowedStream > - > > Key: FLINK-6254 > URL: https://issues.apache.org/jira/browse/FLINK-6254 > Project: Flink > Issue Type: Improvement > Components: CEP >Reporter: Aljoscha Krettek >Assignee: Kostas Kloudas >Priority: Blocker > Fix For: 1.3.0 > > > {{WindowedStream}} has {{sideOutputLateData(OutputTag outputTag)}} while > {{PatternStream}} has {{withLateDataOutputTag(OutputTag outputTag)}}. > {{WindowedStream}} had the method first so we should stick to that naming > scheme. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3665: [FLINK-6011] Support TUMBLE, HOP, SESSION window i...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3665#discussion_r109616209 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala --- @@ -117,46 +119,86 @@ class LogicalWindowAggregateRule } private def identifyWindow(field: RexNode): Option[Window] = { -// Detects window expressions by pattern matching -// supported patterns: FLOOR(time AS xxx) and CEIL(time AS xxx), -// with time being equal to proctime() or rowtime() field match { case call: RexCall => call.getOperator match { - case _: SqlFloorFunction => -val operand = call.getOperands.get(1).asInstanceOf[RexLiteral] -val unit: TimeUnitRange = operand.getValue.asInstanceOf[TimeUnitRange] -val w = LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit) -call.getType match { - case TimeModeTypes.PROCTIME => -return Some(w) - case TimeModeTypes.ROWTIME => -return Some(w.on("rowtime")) - case _ => -} - case _ => + case _: SqlFloorFunction => FloorWindowTranslator(call).toWindow + case SqlStdOperatorTable.TUMBLE => TumbleWindowTranslator(call).toWindow + case SqlStdOperatorTable.HOP => SlidingWindowTranslator(call).toWindow + case SqlStdOperatorTable.SESSION => SessionWindowTranslator(call).toWindow + case _ => None } - case _ => + case _ => None } -None } - } -object LogicalWindowAggregateRule { +private abstract class WindowTranslator { + val call: RexCall - private[flink] val LOGICAL_WINDOW_PREDICATE = RelOptRule.operand(classOf[LogicalAggregate], -RelOptRule.operand(classOf[LogicalProject], RelOptRule.none())) + protected def unwrapLiteral[T](node: RexNode): T = +node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T] - private[flink] val INSTANCE = new LogicalWindowAggregateRule + protected def getOperandAsLong(idx: Int): Long = +unwrapLiteral[BigDecimal](call.getOperands.get(idx)).longValue() --- End diff -- Does Calcite ensure that operands can only be literals, no input 
reference? ---
[GitHub] flink pull request #3665: [FLINK-6011] Support TUMBLE, HOP, SESSION window i...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3665#discussion_r109615593 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala --- @@ -350,7 +350,16 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable { SqlStdOperatorTable.EXISTS, // EXTENSIONS EventTimeExtractor, -ProcTimeExtractor +ProcTimeExtractor, +SqlStdOperatorTable.TUMBLE, +SqlStdOperatorTable.TUMBLE_START, --- End diff -- Do we already support the `START/END` functions? We should let them unsupported until they are implemented and tested. ---
[jira] [Commented] (FLINK-6011) Support TUMBLE, HOP, SESSION window in streaming SQL
[ https://issues.apache.org/jira/browse/FLINK-6011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954915#comment-15954915 ] ASF GitHub Bot commented on FLINK-6011: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3665#discussion_r109616209 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala --- @@ -117,46 +119,86 @@ class LogicalWindowAggregateRule } private def identifyWindow(field: RexNode): Option[Window] = { -// Detects window expressions by pattern matching -// supported patterns: FLOOR(time AS xxx) and CEIL(time AS xxx), -// with time being equal to proctime() or rowtime() field match { case call: RexCall => call.getOperator match { - case _: SqlFloorFunction => -val operand = call.getOperands.get(1).asInstanceOf[RexLiteral] -val unit: TimeUnitRange = operand.getValue.asInstanceOf[TimeUnitRange] -val w = LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit) -call.getType match { - case TimeModeTypes.PROCTIME => -return Some(w) - case TimeModeTypes.ROWTIME => -return Some(w.on("rowtime")) - case _ => -} - case _ => + case _: SqlFloorFunction => FloorWindowTranslator(call).toWindow + case SqlStdOperatorTable.TUMBLE => TumbleWindowTranslator(call).toWindow + case SqlStdOperatorTable.HOP => SlidingWindowTranslator(call).toWindow + case SqlStdOperatorTable.SESSION => SessionWindowTranslator(call).toWindow + case _ => None } - case _ => + case _ => None } -None } - } -object LogicalWindowAggregateRule { +private abstract class WindowTranslator { + val call: RexCall - private[flink] val LOGICAL_WINDOW_PREDICATE = RelOptRule.operand(classOf[LogicalAggregate], -RelOptRule.operand(classOf[LogicalProject], RelOptRule.none())) + protected def unwrapLiteral[T](node: RexNode): T = +node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T] - private[flink] val INSTANCE = new 
LogicalWindowAggregateRule + protected def getOperandAsLong(idx: Int): Long = +unwrapLiteral[BigDecimal](call.getOperands.get(idx)).longValue() --- End diff -- Does Calcite ensure that operands can only be literals, no input reference? > Support TUMBLE, HOP, SESSION window in streaming SQL > > > Key: FLINK-6011 > URL: https://issues.apache.org/jira/browse/FLINK-6011 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Haohui Mai >Assignee: Haohui Mai > > CALCITE-1603 and CALCITE-1615 introduces the support of the {{TUMBLE}} / > {{HOP}} / {{SESSION}} windows in the parser. > This jira tracks the efforts of adding the corresponding supports on the > planners / optimizers in Flink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6011) Support TUMBLE, HOP, SESSION window in streaming SQL
[ https://issues.apache.org/jira/browse/FLINK-6011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954914#comment-15954914 ] ASF GitHub Bot commented on FLINK-6011: --- Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/3665#discussion_r109615593 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/validate/FunctionCatalog.scala --- @@ -350,7 +350,16 @@ class BasicOperatorTable extends ReflectiveSqlOperatorTable { SqlStdOperatorTable.EXISTS, // EXTENSIONS EventTimeExtractor, -ProcTimeExtractor +ProcTimeExtractor, +SqlStdOperatorTable.TUMBLE, +SqlStdOperatorTable.TUMBLE_START, --- End diff -- Do we already support the `START/END` functions? We should let them unsupported until they are implemented and tested. > Support TUMBLE, HOP, SESSION window in streaming SQL > > > Key: FLINK-6011 > URL: https://issues.apache.org/jira/browse/FLINK-6011 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Haohui Mai >Assignee: Haohui Mai > > CALCITE-1603 and CALCITE-1615 introduces the support of the {{TUMBLE}} / > {{HOP}} / {{SESSION}} windows in the parser. > This jira tracks the efforts of adding the corresponding supports on the > planners / optimizers in Flink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3669: [FLINK-6215] Make the StatefulSequenceSource scala...
GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/3669 [FLINK-6215] Make the StatefulSequenceSource scalable. So far this source was computing all the elements to be emitted and stored them in memory. This could lead to out-of-memory problems for large deployments. Now we split the range of elements into partitions that can be re-shuffled upon rescaling and we just store the next offset and the end of each one of them upon checkpointing. The current version of the PR has no backwards compatibility, as this becomes tricky given that we change the semantics of the state that we store. I believe that this is ok, given that it is a fix that has to go in the 1.3 and we are not sure if people are actually using it in production, i.e. in settings that need backwards compatibility. What do you think @aljoscha @StephanEwen ? You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink stateful-src Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3669.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3669 commit cf333b0b0c318569a1704ca71121c37dcd12bd3d Author: kl0u Date: 2017-03-29T16:21:02Z [FLINK-6215] Make the StatefulSequenceSource scalable. So far this source was computing all the elements to be emitted and stored them in memory. This could lead to out-of-memory problems for large deployments. Now we do split the range of elements into partitions that can be re-shuffled upon rescaling and we just store the next offset and the end of each one of them upon checkpointing. ---
[jira] [Commented] (FLINK-6215) Make the StatefulSequenceSource scalable.
[ https://issues.apache.org/jira/browse/FLINK-6215?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954925#comment-15954925 ] ASF GitHub Bot commented on FLINK-6215: --- GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/3669 [FLINK-6215] Make the StatefulSequenceSource scalable. So far this source was computing all the elements to be emitted and stored them in memory. This could lead to out-of-memory problems for large deployments. Now we split the range of elements into partitions that can be re-shuffled upon rescaling and we just store the next offset and the end of each one of them upon checkpointing. The current version of the PR has no backwards compatibility, as this becomes tricky given that we change the semantics of the state that we store. I believe that this is ok, given that it is a fix that has to go in the 1.3 and we are not sure if people are actually using it in production, i.e. in settings that need backwards compatibility. What do you think @aljoscha @StephanEwen ? You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink stateful-src Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3669.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3669 commit cf333b0b0c318569a1704ca71121c37dcd12bd3d Author: kl0u Date: 2017-03-29T16:21:02Z [FLINK-6215] Make the StatefulSequenceSource scalable. So far this source was computing all the elements to be emitted and stored them in memory. This could lead to out-of-memory problems for large deployments. Now we do split the range of elements into partitions that can be re-shuffled upon rescaling and we just store the next offset and the end of each one of them upon checkpointing. > Make the StatefulSequenceSource scalable. 
> - > > Key: FLINK-6215 > URL: https://issues.apache.org/jira/browse/FLINK-6215 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.3.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.3.0 > > > Currently the {{StatefulSequenceSource}} instantiates all the elements to > emit first and keeps them in memory. This is not scalable as for large > sequences of elements this can lead to out of memory exceptions. > To solve this, we can pre-partition the sequence of elements based on the > {{maxParallelism}} parameter, and just keep state (to checkpoint) per such > partition. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
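The pre-partitioning proposed in the ticket is simple arithmetic: split the sequence [start, end] into maxParallelism contiguous ranges, spreading the remainder over the first partitions, so that only a (next, end) pair per partition needs to be checkpointed. A hedged sketch (illustrative names, not the actual Flink implementation):

```java
/**
 * Sketch of splitting an inclusive range [start, end] into maxParallelism
 * contiguous partitions. Each partition can then be checkpointed as just its
 * next offset and its end, instead of materializing all elements in memory.
 */
class RangePartitioner {

    /** Returns {from, to} (both inclusive) for partition number idx. */
    static long[] partition(long start, long end, int maxParallelism, int idx) {
        long total = end - start + 1;
        long base = total / maxParallelism;
        long remainder = total % maxParallelism;
        // the first `remainder` partitions each get one extra element
        long from = start + idx * base + Math.min(idx, remainder);
        long size = base + (idx < remainder ? 1 : 0);
        return new long[] {from, from + size - 1};
    }
}
```

For example, splitting [0, 9] into 4 partitions yields [0,2], [3,5], [6,7], [8,9]: the partitions are contiguous, cover the range exactly once, and can be reassigned to subtasks on rescaling.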
[jira] [Commented] (FLINK-4840) Measure latency of record processing and expose it as a metric
[ https://issues.apache.org/jira/browse/FLINK-4840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954938#comment-15954938 ] Chesnay Schepler commented on FLINK-4840: - I may have found a suitable implementation alternative. The key problem in the existing approach is that it calculates the time taken for every invocation of the method, which is just too expensive since this requires 2 time measurements (which should also use nanoTime, which is even more expensive), as well as using a histogram. My idea would be to
* no longer create a histogram, since this can be done easily outside of Flink, and only provide raw time measurements
* not measure the time for every call, but instead only a fixed number of times over a period of time.
We already have all the tools that we require for this: the View interface. We can generalize the details in a new Timer interface:
{code}
public interface Timer extends Metric {
  void start();
  void stop();
  long getTime(); // last measured time
}
{code}
The following TimerView implementation relies on the View interface to be regularly (every 5 seconds) enabled using the update() method. If the TimerView is not enabled, start() and stop() are no-ops. If it is enabled, it will take a single measurement. The implementation could look like this:
{code}
public class TimerView implements Timer, View {
  private boolean enabled = false;
  private long startTime = 0;
  private long lastMeasurement = -1;

  public void update() {
    enabled = true;
  }

  public void start() {
    if (enabled) {
      startTime = System.nanoTime();
    }
  }

  public void stop() {
    if (enabled) {
      lastMeasurement = System.nanoTime() - startTime; // convert to millis or smth
      enabled = false;
    }
  }

  public long getTime() {
    return lastMeasurement;
  }
}
{code}
I quickly threw this together, so some details are of course missing, like what happens when stop() is never called and such. But the general approach seems reasonable to me; tell me what you think. 
> Measure latency of record processing and expose it as a metric > -- > > Key: FLINK-4840 > URL: https://issues.apache.org/jira/browse/FLINK-4840 > Project: Flink > Issue Type: Improvement > Components: Metrics >Reporter: zhuhaifeng >Assignee: zhuhaifeng >Priority: Minor > > We should expose the following Metrics on the TaskIOMetricGroup: > 1. recordProcessLatency(ms): Histogram measuring the processing time per > record of a task. It is the processing time of chain if a chained task. > 2. recordProcTimeProportion(ms): Meter measuring the proportion of record > processing time for infor whether the main cost -- This message was sent by Atlassian JIRA (v6.3.15#6346)
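The sampling idea above becomes straightforward to test if the clock is injected instead of calling System.nanoTime() directly. A minimal self-contained sketch, assuming the metrics system calls update() periodically (`SamplingTimer` is an illustrative name, not proposed API):

```java
import java.util.function.LongSupplier;

/**
 * Self-contained variant of the TimerView sketch: start()/stop() only measure
 * after update() has armed the timer, and each arming yields exactly one
 * sample, so most invocations pay no timing cost at all.
 */
class SamplingTimer {
    private final LongSupplier nanoClock;  // injected for testability
    private boolean enabled = false;
    private long startTime = 0;
    private long lastMeasurement = -1;

    SamplingTimer(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    /** Called periodically by the metrics system to arm one measurement. */
    void update() {
        enabled = true;
    }

    void start() {
        if (enabled) {
            startTime = nanoClock.getAsLong();
        }
    }

    void stop() {
        if (enabled) {
            lastMeasurement = nanoClock.getAsLong() - startTime;
            enabled = false;  // disarm until the next update()
        }
    }

    long getTime() {
        return lastMeasurement;
    }
}
```

Because start()/stop() are no-ops while disarmed, the per-record overhead between samples is a single boolean check, which addresses the cost concern raised in the comment.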
[GitHub] flink issue #3348: [FLINK-5090] [network] Add metrics for details about inbo...
Github user NicoK commented on the issue: https://github.com/apache/flink/pull/3348 ok, sorry, these slipped through... please note however, that the not-null checks in #3610 become obsolete with this PR ---
[jira] [Commented] (FLINK-5090) Expose optionally detailed metrics about network queue lengths
[ https://issues.apache.org/jira/browse/FLINK-5090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15954976#comment-15954976 ] ASF GitHub Bot commented on FLINK-5090: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/3348 ok, sorry, these slipped through... please note however, that the not-null checks in #3610 become obsolete with this PR > Expose optionally detailed metrics about network queue lengths > -- > > Key: FLINK-5090 > URL: https://issues.apache.org/jira/browse/FLINK-5090 > Project: Flink > Issue Type: New Feature > Components: Metrics, Network >Affects Versions: 1.1.3 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > > For debugging purposes, it is important to have access to more detailed > metrics about the length of network input and output queues. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3611: [backport] [FLINK-6183]/[FLINK-6184] Prevent some ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3611 ---
[jira] [Commented] (FLINK-6183) TaskMetricGroup may not be cleanup when Task.run() is never called or exits early
[ https://issues.apache.org/jira/browse/FLINK-6183?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955020#comment-15955020 ] ASF GitHub Bot commented on FLINK-6183: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3611 > TaskMetricGroup may not be cleanup when Task.run() is never called or exits > early > - > > Key: FLINK-6183 > URL: https://issues.apache.org/jira/browse/FLINK-6183 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.2.0, 1.3.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Blocker > > The TaskMetricGroup is created when a Task is created. It is cleaned up at > the end of Task.run() in the finally block. If however run() is never called > due some failure between the creation and the call to run the metric group is > never closed. This also means that the JobMetricGroup is never closed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3664: [FLINK-5808] Revert changes
Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/3664 ---
[jira] [Commented] (FLINK-5808) Missing verification for setParallelism and setMaxParallelism
[ https://issues.apache.org/jira/browse/FLINK-5808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955027#comment-15955027 ] ASF GitHub Bot commented on FLINK-5808: --- Github user aljoscha closed the pull request at: https://github.com/apache/flink/pull/3664 > Missing verification for setParallelism and setMaxParallelism > - > > Key: FLINK-5808 > URL: https://issues.apache.org/jira/browse/FLINK-5808 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.0, 1.2.1 > > > When {{setParallelism()}} is called we don't verify that it is <= than max > parallelism. Likewise, when {{setMaxParallelism()}} is called we don't check > that the new value doesn't clash with a previously set parallelism. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3664: [FLINK-5808] Revert changes
Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3664 Thanks for reviewing! I'm merging...
[jira] [Commented] (FLINK-5808) Missing verification for setParallelism and setMaxParallelism
[ https://issues.apache.org/jira/browse/FLINK-5808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955026#comment-15955026 ] ASF GitHub Bot commented on FLINK-5808: --- Github user aljoscha commented on the issue: https://github.com/apache/flink/pull/3664 Thanks for reviewing! I'm merging... > Missing verification for setParallelism and setMaxParallelism > - > > Key: FLINK-5808 > URL: https://issues.apache.org/jira/browse/FLINK-5808 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.0, 1.2.1 > > > When {{setParallelism()}} is called we don't verify that it is <= than max > parallelism. Likewise, when {{setMaxParallelism()}} is called we don't check > that the new value doesn't clash with a previously set parallelism. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-2147) Approximate calculation of frequencies in data streams
[ https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955036#comment-15955036 ] Aljoscha Krettek commented on FLINK-2147: - If I'm not mistaken, a count-min sketch or similar sketches serve to reduce the size of the state that you need to keep by keeping state for several keys in one approximate data structure. For example, assume we have an input stream with k different keys. A naive solution for determining frequencies would keep k * s (where s is the per-key state size) counters. These counters are separate, per-key, and the counter is incremented when we see an element for a given key. When using a sketch you can reduce the state size but you no longer have state that is separate per key, correct? If this is correct then there is no easy way of implementing this in Flink because window state (which is regular keyed state) is per-key. Keeping state that is not per-key is quite tricky when it comes to changing the parallelism of a topology, which you can do with savepoints. > Approximate calculation of frequencies in data streams > -- > > Key: FLINK-2147 > URL: https://issues.apache.org/jira/browse/FLINK-2147 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Gabor Gevay > Labels: approximate, statistics > > Count-Min sketch is a hashing-based algorithm for approximately keeping track > of the frequencies of elements in a data stream. It is described by Cormode > et al. in the following paper: > http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf > Note that this algorithm can be conveniently implemented in a distributed > way, as described in section 3.2 of the paper. > The paper > http://www.vldb.org/conf/2002/S10P03.pdf > also describes algorithms for approximately keeping track of frequencies, but > here the user can specify a threshold below which she is not interested in > the frequency of an element. 
The error-bounds are also different than the > Count-min sketch algorithm. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
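The per-key-counter vs. sketch trade-off discussed above can be made concrete with a minimal count-min sketch (illustrative Python only; sizing and hash choice are simplified relative to the Cormode et al. paper):

```python
import hashlib


class CountMinSketch:
    """Minimal count-min sketch: depth rows of width counters, one hash per
    row. Estimates never under-count; collisions can only inflate them."""

    def __init__(self, width=1000, depth=5):
        self.width = width
        self.depth = depth
        self.table = [[0] * width for _ in range(depth)]

    def _bucket(self, item, row):
        # Salt the hash with the row index to get an independent-ish
        # function per row (md5 is just a convenient stand-in here).
        digest = hashlib.md5(f"{row}:{item}".encode()).hexdigest()
        return int(digest, 16) % self.width

    def add(self, item, count=1):
        for row in range(self.depth):
            self.table[row][self._bucket(item, row)] += count

    def estimate(self, item):
        # Take the minimum across rows: the least-collided counter.
        return min(
            self.table[row][self._bucket(item, row)]
            for row in range(self.depth)
        )
```

Instead of k separate counters, all keys share one depth x width table, which is the state-size reduction (and the loss of per-key state separation) described in the comment above.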
[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955041#comment-15955041 ] Stefano Bortoli commented on FLINK-6250: I will approach this by implementing a process function, starting from the simple aggregation: 1 - add a "distinctValue" MapState counting aggregated unique values in the window, 2 - aggregate when the value is previously unseen, 3 - decrease the counter when the value goes out of the boundaries, 4 - retract the aggregator & remove from state when the counter drops to zero. > Distinct procTime with Rows boundaries > -- > > Key: FLINK-6250 > URL: https://issues.apache.org/jira/browse/FLINK-6250 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: radu >Assignee: Stefano Bortoli > > Support proctime with rows boundaries > Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 > PRECEDING AND CURRENT ROW) FROM stream1` > Q1.1. `SELECT COUNT(b), SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS > BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
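The four steps proposed above can be sketched with a plain dict standing in for Flink's MapState (hypothetical Python simplification; the real implementation would be a Java ProcessFunction with retraction on the aggregate function):

```python
class DistinctSumOverWindow:
    """Distinct SUM over a sliding row-count window: count occurrences of
    each value, aggregate only on first occurrence, and retract when a
    value's count drops to zero as rows leave the window."""

    def __init__(self, window_size):
        self.window_size = window_size
        self.buffer = []   # rows currently inside the window boundaries
        self.counts = {}   # value -> occurrence count (the "MapState")
        self.total = 0     # the running distinct sum

    def add(self, value):
        self.buffer.append(value)
        # Steps 3/4: evict the oldest row once the window overflows.
        if len(self.buffer) > self.window_size:
            old = self.buffer.pop(0)
            self.counts[old] -= 1
            if self.counts[old] == 0:
                del self.counts[old]  # remove from state...
                self.total -= old     # ...and retract from the aggregate
        # Steps 1/2: aggregate only when the value was previously unseen.
        if value not in self.counts:
            self.total += value
        self.counts[value] = self.counts.get(value, 0) + 1
        return self.total
```

With `ROWS BETWEEN 2 PRECEDING AND CURRENT ROW` this corresponds to `window_size=3`; duplicate rows inside the window contribute to the sum only once.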
[jira] [Commented] (FLINK-5808) Missing verification for setParallelism and setMaxParallelism
[ https://issues.apache.org/jira/browse/FLINK-5808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955050#comment-15955050 ] ASF GitHub Bot commented on FLINK-5808: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/3670 [FLINK-5808] Revert changes This reverts the changes around FLINK-5808 because they introduced follow-up issues. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink revert-5808 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3670.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3670 commit 153a2ecc4162760bac8538a578b5ed91bcd52b44 Author: Aljoscha Krettek Date: 2017-04-04T12:02:37Z Revert "[FLINK-5808] Add proper checks in setParallelism()/setMaxParallelism()" This reverts commit f31a55e08ddb7b4bc9e47577a187bac31ad42f4b. The fixes around FLINK-5808 introduced follow-up issues. commit 66a3477284f357776d5edf14380a0a642286023c Author: Aljoscha Krettek Date: 2017-04-04T12:03:26Z Revert "[FLINK-5808] Move max keygroup constants to ExecutionConfig" This reverts commit e4fbae36207c563363eed39886c24eea51d7db01. The fixes around FLINK-5808 introduced follow-up issues. commit 8474db4c20ab35937c7640c3289f0bd7c3edc81b Author: Aljoscha Krettek Date: 2017-04-04T12:04:40Z Revert "[FLINK-5808] Move default parallelism to StreamingJobGraphGenerator" This reverts commit 9cfae899358e0694c3ecedae1fad20e428a1d359. The fixes around FLINK-5808 introduced follow-up issues. 
> Missing verification for setParallelism and setMaxParallelism > - > > Key: FLINK-5808 > URL: https://issues.apache.org/jira/browse/FLINK-5808 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.2.0 >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.3.0, 1.2.1 > > > When {{setParallelism()}} is called we don't verify that it is <= than max > parallelism. Likewise, when {{setMaxParallelism()}} is called we don't check > that the new value doesn't clash with a previously set parallelism. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3670: [FLINK-5808] Revert changes
GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/3670 [FLINK-5808] Revert changes This reverts the changes around FLINK-5808 because they introduced follow-up issues. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink revert-5808 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/3670.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #3670 commit 153a2ecc4162760bac8538a578b5ed91bcd52b44 Author: Aljoscha Krettek Date: 2017-04-04T12:02:37Z Revert "[FLINK-5808] Add proper checks in setParallelism()/setMaxParallelism()" This reverts commit f31a55e08ddb7b4bc9e47577a187bac31ad42f4b. The fixes around FLINK-5808 introduced follow-up issues. commit 66a3477284f357776d5edf14380a0a642286023c Author: Aljoscha Krettek Date: 2017-04-04T12:03:26Z Revert "[FLINK-5808] Move max keygroup constants to ExecutionConfig" This reverts commit e4fbae36207c563363eed39886c24eea51d7db01. The fixes around FLINK-5808 introduced follow-up issues. commit 8474db4c20ab35937c7640c3289f0bd7c3edc81b Author: Aljoscha Krettek Date: 2017-04-04T12:04:40Z Revert "[FLINK-5808] Move default parallelism to StreamingJobGraphGenerator" This reverts commit 9cfae899358e0694c3ecedae1fad20e428a1d359. The fixes around FLINK-5808 introduced follow-up issues. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-5991) Expose Broadcast Operator State through public APIs
[ https://issues.apache.org/jira/browse/FLINK-5991?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955100#comment-15955100 ] ASF GitHub Bot commented on FLINK-5991: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/3508 I'm not so sure, but I think Stefan has a point with the potential confusion. Especially if in the future, if both the keyed state _and_ the operator state are registered through the initialization context, it can definitely be confusing if both states have the `getListState` or even `getMapState` ... methods, but actually have completely different semantics. To address @StefanRRichter's concerns, perhaps we could 1) Try to emphasise the semantic differences in the docs / Javadocs. 2) Consider renaming `getListState` to `getXXXListState`? "XXX" needs to imply the round-robin redistribution scheme. I was also wondering about another possibility, which follows a completely different approach: `getRepartitionableListState(descriptor, REDISTRIBUTE_MODE)`. > Expose Broadcast Operator State through public APIs > --- > > Key: FLINK-5991 > URL: https://issues.apache.org/jira/browse/FLINK-5991 > Project: Flink > Issue Type: New Feature > Components: DataStream API, State Backends, Checkpointing >Reporter: Tzu-Li (Gordon) Tai >Assignee: Tzu-Li (Gordon) Tai > Fix For: 1.3.0 > > > The broadcast operator state functionality was added in FLINK-5265, it just > hasn't been exposed through any public APIs yet. > Currently, we have 2 streaming connector features for 1.3 that are pending on > broadcast state: rescalable Kinesis / Kafka consumers with shard / partition > discovery (FLINK-4821 & FLINK-4022). We should consider exposing broadcast > state for the 1.3 release also. > This JIRA also serves the purpose to discuss how we want to expose it. > To initiate the discussion, I propose: > 1. 
For the more powerful {{CheckpointedFunction}}, add the following to the > {{OperatorStateStore}} interface: > {code} > ListState getBroadcastOperatorState(ListStateDescriptor > stateDescriptor); > ListState > getBroadcastSerializableListState(String stateName); > {code} > 2. For a simpler {{ListCheckpointed}} variant, we probably should have a > separate {{BroadcastListCheckpointed}} interface. > Extending {{ListCheckpointed}} to let the user define either the list state > type of either {{PARTITIONABLE}} or {{BROADCAST}} might also be possible, if > we can rely on a contract that the value doesn't change. Or we expose a > defining method (e.g. {{getListStateType()}}) that is called only once in the > operator. This would break user code, but can be considered because it is > marked as {{PublicEvolving}}. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-2147) Approximate calculation of frequencies in data streams
[ https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955103#comment-15955103 ] Stavros Kontopoulos commented on FLINK-2147: I think Count-min sketch can be implemented in way that each task keeps a local count-min sketch as state, and as a next step it emits the frequencies after an aggregation of count-mi sketches. This could be windows based and would involve to implement custom operators. This is a high level description and may not fit exactly to the internals. A distributed implementation here: https://www.slideshare.net/databricks/sketching-big-data-with-spark-randomized-algorithms-for-largescale-data-analytics https://github.com/apache/spark/pull/10911/files > Approximate calculation of frequencies in data streams > -- > > Key: FLINK-2147 > URL: https://issues.apache.org/jira/browse/FLINK-2147 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Gabor Gevay > Labels: approximate, statistics > > Count-Min sketch is a hashing-based algorithm for approximately keeping track > of the frequencies of elements in a data stream. It is described by Cormode > et al. in the following paper: > http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf > Note that this algorithm can be conveniently implemented in a distributed > way, as described in section 3.2 of the paper. > The paper > http://www.vldb.org/conf/2002/S10P03.pdf > also describes algorithms for approximately keeping track of frequencies, but > here the user can specify a threshold below which she is not interested in > the frequency of an element. The error-bounds are also different than the > Count-min sketch algorithm. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-2147) Approximate calculation of frequencies in data streams
[ https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955103#comment-15955103 ] Stavros Kontopoulos edited comment on FLINK-2147 at 4/4/17 1:01 PM: I think Count-min sketch can be implemented in way that each task keeps a local count-min sketch as state, and as a next step it emits the frequencies after an aggregation of count-min sketches. Sketched of this type can be merged. This could be windows based and would involve to implement custom operators. This is a high level description and may not fit exactly to the internals. A distributed implementation here: https://www.slideshare.net/databricks/sketching-big-data-with-spark-randomized-algorithms-for-largescale-data-analytics https://github.com/apache/spark/pull/10911/files was (Author: skonto): I think Count-min sketch can be implemented in way that each task keeps a local count-min sketch as state, and as a next step it emits the frequencies after an aggregation of count-mi sketches. This could be windows based and would involve to implement custom operators. This is a high level description and may not fit exactly to the internals. A distributed implementation here: https://www.slideshare.net/databricks/sketching-big-data-with-spark-randomized-algorithms-for-largescale-data-analytics https://github.com/apache/spark/pull/10911/files > Approximate calculation of frequencies in data streams > -- > > Key: FLINK-2147 > URL: https://issues.apache.org/jira/browse/FLINK-2147 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Gabor Gevay > Labels: approximate, statistics > > Count-Min sketch is a hashing-based algorithm for approximately keeping track > of the frequencies of elements in a data stream. It is described by Cormode > et al. 
in the following paper: > http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf > Note that this algorithm can be conveniently implemented in a distributed > way, as described in section 3.2 of the paper. > The paper > http://www.vldb.org/conf/2002/S10P03.pdf > also describes algorithms for approximately keeping track of frequencies, but > here the user can specify a threshold below which she is not interested in > the frequency of an element. The error-bounds are also different than the > Count-min sketch algorithm. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-2147) Approximate calculation of frequencies in data streams
[ https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955103#comment-15955103 ] Stavros Kontopoulos edited comment on FLINK-2147 at 4/4/17 1:02 PM: I think Count-min sketch can be implemented in a way that each task keeps a local count-min sketch as state, and as a next step it emits the frequencies after an aggregation of count-min sketches. Sketches of this type can be merged. This could be window-based and would involve implementing custom operators. This is a high-level description and may not fit exactly to the internals. A distributed implementation here: https://www.slideshare.net/databricks/sketching-big-data-with-spark-randomized-algorithms-for-largescale-data-analytics https://github.com/apache/spark/pull/10911/files was (Author: skonto): I think Count-min sketch can be implemented in way that each task keeps a local count-min sketch as state, and as a next step it emits the frequencies after an aggregation of count-min sketches. Sketched of this type can be merged. This could be windows based and would involve to implement custom operators. This is a high level description and may not fit exactly to the internals. A distributed implementation here: https://www.slideshare.net/databricks/sketching-big-data-with-spark-randomized-algorithms-for-largescale-data-analytics https://github.com/apache/spark/pull/10911/files > Approximate calculation of frequencies in data streams > -- > > Key: FLINK-2147 > URL: https://issues.apache.org/jira/browse/FLINK-2147 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Gabor Gevay > Labels: approximate, statistics > > Count-Min sketch is a hashing-based algorithm for approximately keeping track > of the frequencies of elements in a data stream. It is described by Cormode > et al.
in the following paper: > http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf > Note that this algorithm can be conveniently implemented in a distributed > way, as described in section 3.2 of the paper. > The paper > http://www.vldb.org/conf/2002/S10P03.pdf > also describes algorithms for approximately keeping track of frequencies, but > here the user can specify a threshold below which she is not interested in > the frequency of an element. The error-bounds are also different than the > Count-min sketch algorithm. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955117#comment-15955117 ] Fabian Hueske commented on FLINK-6250: -- I'm not sure about implementing this as a separate {{ProcessFunction}}. I think we should adapt the existing ones because we will need support for distinct and not distinct aggregates in the same {{ProcessFunction}}. > Distinct procTime with Rows boundaries > -- > > Key: FLINK-6250 > URL: https://issues.apache.org/jira/browse/FLINK-6250 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: radu >Assignee: Stefano Bortoli > > Support proctime with rows boundaries > Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 > PRECEDING AND CURRENT ROW) FROM stream1` > Q1.1. `SELECT COUNT(b), SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS > BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6262) UnknownTopicOrPartitionException Kafka consumer error on broker restart/failure
Gyula Fora created FLINK-6262: - Summary: UnknownTopicOrPartitionException Kafka consumer error on broker restart/failure Key: FLINK-6262 URL: https://issues.apache.org/jira/browse/FLINK-6262 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.1.4, 1.2.0 Reporter: Gyula Fora The Kafka consumer fails on broker restarts/failures with the following error: java.io.IOException: Error while fetching from broker 'Node(22, kafka22.sto.midasplayer.com, 9092)': Exception for event.bifrost.log:10: kafka.common.UnknownTopicOrPartitionException at sun.reflect.GeneratedConstructorAccessor35.newInstance(Unknown Source) at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45) at java.lang.reflect.Constructor.newInstance(Constructor.java:422) at java.lang.Class.newInstance(Class.java:442) at kafka.common.ErrorMapping$.exceptionFor(ErrorMapping.scala:86) at kafka.common.ErrorMapping.exceptionFor(ErrorMapping.scala) at org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:292) at org.apache.flink.streaming.connectors.kafka.internals.SimpleConsumerThread.run(SimpleConsumerThread.java:313) We should have some restart logic around this -- This message was sent by Atlassian JIRA (v6.3.15#6346)
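The "restart logic" the report asks for usually takes the shape of a retry loop with exponential backoff around the fetch call. A generic sketch (illustrative Python only; function name and parameters are hypothetical, not the connector's actual code):

```python
import time


def retry_with_backoff(fn, max_retries=5, base_delay=0.1, retriable=(IOError,)):
    """Call fn; on a retriable exception, sleep with exponential backoff
    and try again, re-raising after max_retries failed attempts."""
    for attempt in range(max_retries):
        try:
            return fn()
        except retriable:
            if attempt == max_retries - 1:
                raise  # exhausted: surface the original error
            time.sleep(base_delay * (2 ** attempt))
```

A transient UnknownTopicOrPartitionException during a broker restart would then be absorbed by the retries instead of failing the whole job, while a persistent failure still propagates.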
[jira] [Created] (FLINK-6263) Leader error in Kafka producer on leader change (broker restart/failure)
Gyula Fora created FLINK-6263: - Summary: Leader error in Kafka producer on leader change (broker restart/failure) Key: FLINK-6263 URL: https://issues.apache.org/jira/browse/FLINK-6263 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.2.0 Reporter: Gyula Fora We have observed the following error in the Kafka producer: java.lang.Exception: Failed to send data to Kafka: This server is not the leader for that topic-partition. at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.checkErroneous(FlinkKafkaProducerBase.java:376) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.invoke(FlinkKafkaProducerBase.java:293) at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:38) at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:185) at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:63) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656) at java.lang.Thread.run(Thread.java:745) Caused by: org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (FLINK-6264) Kafka consumer fails if can't find leader for partition
Gyula Fora created FLINK-6264: - Summary: Kafka consumer fails if can't find leader for partition Key: FLINK-6264 URL: https://issues.apache.org/jira/browse/FLINK-6264 Project: Flink Issue Type: Bug Components: Kafka Connector Affects Versions: 1.2.0 Reporter: Gyula Fora We have observed the following error many times when brokers failed/were restarted: java.lang.RuntimeException: Unable to find a leader for partitions: [Partition: KafkaTopicPartition{topic='mytopic', partition=10}, KafkaPartitionHandle=[mytopic,10], offset=-1] at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.findLeaderForPartitions(Kafka08Fetcher.java:474) at org.apache.flink.streaming.connectors.kafka.internals.Kafka08Fetcher.runFetchLoop(Kafka08Fetcher.java:194) at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:256) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:78) at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:55) at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:56) at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:261) at org.apache.flink.runtime.taskmanager.Task.run(Task.java:656) at java.lang.Thread.run(Thread.java:745) -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-2147) Approximate calculation of frequencies in data streams
[ https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955147#comment-15955147 ] Aljoscha Krettek commented on FLINK-2147: - Yes, but what I'm saying is that it is not easy to deal with these task-local states when you change parallelism. For example, assume that you have parallelism 3. You have three task-local states. Now, the parallelism is changed to 2. How do you redistribute the sketch state? Keep in mind that Flink uses a (more or less) fixed partitioner for deciding where to send keyed elements. We have this to ensure that elements go to the parallel operator that is responsible for a key and that has the correct state. The reverse problem is even harder, I think. For example, when you want to scale from parallelism 1 to a higher parallelism. > Approximate calculation of frequencies in data streams > -- > > Key: FLINK-2147 > URL: https://issues.apache.org/jira/browse/FLINK-2147 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Gabor Gevay > Labels: approximate, statistics > > Count-Min sketch is a hashing-based algorithm for approximately keeping track > of the frequencies of elements in a data stream. It is described by Cormode > et al. in the following paper: > http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf > Note that this algorithm can be conveniently implemented in a distributed > way, as described in section 3.2 of the paper. > The paper > http://www.vldb.org/conf/2002/S10P03.pdf > also describes algorithms for approximately keeping track of frequencies, but > here the user can specify a threshold below which she is not interested in > the frequency of an element. The error-bounds are also different than the > Count-min sketch algorithm. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-2147) Approximate calculation of frequencies in data streams
[ https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955164#comment-15955164 ] Stavros Kontopoulos edited comment on FLINK-2147 at 4/4/17 2:01 PM: You just pick one of the sketches merge it with another one kill the task (3 down to 2 case). For 1 to N. Just split the stream and create N count-min sketches. Wouldn't that work? was (Author: skonto): You just pick one of the sketches merge it with another one kill the task (3 down to 2 case). For 1 to N. Just split the stream and create N count-min sketched. Wouldn't that work? > Approximate calculation of frequencies in data streams > -- > > Key: FLINK-2147 > URL: https://issues.apache.org/jira/browse/FLINK-2147 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Gabor Gevay > Labels: approximate, statistics > > Count-Min sketch is a hashing-based algorithm for approximately keeping track > of the frequencies of elements in a data stream. It is described by Cormode > et al. in the following paper: > http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf > Note that this algorithm can be conveniently implemented in a distributed > way, as described in section 3.2 of the paper. > The paper > http://www.vldb.org/conf/2002/S10P03.pdf > also describes algorithms for approximately keeping track of frequencies, but > here the user can specify a threshold below which she is not interested in > the frequency of an element. The error-bounds are also different than the > Count-min sketch algorithm. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-2147) Approximate calculation of frequencies in data streams
[ https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955164#comment-15955164 ] Stavros Kontopoulos commented on FLINK-2147: You just pick one of the sketches merge it with another one kill the task (3 down to 2 case). For 1 to N. Just split the stream and create N count-min sketched. Wouldn't that work? > Approximate calculation of frequencies in data streams > -- > > Key: FLINK-2147 > URL: https://issues.apache.org/jira/browse/FLINK-2147 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Gabor Gevay > Labels: approximate, statistics > > Count-Min sketch is a hashing-based algorithm for approximately keeping track > of the frequencies of elements in a data stream. It is described by Cormode > et al. in the following paper: > http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf > Note that this algorithm can be conveniently implemented in a distributed > way, as described in section 3.2 of the paper. > The paper > http://www.vldb.org/conf/2002/S10P03.pdf > also describes algorithms for approximately keeping track of frequencies, but > here the user can specify a threshold below which she is not interested in > the frequency of an element. The error-bounds are also different than the > Count-min sketch algorithm. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-2147) Approximate calculation of frequencies in data streams
[ https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955166#comment-15955166 ] Gabor Gevay commented on FLINK-2147: Maybe we could over-partition the sketch into maxParallelism parts. (similarly, as we have more key-groups than actual partitions) > Approximate calculation of frequencies in data streams > -- > > Key: FLINK-2147 > URL: https://issues.apache.org/jira/browse/FLINK-2147 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Gabor Gevay > Labels: approximate, statistics > > Count-Min sketch is a hashing-based algorithm for approximately keeping track > of the frequencies of elements in a data stream. It is described by Cormode > et al. in the following paper: > http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf > Note that this algorithm can be conveniently implemented in a distributed > way, as described in section 3.2 of the paper. > The paper > http://www.vldb.org/conf/2002/S10P03.pdf > also describes algorithms for approximately keeping track of frequencies, but > here the user can specify a threshold below which she is not interested in > the frequency of an element. The error-bounds are also different than the > Count-min sketch algorithm. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
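The over-partitioning idea suggested above mirrors how Flink maps keyed state onto key groups: keys hash into maxParallelism groups, and each operator instance owns a contiguous range of groups, so rescaling moves whole groups rather than splitting state. A hypothetical sketch of that assignment arithmetic (function names are illustrative; Flink's own logic lives in its Java runtime):

```python
def key_group(key, max_parallelism):
    """Assign a key to one of max_parallelism key groups. Python's hash()
    stands in for Flink's key hashing here."""
    return hash(key) % max_parallelism


def operator_for_group(group, max_parallelism, parallelism):
    """Map a key group to an operator index so that each operator owns a
    contiguous range of key groups (same idea as Flink's keyed state)."""
    return group * parallelism // max_parallelism
```

Applied to sketches: keep one sub-sketch per key group; on rescale, each operator merges the sub-sketches of the groups it now owns, sidestepping the "how do I split one sketch" problem.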
[jira] [Comment Edited] (FLINK-2147) Approximate calculation of frequencies in data streams
[ https://issues.apache.org/jira/browse/FLINK-2147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955164#comment-15955164 ] Stavros Kontopoulos edited comment on FLINK-2147 at 4/4/17 2:04 PM: You just pick one of the sketches, merge it with another one, and kill the task (the 3-down-to-2 case). For 1 to N, just split the stream and create N-1 count-min sketches, keeping the first as is. Wouldn't that work? was (Author: skonto): You just pick one of the sketches merge it with another one kill the task (3 down to 2 case). For 1 to N. Just split the stream and create N count-min sketches. Wouldn't that work? > Approximate calculation of frequencies in data streams > -- > > Key: FLINK-2147 > URL: https://issues.apache.org/jira/browse/FLINK-2147 > Project: Flink > Issue Type: New Feature > Components: DataStream API >Reporter: Gabor Gevay > Labels: approximate, statistics > > Count-Min sketch is a hashing-based algorithm for approximately keeping track > of the frequencies of elements in a data stream. It is described by Cormode > et al. in the following paper: > http://dimacs.rutgers.edu/~graham/pubs/papers/cmsoft.pdf > Note that this algorithm can be conveniently implemented in a distributed > way, as described in section 3.2 of the paper. > The paper > http://www.vldb.org/conf/2002/S10P03.pdf > also describes algorithms for approximately keeping track of frequencies, but > here the user can specify a threshold below which she is not interested in > the frequency of an element. The error-bounds are also different than the > Count-min sketch algorithm. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
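The mergeability the thread relies on for rescaling can be sketched in a few lines of plain Java. The toy class below is illustrative only (not Flink code, and the hash function is a hypothetical stand-in for proper pairwise-independent hashes): two sketches built with identical parameters merge by cell-wise addition, which is what makes scaling N tasks down to N-1 a matter of merging two tables.

```java
// Toy Count-Min sketch. Sketches with identical (depth, width) merge by
// cell-wise addition, so scaling N tasks down to N-1 just merges two tables.
class CountMinSketch {
    private final int depth;
    private final int width;
    private final long[][] table;

    CountMinSketch(int depth, int width) {
        this.depth = depth;
        this.width = width;
        this.table = new long[depth][width];
    }

    // Simple per-row hash; a real implementation would use proper
    // pairwise-independent hash functions.
    private int bucket(int row, Object item) {
        int h = item.hashCode() * (31 * row + 17);
        return Math.floorMod(h, width);
    }

    void add(Object item) {
        for (int r = 0; r < depth; r++) {
            table[r][bucket(r, item)]++;
        }
    }

    // Point estimate: minimum over rows; never undercounts.
    long estimate(Object item) {
        long min = Long.MAX_VALUE;
        for (int r = 0; r < depth; r++) {
            min = Math.min(min, table[r][bucket(r, item)]);
        }
        return min;
    }

    // Merge is only valid for identically parameterized sketches.
    void merge(CountMinSketch other) {
        if (other.depth != depth || other.width != width) {
            throw new IllegalArgumentException("incompatible sketch parameters");
        }
        for (int r = 0; r < depth; r++) {
            for (int c = 0; c < width; c++) {
                table[r][c] += other.table[r][c];
            }
        }
    }
}
```

After a merge, a point estimate is at least the true combined count (it can only overcount through collisions), which is why killing a task and merging its sketch into a sibling preserves the error guarantees.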
[jira] [Commented] (FLINK-5994) Add Janino to flink-table JAR file
[ https://issues.apache.org/jira/browse/FLINK-5994?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955189#comment-15955189 ] ASF GitHub Bot commented on FLINK-5994: --- Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3656 Thanks @fhueske, I have moved the PR to FLINK-6247 and will think about how to deal with this JIRA. > Add Janino to flink-table JAR file > -- > > Key: FLINK-5994 > URL: https://issues.apache.org/jira/browse/FLINK-5994 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Affects Versions: 1.2.0 >Reporter: Timo Walther >Assignee: sunjincheng > > It seems that Janino is not part of the flink-table JAR file although it is a > dependency in pom.xml. Users adding flink-table to Flink's lib folder because > of FLINK-5227 cannot run table programs due to the missing Janino dependency. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink issue #3656: [FLINK-5994][table] Add Janino to flink-dist JAR file
Github user sunjincheng121 commented on the issue: https://github.com/apache/flink/pull/3656 Thanks @fhueske, I have moved the PR to FLINK-6247 and will think about how to deal with this JIRA. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6250) Distinct procTime with Rows boundaries
[ https://issues.apache.org/jira/browse/FLINK-6250?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955200#comment-15955200 ] Stefano Bortoli commented on FLINK-6250: I am also fine with implementing it by extending the existing function. In practice it would be the same thing, just in another file. Currently I am trying to understand where the DISTINCT marker gets lost, as it does not arrive at the OVER aggregation function. It is probably intercepted in an early stage of query processing. > Distinct procTime with Rows boundaries > -- > > Key: FLINK-6250 > URL: https://issues.apache.org/jira/browse/FLINK-6250 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: radu >Assignee: Stefano Bortoli > > Support proctime with rows boundaries > Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 > PRECEDING AND CURRENT ROW) FROM stream1` > Q1.2. `SELECT COUNT(b), SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS > BETWEEN 2 PRECEDING AND CURRENT ROW) FROM stream1` -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Comment Edited] (FLINK-5987) Upgrade zookeeper dependency to 3.4.8
[ https://issues.apache.org/jira/browse/FLINK-5987?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15942518#comment-15942518 ] Ted Yu edited comment on FLINK-5987 at 4/4/17 3:17 PM: --- We can upgrade to 3.4.10 . was (Author: yuzhih...@gmail.com): Actually 3.4.10 is being voted on. We can upgrade to 3.4.10 soon. > Upgrade zookeeper dependency to 3.4.8 > - > > Key: FLINK-5987 > URL: https://issues.apache.org/jira/browse/FLINK-5987 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu > > zookeeper 3.4.8 has been released. > Among the fixes the following are desirable: > ZOOKEEPER-706 large numbers of watches can cause session re-establishment to > fail > ZOOKEEPER-1797 PurgeTxnLog may delete data logs during roll > This issue upgrades zookeeper dependency to 3.4.8 -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Updated] (FLINK-5943) Unprotected access to haServices in YarnFlinkApplicationMasterRunner#shutdown()
[ https://issues.apache.org/jira/browse/FLINK-5943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-5943: -- Description: {code} protected void shutdown(ApplicationStatus status, String msg) { // Need to clear the job state in the HA services before shutdown try { haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID()); } {code} The access to haServices is without lock protection. haServices may have been closed. was: {code} protected void shutdown(ApplicationStatus status, String msg) { // Need to clear the job state in the HA services before shutdown try { haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID()); } {code} The access to haServices is without lock protection. haServices may have been closed. > Unprotected access to haServices in > YarnFlinkApplicationMasterRunner#shutdown() > --- > > Key: FLINK-5943 > URL: https://issues.apache.org/jira/browse/FLINK-5943 > Project: Flink > Issue Type: Bug > Components: YARN >Reporter: Ted Yu >Priority: Minor > > {code} > protected void shutdown(ApplicationStatus status, String msg) { > // Need to clear the job state in the HA services before shutdown > try { > haServices.getRunningJobsRegistry().clearJob(jobGraph.getJobID()); > } > {code} > The access to haServices is without lock protection. > haServices may have been closed. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
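The locking pattern the ticket asks for can be sketched with a plain-Java stand-in. `HaService` and `MasterRunner` below are hypothetical classes, not the real `HighAvailabilityServices` or `YarnFlinkApplicationMasterRunner`; the point is that shutdown-time access and close() are serialized under one lock, so shutdown can never touch a service that was already closed.

```java
// Stand-in for the HA services: using it after close() is an error.
class HaService {
    private boolean closed = false;
    void clearJob(String jobId) {
        if (closed) throw new IllegalStateException("service already closed");
    }
    void close() { closed = true; }
}

// Stand-in for the runner: all access to haServices happens under `lock`.
class MasterRunner {
    private final Object lock = new Object();
    private HaService haServices = new HaService();

    /** Returns true if this call actually performed the shutdown work. */
    boolean shutdown(String jobId) {
        synchronized (lock) {                // close path takes the same lock
            if (haServices == null) {
                return false;                // already shut down: safe no-op
            }
            haServices.clearJob(jobId);      // clear HA job state first
            haServices.close();
            haServices = null;               // makes a second shutdown a no-op
            return true;
        }
    }
}
```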
[jira] [Comment Edited] (FLINK-5629) Unclosed RandomAccessFile in StaticFileServerHandler#respondAsLeader()
[ https://issues.apache.org/jira/browse/FLINK-5629?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15854366#comment-15854366 ] Ted Yu edited comment on FLINK-5629 at 4/4/17 3:18 PM: --- RandomAccessFile#length() may throw IOE. raf is used in the following code path where DefaultFileRegion is not involved: {code} } else { lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), {code} It is good practice to close RandomAccessFile in all code paths. was (Author: yuzhih...@gmail.com): RandomAccessFile#length() may throw IOE. raf is used in the following code path where DefaultFileRegion is not involved: {code} } else { lastContentFuture = ctx.writeAndFlush(new HttpChunkedInput(new ChunkedFile(raf, 0, fileLength, 8192)), {code} It is good practice to close RandomAccessFile in all code paths. > Unclosed RandomAccessFile in StaticFileServerHandler#respondAsLeader() > -- > > Key: FLINK-5629 > URL: https://issues.apache.org/jira/browse/FLINK-5629 > Project: Flink > Issue Type: Bug > Components: Webfrontend >Reporter: Ted Yu >Priority: Minor > > {code} > final RandomAccessFile raf; > try { > raf = new RandomAccessFile(file, "r"); > ... > long fileLength = raf.length(); > {code} > The RandomAccessFile should be closed upon return from method. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
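The general fix pattern for this kind of leak is try-with-resources, sketched below on a stand-in method (in the actual Netty handler one code path hands the file descriptor to `DefaultFileRegion`, which takes over closing it, so the real fix needs to account for that transfer of ownership).

```java
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

class FileLengthReader {
    // try-with-resources guarantees the RandomAccessFile is closed on every
    // code path, including when length() throws an IOException.
    static long readLength(File file) throws IOException {
        try (RandomAccessFile raf = new RandomAccessFile(file, "r")) {
            return raf.length();   // raf is closed even if this throws
        }
    }
}
```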
[jira] [Updated] (FLINK-5855) Unprotected access to pendingFilesPerCheckpoint in BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-5855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-5855: -- Description: {code} handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); synchronized (restoredState.pendingFilesPerCheckpoint) { restoredState.pendingFilesPerCheckpoint.clear(); {code} Lock on pendingFilesPerCheckpoint should be obtained prior to the call to handlePendingFilesForPreviousCheckpoints(). was: {code} handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); synchronized (restoredState.pendingFilesPerCheckpoint) { restoredState.pendingFilesPerCheckpoint.clear(); {code} Lock on pendingFilesPerCheckpoint should be obtained prior to the call to handlePendingFilesForPreviousCheckpoints(). > Unprotected access to pendingFilesPerCheckpoint in BucketingSink > > > Key: FLINK-5855 > URL: https://issues.apache.org/jira/browse/FLINK-5855 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Reporter: Ted Yu >Priority: Minor > > {code} > handlePendingFilesForPreviousCheckpoints(restoredState.pendingFilesPerCheckpoint); > synchronized (restoredState.pendingFilesPerCheckpoint) { > restoredState.pendingFilesPerCheckpoint.clear(); > {code} > Lock on pendingFilesPerCheckpoint should be obtained prior to the call to > handlePendingFilesForPreviousCheckpoints(). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
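The fix the ticket suggests is simply to widen the synchronized block so it covers both the handling call and the clear. A minimal sketch (a plain map stands in for `BucketingSink`'s `pendingFilesPerCheckpoint`; none of these class or method names are the real Flink ones):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class PendingFiles {
    final Map<Long, List<String>> pendingFilesPerCheckpoint = new HashMap<>();
    final List<String> moved = new ArrayList<>();

    // Stand-in for handlePendingFilesForPreviousCheckpoints().
    private void handlePendingFiles(Map<Long, List<String>> pending) {
        for (List<String> files : pending.values()) {
            moved.addAll(files);
        }
    }

    void restore() {
        // The lock covers BOTH steps, so no other thread can observe or
        // mutate the map between handling the files and clearing them.
        synchronized (pendingFilesPerCheckpoint) {
            handlePendingFiles(pendingFilesPerCheckpoint);
            pendingFilesPerCheckpoint.clear();
        }
    }
}
```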
[jira] [Updated] (FLINK-5541) Missing null check for localJar in FlinkSubmitter#submitTopology()
[ https://issues.apache.org/jira/browse/FLINK-5541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-5541: -- Description: {code} if (localJar == null) { try { for (final URL url : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment()) .getJars()) { // TODO verify that there is only one jar localJar = new File(url.toURI()).getAbsolutePath(); } } catch (final URISyntaxException e) { // ignore } catch (final ClassCastException e) { // ignore } } logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf); client.submitTopologyWithOpts(name, localJar, topology); {code} Since the try block may encounter URISyntaxException / ClassCastException, we should check that localJar is not null before calling submitTopologyWithOpts(). was: {code} if (localJar == null) { try { for (final URL url : ((ContextEnvironment) ExecutionEnvironment.getExecutionEnvironment()) .getJars()) { // TODO verify that there is only one jar localJar = new File(url.toURI()).getAbsolutePath(); } } catch (final URISyntaxException e) { // ignore } catch (final ClassCastException e) { // ignore } } logger.info("Submitting topology " + name + " in distributed mode with conf " + serConf); client.submitTopologyWithOpts(name, localJar, topology); {code} Since the try block may encounter URISyntaxException / ClassCastException, we should check that localJar is not null before calling submitTopologyWithOpts(). 
> Missing null check for localJar in FlinkSubmitter#submitTopology() > -- > > Key: FLINK-5541 > URL: https://issues.apache.org/jira/browse/FLINK-5541 > Project: Flink > Issue Type: Bug > Components: Storm Compatibility >Reporter: Ted Yu >Priority: Minor > > {code} > if (localJar == null) { > try { > for (final URL url : ((ContextEnvironment) > ExecutionEnvironment.getExecutionEnvironment()) > .getJars()) { > // TODO verify that there is only one jar > localJar = new File(url.toURI()).getAbsolutePath(); > } > } catch (final URISyntaxException e) { > // ignore > } catch (final ClassCastException e) { > // ignore > } > } > logger.info("Submitting topology " + name + " in distributed mode with > conf " + serConf); > client.submitTopologyWithOpts(name, localJar, topology); > {code} > Since the try block may encounter URISyntaxException / ClassCastException, we > should check that localJar is not null before calling > submitTopologyWithOpts(). -- This message was sent by Atlassian JIRA (v6.3.15#6346)
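The suggested fix is a fail-fast null check between the best-effort jar discovery (which may swallow `URISyntaxException`/`ClassCastException` and leave `localJar` null) and the submit call. A hedged sketch with a hypothetical helper, not the real `FlinkSubmitter` API:

```java
class Submitter {
    // Fail fast with a clear message instead of passing null downstream
    // to submitTopologyWithOpts().
    static String resolveJarOrFail(String localJar) {
        if (localJar == null) {
            throw new IllegalArgumentException(
                "Could not determine the topology jar; please specify it explicitly.");
        }
        return localJar;
    }
}
```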
[jira] [Created] (FLINK-6265) Fix consecutive() for times() pattern.
Kostas Kloudas created FLINK-6265: - Summary: Fix consecutive() for times() pattern. Key: FLINK-6265 URL: https://issues.apache.org/jira/browse/FLINK-6265 Project: Flink Issue Type: Bug Components: CEP Affects Versions: 1.3.0 Reporter: Kostas Kloudas Assignee: Kostas Kloudas Fix For: 1.3.0 When using {{next()}} with {{times()}} and {{times()}} is not {{consecutive()}}, the library ignores that relaxed continuity within the pattern. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6249) Distinct Aggregates for OVER window
[ https://issues.apache.org/jira/browse/FLINK-6249?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15955419#comment-15955419 ] radu commented on FLINK-6249: - [~fhueske] [~twalthr] It seems that the aggregates that are passed into the window do not directly carry a distinct marker. We will look into how we can solve this. We have also written to the Calcite community. > Distinct Aggregates for OVER window > --- > > Key: FLINK-6249 > URL: https://issues.apache.org/jira/browse/FLINK-6249 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: radu > Labels: features, patch > > Time target: ProcTime/EventTime > SQL targeted query examples: > > Q1. Boundaries are expressed in windows and meant for the elements to be > aggregated > Q1.1. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() ROWS BETWEEN 2 > PRECEDING AND CURRENT ROW) FROM stream1` > Q1.2. `SELECT SUM( DISTINCT b) OVER (ORDER BY procTime() RANGE BETWEEN > INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1` > Q1.3. `SELECT SUM( DISTINCT b) OVER (ORDER BY rowTime() ROWS BETWEEN 2 > PRECEDING AND CURRENT ROW) FROM stream1` > Q1.4. `SELECT SUM( DISTINCT b) OVER (ORDER BY rowTime() RANGE BETWEEN > INTERVAL '1' HOUR PRECEDING AND CURRENT ROW) FROM stream1` > General comments: > - DISTINCT operation makes sense only within the context of windows or some > bounded defined structures. Otherwise the operation would keep an infinite > amount of data to ensure uniqueness and would not trigger for certain > functions (e.g. aggregates) > - We can consider as a sub-JIRA issue the implementation of DISTINCT for > UNBOUND sliding windows. However, there would be no control over the data > structure to keep seen data (to check it is not re-processed). 
-> This needs to > be decided if we want to support it (to create appropriate JIRA issues) > => We will open sub-JIRA issues to extend the current functionality of > aggregates for the DISTINCT case > => Aggregations over distinct elements without any boundary (i.e. within > SELECT clause) do not make sense, just as aggregations do not make sense > without groupings or windows. > Description: > > The DISTINCT operator requires processing the elements to ensure uniqueness. > Whether the operation is used for selecting all distinct elements or for > applying typical aggregation functions over a set of elements, there is a > prior need of forming a collection of elements. > This brings the need of using windows or grouping methods. Therefore the > distinct function will be implemented within windows. Depending on the type > of window definition there are several options: > - Main Scope: If distinct is applied as in the Q1 example for window > aggregations, then either we extend the implementation with distinct > aggregates (less preferred) or extend the sliding window aggregates > implementation in the processFunction with distinction identification support > (preferred). The latter option is preferred because a query can carry multiple > aggregates, including multiple aggregates that have the distinct keyword set > up. Implementing the distinction between elements in the process function > avoids the need to multiply the data structure to mark what was seen > across multiple aggregates. It also makes the implementation more robust and > resilient, as we can keep the data structure for marking the seen elements in > a state (mapstate). > Functionality example > - > We exemplify below the functionality of the IN/Exists when working with > streams. 
> `Query: SELECT sum(DISTINCT a) OVER (ORDER BY procTime() ROWS BETWEEN 2 > PRECEDING AND CURRENT ROW) FROM stream1` > ||Proctime||IngestionTime(Event)||Stream1||Q3|| > ||10:00:01| (ab,1)| 1 | > ||10:05:00| (aa,2)|3 | > ||11:03:00| (aa,2)| 3 | > ||11:09:00| (aa,2 |2 | > |...| > Implementation option > - > Considering that the behavior depends on over window behavior, the > implementation will be done by reusing the existing implementation of the > over window functions - done based on processFunction. As mentioned in the > description section, there are 2 options to consider: > 1) Using distinct within the aggregates implementation by extending with > distinct aggregates implementation the current aggregates in Flink. For this > we define additional JIRA issues for each implementation to support the > distinct keyword. > 2) Using distinct for selection within the process logic when calling the > aggregates. This requires a new implementation of the process Function used > for computing the aggregates. The processFunction will also carry the logic > of ta
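The "distinct in the process function" option described above can be sketched in plain Java: a count map plays the role of the MapState the ticket mentions, so a value contributes to the running SUM(DISTINCT) only when its count goes 0 -> 1 and is retracted only when it drops back to 0 as old rows slide out. This is an illustrative stand-in, not the actual Flink process-function implementation, and all names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// SUM(DISTINCT b) OVER (... ROWS BETWEEN n-1 PRECEDING AND CURRENT ROW),
// maintained incrementally with a per-value occurrence count.
class DistinctRowsSum {
    private final int windowSize;                 // 3 for "2 PRECEDING AND CURRENT ROW"
    private final Deque<Long> rows = new ArrayDeque<>();
    private final Map<Long, Integer> counts = new HashMap<>();
    private long distinctSum = 0;

    DistinctRowsSum(int windowSize) { this.windowSize = windowSize; }

    /** Adds a row and returns the new SUM(DISTINCT) for the current window. */
    long addRow(long b) {
        rows.addLast(b);
        int c = counts.merge(b, 1, Integer::sum);
        if (c == 1) {
            distinctSum += b;                     // first occurrence in the window
        }
        if (rows.size() > windowSize) {           // retract the expired row
            long old = rows.removeFirst();
            int oc = counts.merge(old, -1, Integer::sum);
            if (oc == 0) {
                counts.remove(old);
                distinctSum -= old;               // last occurrence left the window
            }
        }
        return distinctSum;
    }
}
```

Because each aggregate call only touches the count map, several distinct aggregates in one query can share the bookkeeping, which is the robustness argument the description makes for the process-function approach.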
[jira] [Commented] (FLINK-6261) Add support for TUMBLE, HOP, SESSION to batch SQL
[ https://issues.apache.org/jira/browse/FLINK-6261?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956108#comment-15956108 ] Haohui Mai commented on FLINK-6261: --- [~fhueske], I am interested in contributing this feature. If you haven't started working on it, do you mind assigning to me? Thanks. > Add support for TUMBLE, HOP, SESSION to batch SQL > - > > Key: FLINK-6261 > URL: https://issues.apache.org/jira/browse/FLINK-6261 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.3.0 >Reporter: Fabian Hueske >Assignee: Fabian Hueske > > Add support for the TUMBLE, HOP, SESSION keywords for batch SQL. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Commented] (FLINK-6011) Support TUMBLE, HOP, SESSION window in streaming SQL
[ https://issues.apache.org/jira/browse/FLINK-6011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956126#comment-15956126 ] ASF GitHub Bot commented on FLINK-6011: --- Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/3665#discussion_r109810006 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala --- @@ -117,46 +119,86 @@ class LogicalWindowAggregateRule } private def identifyWindow(field: RexNode): Option[Window] = { -// Detects window expressions by pattern matching -// supported patterns: FLOOR(time AS xxx) and CEIL(time AS xxx), -// with time being equal to proctime() or rowtime() field match { case call: RexCall => call.getOperator match { - case _: SqlFloorFunction => -val operand = call.getOperands.get(1).asInstanceOf[RexLiteral] -val unit: TimeUnitRange = operand.getValue.asInstanceOf[TimeUnitRange] -val w = LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit) -call.getType match { - case TimeModeTypes.PROCTIME => -return Some(w) - case TimeModeTypes.ROWTIME => -return Some(w.on("rowtime")) - case _ => -} - case _ => + case _: SqlFloorFunction => FloorWindowTranslator(call).toWindow + case SqlStdOperatorTable.TUMBLE => TumbleWindowTranslator(call).toWindow + case SqlStdOperatorTable.HOP => SlidingWindowTranslator(call).toWindow + case SqlStdOperatorTable.SESSION => SessionWindowTranslator(call).toWindow + case _ => None } - case _ => + case _ => None } -None } - } -object LogicalWindowAggregateRule { +private abstract class WindowTranslator { + val call: RexCall - private[flink] val LOGICAL_WINDOW_PREDICATE = RelOptRule.operand(classOf[LogicalAggregate], -RelOptRule.operand(classOf[LogicalProject], RelOptRule.none())) + protected def unwrapLiteral[T](node: RexNode): T = +node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T] - private[flink] val INSTANCE = new 
LogicalWindowAggregateRule + protected def getOperandAsLong(idx: Int): Long = +unwrapLiteral[BigDecimal](call.getOperands.get(idx)).longValue() --- End diff -- I just tried it out; Calcite did not stop you from passing something like a {{RexCall}}. So yes, it can be dynamic. One question: does Flink actually support a `GroupWindow` with a dynamic size? Maybe I'm wrong, but it does not seem so. If the answer is no, maybe we should check whether it is a `RexLiteral`. > Support TUMBLE, HOP, SESSION window in streaming SQL > > > Key: FLINK-6011 > URL: https://issues.apache.org/jira/browse/FLINK-6011 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Reporter: Haohui Mai >Assignee: Haohui Mai > > CALCITE-1603 and CALCITE-1615 introduces the support of the {{TUMBLE}} / > {{HOP}} / {{SESSION}} windows in the parser. > This jira tracks the efforts of adding the corresponding supports on the > planners / optimizers in Flink. -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3665: [FLINK-6011] Support TUMBLE, HOP, SESSION window i...
Github user haohui commented on a diff in the pull request: https://github.com/apache/flink/pull/3665#discussion_r109810006 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/rules/datastream/LogicalWindowAggregateRule.scala --- @@ -117,46 +119,86 @@ class LogicalWindowAggregateRule } private def identifyWindow(field: RexNode): Option[Window] = { -// Detects window expressions by pattern matching -// supported patterns: FLOOR(time AS xxx) and CEIL(time AS xxx), -// with time being equal to proctime() or rowtime() field match { case call: RexCall => call.getOperator match { - case _: SqlFloorFunction => -val operand = call.getOperands.get(1).asInstanceOf[RexLiteral] -val unit: TimeUnitRange = operand.getValue.asInstanceOf[TimeUnitRange] -val w = LogicalWindowAggregateRule.timeUnitRangeToTumbleWindow(unit) -call.getType match { - case TimeModeTypes.PROCTIME => -return Some(w) - case TimeModeTypes.ROWTIME => -return Some(w.on("rowtime")) - case _ => -} - case _ => + case _: SqlFloorFunction => FloorWindowTranslator(call).toWindow + case SqlStdOperatorTable.TUMBLE => TumbleWindowTranslator(call).toWindow + case SqlStdOperatorTable.HOP => SlidingWindowTranslator(call).toWindow + case SqlStdOperatorTable.SESSION => SessionWindowTranslator(call).toWindow + case _ => None } - case _ => + case _ => None } -None } - } -object LogicalWindowAggregateRule { +private abstract class WindowTranslator { + val call: RexCall - private[flink] val LOGICAL_WINDOW_PREDICATE = RelOptRule.operand(classOf[LogicalAggregate], -RelOptRule.operand(classOf[LogicalProject], RelOptRule.none())) + protected def unwrapLiteral[T](node: RexNode): T = +node.asInstanceOf[RexLiteral].getValue.asInstanceOf[T] - private[flink] val INSTANCE = new LogicalWindowAggregateRule + protected def getOperandAsLong(idx: Int): Long = +unwrapLiteral[BigDecimal](call.getOperands.get(idx)).longValue() --- End diff -- I just tried out Calcite did not stop you from passing something like a 
{{RexCall}}. So yes, it can be dynamic. One question: does Flink actually support a `GroupWindow` with a dynamic size? Maybe I'm wrong, but it does not seem so. If the answer is no, maybe we should check whether it is a `RexLiteral`. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink issue #3667: Fix a small spelling error har-with-dependencies -> jar-w...
Github user wuchong commented on the issue: https://github.com/apache/flink/pull/3667 +1 to merge --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6231) completed PendingCheckpoint not release state caused oom
[ https://issues.apache.org/jira/browse/FLINK-6231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956181#comment-15956181 ] Chao Zhao commented on FLINK-6231: -- I tried setting the checkpoint interval to 5 min instead of 5 sec, and the heap size to 4g; the OOM disappeared. I think the checkpoint interval indeed solved the OOM. Not sure whether my earlier OOM indicates a defect; any advice on this? > completed PendingCheckpoint not release state caused oom > - > > Key: FLINK-6231 > URL: https://issues.apache.org/jira/browse/FLINK-6231 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.1.4 > Environment: linux x64 >Reporter: Chao Zhao > > My cluster got one jobmanager and one taskmanager. jobmanager oom repeatedly, > with jobmanager.heap.mb setting to 256 and 1024. > oom triggered at same scene: check point completed quickly, while these > completed check points still in task queue in CheckpointCoordinator.timer > without taskstate being disposed. > one of my checkpoint with taskstate is about 10m, so about 90 completed > checkpoint caused oom with heap size 1024m. hprof file proved this, can > provide if needed. > I have checked PendingCheckpoint.finalizeCheckpoint, not sure if it should be > dispose(null, true) instead of dispose(null, false). > I have no idea about how to make my taskstate much less > 2017-03-30 10:15:52,260 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 47 @ 1490840152260 > 2017-03-30 10:16:11,781 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 47 (in 19516 ms). > 2017-03-30 10:16:11,781 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 48 @ 1490840171781 > 2017-03-30 10:26:11,781 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 48 > expired before completing. 
> 2017-03-30 10:26:11,782 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 49 @ 1490840771782 > 2017-03-30 10:36:11,782 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint 49 > expired before completing. > ... all expired > 2017-03-31 00:46:11,826 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint > 134 expired before completing. > 2017-03-31 00:46:11,826 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 135 @ 1490892371826 > 2017-03-31 00:56:11,827 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint > 135 expired before completing. > 2017-03-31 00:56:11,827 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 136 @ 1490892971827 > 2017-03-31 01:06:11,827 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Checkpoint > 136 expired before completing. > 2017-03-31 01:06:11,827 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 137 @ 1490893571827 > 2017-03-31 01:06:12,215 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 137 (in 384 ms). > 2017-03-31 01:06:16,827 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 138 @ 1490893576827 > 2017-03-31 01:06:17,454 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 138 (in 624 ms). > 2017-03-31 01:06:21,827 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 139 @ 1490893581827 > 2017-03-31 01:06:22,189 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 139 (in 357 ms). > .. 
all completed in less than 1s > 2017-03-31 01:13:51,827 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 229 @ 1490894031827 > 2017-03-31 01:13:52,533 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Completed > checkpoint 229 (in 643 ms). > 2017-03-31 01:13:56,827 INFO > org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Triggering > checkpoint 230 @ 1490894036827 > 2017-03-31 01:13:58,963 ERROR akka.actor.ActorSystemImpl > - Uncaught error from thread > [flink-akka.remote.default-remote-dispatcher-5] shutting down JVM since > 'akka.jvm-exit-on-fatal-error' is enabled > java.lang.OutOfMemoryError: Java heap space > at java.lang.reflect.Array.newInstance(Array.java:70) > at java.io.ObjectInputStream.readArray(ObjectInputStream.java:1670) > at java.io.ObjectInputStream.readObject0(ObjectInputStream.java
[GitHub] flink pull request #3667: Fix a small spelling error har-with-dependencies -...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3667 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-6259) Fix a small spelling error
[ https://issues.apache.org/jira/browse/FLINK-6259?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956183#comment-15956183 ] ASF GitHub Bot commented on FLINK-6259: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3667 > Fix a small spelling error > -- > > Key: FLINK-6259 > URL: https://issues.apache.org/jira/browse/FLINK-6259 > Project: Flink > Issue Type: Bug > Components: Gelly >Reporter: sunjincheng >Assignee: sunjincheng > Fix For: 1.3.0 > > > flink-gelly-scala/pom.xml {{har-with-dependencies}} -> > {{jar-with-dependencies}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Closed] (FLINK-6259) Fix a small spelling error
[ https://issues.apache.org/jira/browse/FLINK-6259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jark Wu closed FLINK-6259. -- Resolution: Fixed Fix Version/s: 1.3.0 fixed in cae4976a4b9d4fa67f207dd08b8c9480c6f8989b > Fix a small spelling error > -- > > Key: FLINK-6259 > URL: https://issues.apache.org/jira/browse/FLINK-6259 > Project: Flink > Issue Type: Bug > Components: Gelly >Reporter: sunjincheng >Assignee: sunjincheng > Fix For: 1.3.0 > > > flink-gelly-scala/pom.xml {{har-with-dependencies}} -> > {{jar-with-dependencies}} -- This message was sent by Atlassian JIRA (v6.3.15#6346)
[GitHub] flink pull request #3661: [FLINK-4953] Allow access to "time" in ProcessWind...
Github user manuzhang commented on a diff in the pull request: https://github.com/apache/flink/pull/3661#discussion_r109821341 --- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java --- @@ -2560,7 +2560,7 @@ public int compare(Object o1, Object o2) { if (comparison != 0) { return comparison; } - return (int) (sr0.getValue().f1 - sr1.getValue().f1); + return (int) (sr0.getValue().f2 - sr1.getValue().f2); --- End diff -- @aljoscha this line compares second field again rather than the third field. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4953) Allow access to "time" in ProcessWindowFunction.Context
[ https://issues.apache.org/jira/browse/FLINK-4953?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956216#comment-15956216 ]

ASF GitHub Bot commented on FLINK-4953:
---------------------------------------

Github user manuzhang commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3661#discussion_r109821341

--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---
@@ -2560,7 +2560,7 @@ public int compare(Object o1, Object o2) {
 			if (comparison != 0) {
 				return comparison;
 			}
-			return (int) (sr0.getValue().f1 - sr1.getValue().f1);
+			return (int) (sr0.getValue().f2 - sr1.getValue().f2);
--- End diff --

@aljoscha this line compares the second field again rather than the third field.

> Allow access to "time" in ProcessWindowFunction.Context
> -------------------------------------------------------
>
>                 Key: FLINK-4953
>                 URL: https://issues.apache.org/jira/browse/FLINK-4953
>             Project: Flink
>          Issue Type: Improvement
>          Components: DataStream API
>            Reporter: Manu Zhang
>            Assignee: Manu Zhang
>
> The recently added {{ProcessWindowFunction}} has a {{Context}} object that allows querying additional information about the window firing that we are processing. Right now, this is only the window for which the firing is happening. We should extend this with methods that allow querying the current processing time and the current watermark.
> Original text by issue creator: This is similar to FLINK-3674, but exposes time information in window functions. Currently, when a timer fires, all states in a window are returned to users, including those arriving after the timer. This change will allow users to filter out states after the timer based on time information.
[jira] [Commented] (FLINK-6257) Post-pass OVER windows
[ https://issues.apache.org/jira/browse/FLINK-6257?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15956221#comment-15956221 ]

sunjincheng commented on FLINK-6257:
------------------------------------

Hi [~fhueske], I like this proposal very much. We really need to check the ORDER BY field to make sure users are clearly aware of its behavior, and to ensure that the OVER window logic is implemented rigorously. IMO, there are two phases where the ORDER BY clause can be checked. The first is the {{planner.validate}} phase, which checks the legitimacy of the {{SqlWindow}}; we could extend this validation to also cover ORDER BY. But this may be difficult because, due to Calcite visibility issues, Flink cannot access some {{SqlWindow}} properties. The second is the {{DataStreamOverAggregate}}/{{DataStreamOverAggregateRule}} phase, where the check is easy to do, because we can get the time type via {{inputType.getFieldList.get(overWindow.orderKeys.getFieldCollations.get(0).getFieldIndex).getValue}} and the order direction via {{overWindow.orderKeys.getFieldCollations.get(0).direction}}. What do you think?

> Post-pass OVER windows
> ----------------------
>
>                 Key: FLINK-6257
>                 URL: https://issues.apache.org/jira/browse/FLINK-6257
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.3.0
>            Reporter: Fabian Hueske
>            Priority: Critical
>
> The OVER windows have been implemented by several contributors.
> We should do a post-pass over the contributed code and:
> * Functionality
> ** Currently, any attribute is allowed as ORDER BY attribute. We must check that it is actually a time indicator ({{procTime()}}, {{rowTime()}}) and that the order is ASCENDING.
> * Documentation
> ** Add documentation for OVER windows
> * Code style
> ** Consistent naming of {{ProcessFunctions}} and methods
> * Tests
> ** Move the OVER window tests out of SqlITCase into a dedicated class
> ** Move the OVER window tests out of WindowAggregateTest into a dedicated class
> ** Add tests based on the test harness for all {{ProcessFunctions}}, similar to {{BoundedProcessingOverRangeProcessFunction}}. The tests should include exact boundary checks for range windows and check for proper parallelization with multiple keys.
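The ORDER BY check discussed above could be sketched as follows. This is an illustrative, self-contained Java sketch, not the actual planner code: `FieldCollation`, `FieldType`, and `Direction` are hypothetical stand-ins for the corresponding Calcite/Flink planner types, and the error messages are invented.

```java
import java.util.List;

public class OverOrderByCheck {
    // Hypothetical stand-ins for planner types; names are illustrative only.
    enum Direction { ASCENDING, DESCENDING }
    record FieldCollation(int fieldIndex, Direction direction) {}
    record FieldType(String name, boolean isTimeIndicator) {}

    // Mirrors the check sketched in the comment: the first ORDER BY key of an
    // OVER window must be a time indicator (procTime()/rowTime()) and ASCENDING.
    static void validateOverOrderBy(List<FieldType> inputFields, List<FieldCollation> orderKeys) {
        if (orderKeys.isEmpty()) {
            throw new IllegalArgumentException("OVER window requires an ORDER BY clause.");
        }
        FieldCollation first = orderKeys.get(0);
        FieldType orderField = inputFields.get(first.fieldIndex());
        if (!orderField.isTimeIndicator()) {
            throw new IllegalArgumentException(
                "ORDER BY of an OVER window must be a time indicator, but was: " + orderField.name());
        }
        if (first.direction() != Direction.ASCENDING) {
            throw new IllegalArgumentException("ORDER BY of an OVER window must be ASCENDING.");
        }
    }

    public static void main(String[] args) {
        List<FieldType> fields = List.of(
            new FieldType("amount", false),
            new FieldType("rowtime", true));
        // Valid: ordered on the rowtime attribute, ascending.
        validateOverOrderBy(fields, List.of(new FieldCollation(1, Direction.ASCENDING)));
        // Invalid: ordered on a non-time attribute.
        try {
            validateOverOrderBy(fields, List.of(new FieldCollation(0, Direction.ASCENDING)));
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```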
[jira] [Updated] (FLINK-6228) Integrating the OVER windows in the Table API
[ https://issues.apache.org/jira/browse/FLINK-6228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sunjincheng updated FLINK-6228:
-------------------------------
    Description:
Syntax:
{code}
table
  .overWindows(
    (Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy order_by_expression]
      (preceding UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW)
      [following UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW]
      as alias,...[n])
  )
  .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n])
{code}
Implement restrictions:
** All OVER clauses in the same SELECT clause must be exactly the same.
** The PARTITION BY clause is optional (no partitioning results in single-threaded execution).
** Until [FLINK-5884|https://issues.apache.org/jira/browse/FLINK-5884] is implemented, orderBy may only reference 'rowtime'/'proctime' (for stream) or a specific time field (for batch).
** FOLLOWING is not supported.

I will soon add a user interface design document.

  was:
Syntax:
{code}
table
  .overWindows(
    (Rows|Range [ partitionBy value_expression , ... [ n ]]
      (preceding UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW)
      [following UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW]
      as alias,...[n])
  )
  .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n])
{code}
Implement restrictions:
** All OVER clauses in the same SELECT clause must be exactly the same.
** The PARTITION BY clause is optional (no partitioning results in single-threaded execution).
** The ORDER BY clause is hidden in the Table API; it is identified automatically according to the time characteristic.
** FOLLOWING is not supported.

I will soon add a user interface design document.

> Integrating the OVER windows in the Table API
> ---------------------------------------------
>
>                 Key: FLINK-6228
>                 URL: https://issues.apache.org/jira/browse/FLINK-6228
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng
>
> Syntax:
> {code}
> table
>   .overWindows(
>     (Rows|Range [ partitionBy value_expression , ... [ n ]] [ orderBy order_by_expression]
>       (preceding UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW)
>       [following UNBOUNDED|value_specification.(rows|milli|second|minute|hour|day|month|year)|CURRENTROW]
>       as alias,...[n])
>   )
>   .select( [col1,...[n]], (agg(col1) OVER overWindowAlias, … [n])
> {code}
> Implement restrictions:
> ** All OVER clauses in the same SELECT clause must be exactly the same.
> ** The PARTITION BY clause is optional (no partitioning results in single-threaded execution).
> ** Until [FLINK-5884|https://issues.apache.org/jira/browse/FLINK-5884] is implemented, orderBy may only reference 'rowtime'/'proctime' (for stream) or a specific time field (for batch).
> ** FOLLOWING is not supported.
> I will soon add a user interface design document.
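For reference, the proposed Table API syntax mirrors a standard SQL OVER aggregation. Under the stated restrictions (identical OVER clauses, optional PARTITION BY, time-attribute ORDER BY, no FOLLOWING), an equivalent SQL query might look like this sketch (table and column names are hypothetical):

```sql
SELECT
  user_id,
  SUM(amount) OVER (
    PARTITION BY user_id
    ORDER BY rowtime
    ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
  ) AS running_total
FROM payments
```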