[jira] [Commented] (FLINK-9717) Flush state of one side of the join if other side is bounded
[ https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17337176#comment-17337176 ]

Jark Wu commented on FLINK-9717:
--------------------------------

[~twalthr], yes.

> Flush state of one side of the join if other side is bounded
> ------------------------------------------------------------
>
>                 Key: FLINK-9717
>                 URL: https://issues.apache.org/jira/browse/FLINK-9717
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Priority: Major
>
> Whenever one side of a join receives {{MAX_WATERMARK}}, the join (both normal
> and versioned joins) could flush the state it keeps for the other side.
> This is a highly useful optimisation that would speed up versioned joins and
> would allow normal joins of large unbounded streams with bounded tables (for
> example some static data).
> edit:
> The current problem is that Flink doesn't keep and restore the last emitted
> watermark after restoring from a checkpoint, and this is hard to work around.
> In other words, we can now easily "flush" one side of the join when we
> receive {{MAX_WATERMARK}}, but what should happen after restoring from a
> checkpoint? There is no easy way to store the information that
> {{MAX_WATERMARK}} was previously reached. As far as I have thought about
> this, it cannot be stored in the state of the join operator, and even if it
> could be done this way, it's probably not the proper/elegant solution. The
> correct solution is probably to store {{MAX_WATERMARK}} in state around the
> watermark emitter/source operator and to re-emit the last previously emitted
> watermark when the job is restored.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)
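The optimisation described in the issue can be illustrated with a toy inner join. This is only a sketch in plain Java and does not use Flink's operator API: the class `BoundedSideJoinSketch`, its methods, and the in-memory maps standing in for keyed state are all hypothetical. The only detail taken from Flink is that `MAX_WATERMARK` carries the timestamp `Long.MAX_VALUE`. Once one input reports `MAX_WATERMARK`, the records buffered for the opposite input can no longer find new matches, so they are dropped and later records from that input are only probed, not stored.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy two-input inner join. Hypothetical sketch, NOT Flink's operator API. */
final class BoundedSideJoinSketch {
    // Flink's MAX_WATERMARK carries Long.MAX_VALUE as its timestamp.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    // In-memory maps stand in for keyed state (an assumption of this sketch).
    private final Map<String, List<String>> leftState = new HashMap<>();
    private final Map<String, List<String>> rightState = new HashMap<>();
    private boolean leftFinished = false;
    private boolean rightFinished = false;

    /** Collected join results, formatted as "left|right". */
    final List<String> output = new ArrayList<>();

    void onLeft(String key, String value) {
        // Probe the records buffered for the right side.
        for (String right : rightState.getOrDefault(key, Collections.emptyList())) {
            output.add(value + "|" + right);
        }
        // Buffer the record only while the right side can still produce matches.
        if (!rightFinished) {
            leftState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
    }

    void onRight(String key, String value) {
        // Probe the records buffered for the left side.
        for (String left : leftState.getOrDefault(key, Collections.emptyList())) {
            output.add(left + "|" + value);
        }
        // Buffer the record only while the left side can still produce matches.
        if (!leftFinished) {
            rightState.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        }
    }

    void onLeftWatermark(long timestamp) {
        if (timestamp == MAX_WATERMARK) {
            leftFinished = true;
            rightState.clear(); // no further left records will probe this state
        }
    }

    void onRightWatermark(long timestamp) {
        if (timestamp == MAX_WATERMARK) {
            rightFinished = true;
            leftState.clear(); // no further right records will probe this state
        }
    }

    /** Number of left records currently buffered. */
    int bufferedLeft() {
        return leftState.values().stream().mapToInt(List::size).sum();
    }

    /** Number of right records currently buffered. */
    int bufferedRight() {
        return rightState.values().stream().mapToInt(List::size).sum();
    }
}
```

In this sketch a bounded table on the right input would keep its state for probing, while the unbounded left input stops accumulating state as soon as the right side is finished. The sketch deliberately ignores checkpoints: the `leftFinished` and `rightFinished` flags would be lost on restore, which is the problem raised in the edit to the description.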
[jira] [Commented] (FLINK-9717) Flush state of one side of the join if other side is bounded
[ https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17337083#comment-17337083 ]

Timo Walther commented on FLINK-9717:
-------------------------------------

[~jark] this optimization is still valid for the Blink planner, correct?

> Flush state of one side of the join if other side is bounded
> ------------------------------------------------------------
>
>                 Key: FLINK-9717
>                 URL: https://issues.apache.org/jira/browse/FLINK-9717
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table SQL / Planner
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Priority: Major
[jira] [Commented] (FLINK-9717) Flush state of one side of the join if other side is bounded
[ https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17328609#comment-17328609 ]

Flink Jira Bot commented on FLINK-9717:
---------------------------------------

This major issue is unassigned, and neither it nor any of its Sub-Tasks has been updated for 30 days. It has therefore been labeled "stale-major". If this ticket is indeed "major", please either assign yourself or give an update. Afterwards, please remove the label. In 7 days the issue will be deprioritized.

> Flush state of one side of the join if other side is bounded
> ------------------------------------------------------------
>
>                 Key: FLINK-9717
>                 URL: https://issues.apache.org/jira/browse/FLINK-9717
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table SQL / Legacy Planner
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Priority: Major
>              Labels: stale-major
[jira] [Commented] (FLINK-9717) Flush state of one side of the join if other side is bounded
[ https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732106#comment-16732106 ]

Piotr Nowojski commented on FLINK-9717:
---------------------------------------

[~kisimple], what do you mean by:

> When considering joins with bounded data, I would prefer the side input
> solution, in which there is no `MAX_WATERMARK`, if I understand correctly.

?

> Flush state of one side of the join if other side is bounded
> ------------------------------------------------------------
>
>                 Key: FLINK-9717
>                 URL: https://issues.apache.org/jira/browse/FLINK-9717
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Priority: Major
[jira] [Commented] (FLINK-9717) Flush state of one side of the join if other side is bounded
[ https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16722013#comment-16722013 ]

boshu Zheng commented on FLINK-9717:
------------------------------------

Just a little thought to share. When considering joins with bounded data, I would prefer the side input solution, in which there is no `MAX_WATERMARK`, if I understand correctly.

> Flush state of one side of the join if other side is bounded
> ------------------------------------------------------------
>
>                 Key: FLINK-9717
>                 URL: https://issues.apache.org/jira/browse/FLINK-9717
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Assignee: boshu Zheng
>            Priority: Major
[jira] [Commented] (FLINK-9717) Flush state of one side of the join if other side is bounded
[ https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16722001#comment-16722001 ]

boshu Zheng commented on FLINK-9717:
------------------------------------

Hi [~pnowojski], thanks for your comment. I haven't dug into this issue yet, so feel free to take it over :)

> Flush state of one side of the join if other side is bounded
> ------------------------------------------------------------
>
>                 Key: FLINK-9717
>                 URL: https://issues.apache.org/jira/browse/FLINK-9717
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Assignee: boshu Zheng
>            Priority: Major
[jira] [Commented] (FLINK-9717) Flush state of one side of the join if other side is bounded
[ https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16721561#comment-16721561 ]

Piotr Nowojski commented on FLINK-9717:
---------------------------------------

[~kisimple], this ticket is not as easy as you might think (nor as simple as I thought when I created it). The optimisation is relatively trivial to implement if you do not consider restoring from checkpoints/savepoints.

The current problem is that Flink doesn't keep and restore the last emitted watermark after restoring from a checkpoint, and this is hard to work around. In other words, we can now easily "flush" one side of the join when we receive {{MAX_WATERMARK}}, but what should happen after restoring from a checkpoint? There is no easy way to store the information that {{MAX_WATERMARK}} was previously reached. As far as I have thought about this, it cannot be stored in the state of the join operator, and even if it could be done this way, it's probably not the proper/elegant solution. The correct solution is probably to store {{MAX_WATERMARK}} in state around the watermark emitter/source operator and to re-emit the last previously emitted watermark when the job is restored.

> Flush state of one side of the join if other side is bounded
> ------------------------------------------------------------
>
>                 Key: FLINK-9717
>                 URL: https://issues.apache.org/jira/browse/FLINK-9717
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Assignee: boshu Zheng
>            Priority: Major
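The approach proposed in this comment (keeping the last emitted watermark in state near the source and re-emitting it on restore) might look roughly like the following. This is a hedged sketch in plain Java rather than Flink code: `WatermarkRestoringSourceSketch`, `snapshotState`, and `restore` are hypothetical names, a single `long` stands in for checkpointed state, and the `emitted` list stands in for the downstream operators. It describes the idea from the comment, not behaviour that Flink is confirmed to have.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy watermark emitter that remembers its last watermark. Hypothetical, NOT Flink's API. */
final class WatermarkRestoringSourceSketch {
    // Flink's MAX_WATERMARK carries Long.MAX_VALUE as its timestamp.
    static final long MAX_WATERMARK = Long.MAX_VALUE;

    // Sentinel meaning "no watermark emitted yet" (an assumption of this sketch).
    static final long NO_WATERMARK = Long.MIN_VALUE;

    private long lastEmitted = NO_WATERMARK;

    /** Watermarks sent downstream, in emission order. */
    final List<Long> emitted = new ArrayList<>();

    /** Emits a watermark if it advances event time; stale watermarks are ignored. */
    void emitWatermark(long timestamp) {
        if (timestamp > lastEmitted) {
            lastEmitted = timestamp;
            emitted.add(timestamp);
        }
    }

    /** Returns the value that would be written into a checkpoint. */
    long snapshotState() {
        return lastEmitted;
    }

    /** Rebuilds the emitter from a checkpoint and re-emits the stored watermark. */
    static WatermarkRestoringSourceSketch restore(long snapshot) {
        WatermarkRestoringSourceSketch source = new WatermarkRestoringSourceSketch();
        if (snapshot != NO_WATERMARK) {
            source.lastEmitted = snapshot;
            // Re-emitting lets downstream operators learn again that, for example,
            // MAX_WATERMARK was already reached before the failure.
            source.emitted.add(snapshot);
        }
        return source;
    }
}
```

With something along these lines, a join that flushes state on `MAX_WATERMARK` would see that watermark again after recovery and would not need to record the "finished" flag in its own state.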
[jira] [Commented] (FLINK-9717) Flush state of one side of the join if other side is bounded
[ https://issues.apache.org/jira/browse/FLINK-9717?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16531172#comment-16531172 ]

Fabian Hueske commented on FLINK-9717:
--------------------------------------

There is another aspect to consider when improving the support for joins with bounded input. We might want to ensure that the operator emits an append-only stream (and that the planner is also aware of this). For example, we might need a dedicated operator to enable append-only record emission for outer joins.

> Flush state of one side of the join if other side is bounded
> ------------------------------------------------------------
>
>                 Key: FLINK-9717
>                 URL: https://issues.apache.org/jira/browse/FLINK-9717
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>    Affects Versions: 1.5.0
>            Reporter: Piotr Nowojski
>            Priority: Major