[jira] [Commented] (FLINK-20255) Nested decorrelate failed

2023-08-08 Thread Yunhong Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751959#comment-17751959
 ] 

Yunhong Zheng commented on FLINK-20255:
---

Hi [~jark], after reading the source code, I think this is not a bug caused by 
Flink. Could you help close this issue or change it to a feature request?

> Nested decorrelate failed
> -
>
> Key: FLINK-20255
> URL: https://issues.apache.org/jira/browse/FLINK-20255
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / Planner
>Affects Versions: 1.11.0, 1.12.0
>Reporter: godfrey he
>Priority: Not a Priority
>  Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> This issue comes from the user mailing list: 
> https://www.mail-archive.com/user@flink.apache.org/msg37746.html
> The issue can be reproduced with the following code:
> {code:scala}
> import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
> import org.apache.flink.table.annotation.{DataTypeHint, FunctionHint}
> import org.apache.flink.table.api.EnvironmentSettings
> import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
> import org.apache.flink.table.functions.TableFunction
> import org.apache.flink.types.Row
>
> @FunctionHint(output = new DataTypeHint("ROW"))
> class SplitStringToRows extends TableFunction[Row] {
>   def eval(str: String, separator: String = ";"): Unit = {
>     if (str != null) {
>       str.split(separator).foreach(s => collect(Row.of(s.trim())))
>     }
>   }
> }
>
> object Job {
>   def main(args: Array[String]): Unit = {
>     val settings =
>       EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
>     val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
>     val streamTableEnv = StreamTableEnvironment.create(streamEnv, settings)
>     streamTableEnv.createTemporarySystemFunction(
>       "SplitStringToRows",
>       classOf[SplitStringToRows]
>     ) // Class defined in previous email
>     streamTableEnv.executeSql(
>       """
>       CREATE TABLE table2 (
>         attr1 STRING,
>         attr2 STRING,
>         attr3 DECIMAL,
>         attr4 DATE
>       ) WITH (
>         'connector' = 'datagen'
>       )""")
>     val q2 = streamTableEnv.sqlQuery(
>       """
>       SELECT
>         a.attr1 AS attr1,
>         attr2,
>         attr3,
>         attr4
>       FROM table2 p, LATERAL TABLE(SplitStringToRows(p.attr1, ';')) AS a(attr1)
>       """)
>     streamTableEnv.createTemporaryView("view2", q2)
>     val q3 =
>       """
>       SELECT
>         w.attr1,
>         p.attr3
>       FROM table2 w
>       LEFT JOIN LATERAL (
>         SELECT
>           attr1,
>           attr3
>         FROM (
>           SELECT
>             attr1,
>             attr3,
>             ROW_NUMBER() OVER (
>               PARTITION BY attr1
>               ORDER BY
>                 attr4 DESC NULLS LAST,
>                 w.attr2 = attr2 DESC NULLS LAST
>             ) AS row_num
>           FROM view2)
>         WHERE row_num = 1) p
>       ON (w.attr1 = p.attr1)
>       """
>     println(streamTableEnv.explainSql(q3))
>   }
> }
> {code}
> The cause is that Calcite's {{RelDecorrelator}} currently cannot handle this 
> nested decorrelation pattern.
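To see why this query is hard to decorrelate, it can help to spell out what it asks for, row by row. The sketch below is a minimal plain-Scala model, not Flink or Calcite code: the `Row2` case class and both functions are hypothetical, `view2` is taken as already-expanded input, and `attr2` is assumed non-null so the boolean tie-breaker is never NULL. It evaluates the `LEFT JOIN LATERAL` naively, once per outer row `w`. Because `w.attr2 = attr2` sits inside the window's `ORDER BY`, which row gets `row_num = 1` in an `attr1` partition depends on the outer row, so the subquery cannot simply be computed once and joined afterwards.

```scala
import java.time.LocalDate

object NestedCorrelationSketch {
  // Hypothetical in-memory rows; column names follow table2/view2 from the issue.
  final case class Row2(attr1: String, attr2: String, attr3: BigDecimal, attr4: Option[LocalDate])

  // The inner query of the LATERAL subquery, evaluated for ONE outer row `w`:
  //   ROW_NUMBER() OVER (PARTITION BY attr1
  //                      ORDER BY attr4 DESC NULLS LAST, w.attr2 = attr2 DESC NULLS LAST)
  // keeping only row_num = 1. The sort key depends on w.attr2 (the correlation).
  def topPerPartition(view2: Seq[Row2], w: Row2): Seq[Row2] =
    view2.groupBy(_.attr1).values.toSeq.map { partition =>
      partition.sortBy { r =>
        (
          r.attr4.isEmpty,                           // NULLS LAST for attr4
          -r.attr4.map(_.toEpochDay).getOrElse(0L),  // attr4 DESC
          r.attr2 != w.attr2                         // (w.attr2 = attr2) DESC: TRUE first
        )
      }.head
    }

  // LEFT JOIN LATERAL (...) p ON (w.attr1 = p.attr1), evaluated as a nested loop.
  // Outer rows without a match are kept with a NULL (None) attr3.
  def q3(table2: Seq[Row2], view2: Seq[Row2]): Seq[(String, Option[BigDecimal])] =
    table2.flatMap { w =>
      val matches = topPerPartition(view2, w).filter(_.attr1 == w.attr1)
      if (matches.isEmpty) Seq((w.attr1, Option.empty[BigDecimal]))
      else matches.map(p => (w.attr1, Option(p.attr3)))
    }

  def main(args: Array[String]): Unit = {
    val day = LocalDate.of(2020, 11, 20)
    // Two view2 rows in the same attr1 partition with the same attr4:
    // only the correlated tie-breaker decides which one is "row_num = 1".
    val view2 = Seq(
      Row2("k", "x", BigDecimal(1), Some(day)),
      Row2("k", "y", BigDecimal(2), Some(day))
    )
    val table2 = Seq(
      Row2("k", "x", BigDecimal(0), None),
      Row2("k", "y", BigDecimal(0), None),
      Row2("z", "x", BigDecimal(0), None)
    )
    q3(table2, view2).foreach(println)
  }
}
```

In this model the two outer rows with `attr1 = 'k'` receive different `attr3` values (1 and 2) purely because their `attr2` differs, which is the dependency a decorrelator would have to push into the window function.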



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (FLINK-20255) Nested decorrelate failed

2023-08-08 Thread Yunhong Zheng (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17751957#comment-17751957
 ] 

Yunhong Zheng commented on FLINK-20255:
---

Hi [~lam167]. By design, your example cannot work. Currently, 
'UNNEST(xx) AS xx (xx) ON' only supports the condition 'TRUE', for example 
'UNNEST(relatedUserIds) AS t (relatedUserId) ON TRUE'. Calcite translates UNNEST 
into a 'Correlate', which you can think of as a nested-loop join, so you cannot 
give it a condition that is not always true.

For your SQL example, you can move the condition into the WHERE clause as a 
filter, for example:
SELECT *
FROM Messages
CROSS JOIN UNNEST(relatedUserIds) AS t (relatedUserId)
WHERE userId = t.relatedUserId
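To make the nested-loop reading concrete, here is a minimal plain-Scala model of the suggested rewrite (no Flink involved). The `Message` case class and both functions are hypothetical stand-ins for the `Messages` table; `flatMap` plays the role of the Correlate that expands each row's array, and `filter` plays the role of the WHERE clause.

```scala
object CrossJoinUnnestSketch {
  // Hypothetical stand-in for the Messages table discussed in this thread.
  final case class Message(id: String, userId: Int, relatedUserIds: Seq[Int])

  // CROSS JOIN UNNEST(relatedUserIds) AS t(relatedUserId):
  // for every message (outer loop), emit one row per array element (inner loop).
  def crossJoinUnnest(messages: Seq[Message]): Seq[(Message, Int)] =
    messages.flatMap(m => m.relatedUserIds.map(r => (m, r)))

  // ... WHERE userId = t.relatedUserId: an ordinary filter applied after the expansion.
  def withWhereFilter(messages: Seq[Message]): Seq[(Message, Int)] =
    crossJoinUnnest(messages).filter { case (m, r) => m.userId == r }

  def main(args: Array[String]): Unit = {
    val messages = Seq(
      Message("a", 1, Seq(1, 2)),
      Message("b", 3, Seq(4)),
      Message("c", 5, Seq.empty)
    )
    withWhereFilter(messages).foreach { case (m, r) => println(s"${m.id} -> $r") }
  }
}
```

For an inner join, filtering after the expansion yields the same rows as `JOIN ... ON userId = t.relatedUserId`. The equivalence does not hold for a LEFT JOIN: there, the WHERE filter would also drop messages that have no matching element.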
 



[jira] [Commented] (FLINK-20255) Nested decorrelate failed

2023-02-06 Thread Jianhui Dong (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17684532#comment-17684532
 ] 

Jianhui Dong commented on FLINK-20255:
--

Is anyone else following up on this issue?

I found another simple example that does not work correctly:

{code:sql}
SELECT *
FROM Messages
LEFT JOIN UNNEST(relatedUserIds) AS t (relatedUserId)
ON userId = t.relatedUserId;

-- or

SELECT *
FROM Messages
JOIN UNNEST(relatedUserIds) AS t (relatedUserId)
ON userId = t.relatedUserId;
{code}
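The two queries above differ in what the `ON` predicate has to do. A minimal plain-Scala model (not Flink code; `Message` and both helper functions are hypothetical) shows the intended semantics of the LEFT JOIN variant, and why simply moving the predicate into `WHERE` changes the result for an outer join. NULL is modeled as `None`.

```scala
object LeftJoinUnnestSketch {
  // Hypothetical stand-in for the Messages table from the example.
  final case class Message(id: String, userId: Int, relatedUserIds: Seq[Int])

  // Intended meaning of
  //   LEFT JOIN UNNEST(relatedUserIds) AS t(relatedUserId) ON userId = t.relatedUserId
  // For each message, keep the array elements that satisfy the ON condition;
  // if none do, still emit the message once with a NULL (None) element.
  def leftJoinUnnestOn(messages: Seq[Message]): Seq[(Message, Option[Int])] =
    messages.flatMap { m =>
      val matches = m.relatedUserIds.filter(r => m.userId == r)
      if (matches.isEmpty) Seq((m, Option.empty[Int]))
      else matches.map(r => (m, Option(r)))
    }

  // LEFT JOIN ... ON TRUE followed by WHERE userId = t.relatedUserId:
  // the filter also removes the NULL-padded rows, so unmatched messages disappear.
  def leftJoinOnTrueThenWhere(messages: Seq[Message]): Seq[(Message, Option[Int])] = {
    val onTrue = messages.flatMap { m =>
      if (m.relatedUserIds.isEmpty) Seq((m, Option.empty[Int]))
      else m.relatedUserIds.map(r => (m, Option(r)))
    }
    // A comparison with NULL is not TRUE, so None never passes the filter.
    onTrue.filter { case (m, r) => r.contains(m.userId) }
  }

  def main(args: Array[String]): Unit = {
    val messages = Seq(
      Message("a", 1, Seq(1, 2)),
      Message("b", 3, Seq(4)),
      Message("c", 5, Seq.empty)
    )
    println(leftJoinUnnestOn(messages).map { case (m, r) => (m.id, r) })
    println(leftJoinOnTrueThenWhere(messages).map { case (m, r) => (m.id, r) })
  }
}
```

In this model the LEFT JOIN keeps messages `b` and `c` with a NULL element, while the WHERE rewrite keeps only `a`, so the `CROSS JOIN ... WHERE` workaround only covers the inner-join variant.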
 

 
 



[jira] [Commented] (FLINK-20255) Nested decorrelate failed

2022-04-19 Thread Nico Kruber (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17524261#comment-17524261
 ] 

Nico Kruber commented on FLINK-20255:
-

The following example (simplified from a more complex join that makes more 
sense than this version) also seems to be an instance of the described problem 
(tested in Flink 1.14.3):

{code}
CREATE TEMPORARY TABLE Messages (
  `id` CHAR(1),
  `userId` TINYINT,
  `relatedUserIds` ARRAY
)
WITH (
  'connector' = 'datagen',
  'fields.id.length' = '10',
  'fields.userId.kind' = 'random',
  'fields.userId.min' = '1',
  'fields.userId.max' = '10',
  'fields.relatedUserIds.kind' = 'random',
  'fields.relatedUserIds.element.min' = '1',
  'fields.relatedUserIds.element.max' = '10',
  'rows-per-second' = '1000'
);

-- the non-working version:
SELECT *
FROM Messages outer_message
WHERE
outer_message.userId IN
(
SELECT relatedUserId
FROM Messages inner_message
CROSS JOIN UNNEST(inner_message.relatedUserIds) AS t (relatedUserId)
WHERE inner_message.id = outer_message.id
)

-- this one is working:
/*
SELECT *
FROM Messages
CROSS JOIN UNNEST(relatedUserIds) AS t (relatedUserId)
WHERE
userId = t.relatedUserId
*/
{code}
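For reference, the non-working query has a simple meaning that can be written down directly. The plain-Scala sketch below (hypothetical `Message` type, not Flink code, and NULL handling of `IN` ignored by assuming non-null values) evaluates the correlated `IN` subquery row by row, and then in a decorrelated shape: build the `(id, relatedUserId)` pairs once and semi-join on both columns. Both functions return the same rows; the reported exception (`unexpected correlate variable $cor1`) indicates that a correlation variable was still present after the decorrelation step.

```scala
object CorrelatedInSketch {
  // Hypothetical stand-in for the Messages table; relatedUserIds models the ARRAY column.
  final case class Message(id: String, userId: Int, relatedUserIds: Seq[Int])

  // Correlated form: for every outer message, re-run the subquery
  //   SELECT relatedUserId FROM Messages inner_message
  //   CROSS JOIN UNNEST(inner_message.relatedUserIds) ...
  //   WHERE inner_message.id = outer_message.id
  def correlated(messages: Seq[Message]): Seq[Message] =
    messages.filter { outer =>
      val sub = messages
        .filter(inner => inner.id == outer.id)
        .flatMap(_.relatedUserIds)
      sub.contains(outer.userId)
    }

  // Decorrelated form: compute all (id, relatedUserId) pairs once, then keep outer
  // rows whose (id, userId) pair exists, i.e. a semi-join on both columns.
  def decorrelated(messages: Seq[Message]): Seq[Message] = {
    val pairs: Set[(String, Int)] =
      messages.flatMap(m => m.relatedUserIds.map(r => (m.id, r))).toSet
    messages.filter(m => pairs.contains((m.id, m.userId)))
  }

  def main(args: Array[String]): Unit = {
    val messages = Seq(
      Message("a", 1, Seq(1, 2)),
      Message("a", 3, Seq(4)),
      Message("a", 4, Seq.empty),
      Message("b", 4, Seq(5))
    )
    println(correlated(messages))
    println(decorrelated(messages))
  }
}
```

Note that the third message matches only through another row with the same `id`, which is exactly the cross-row dependency the correlated subquery expresses.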

The non-working version produces the following exception:
{code}
org.apache.flink.table.api.TableException: unexpected correlate variable $cor1 in the plan
	at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.checkCorrelVariableExists(FlinkDecorrelateProgram.scala:57) ~[flink-table_2.11-1.14.3.jar:1.14.3]
	at org.apache.flink.table.planner.plan.optimize.program.FlinkDecorrelateProgram.optimize(FlinkDecorrelateProgram.scala:42) ~[flink-table_2.11-1.14.3.jar:1.14.3]
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:63) ~[flink-table_2.11-1.14.3.jar:1.14.3]
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1$$anonfun$apply$1.apply(FlinkGroupProgram.scala:60) ~[flink-table_2.11-1.14.3.jar:1.14.3]
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at scala.collection.Iterator$class.foreach(Iterator.scala:891) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at scala.collection.IterableLike$class.foreach(IterableLike.scala:72) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at scala.collection.AbstractIterable.foreach(Iterable.scala:54) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:60) ~[flink-table_2.11-1.14.3.jar:1.14.3]
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram$$anonfun$optimize$1.apply(FlinkGroupProgram.scala:55) ~[flink-table_2.11-1.14.3.jar:1.14.3]
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at scala.collection.immutable.Range.foreach(Range.scala:160) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at scala.collection.AbstractTraversable.foldLeft(Traversable.scala:104) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at org.apache.flink.table.planner.plan.optimize.program.FlinkGroupProgram.optimize(FlinkGroupProgram.scala:55) ~[flink-table_2.11-1.14.3.jar:1.14.3]
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:62) ~[flink-table_2.11-1.14.3.jar:1.14.3]
	at org.apache.flink.table.planner.plan.optimize.program.FlinkChainedProgram$$anonfun$optimize$1.apply(FlinkChainedProgram.scala:58) ~[flink-table_2.11-1.14.3.jar:1.14.3]
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157) ~[flink-dist_2.11-1.14.3.jar:1.14.3]
	at scala.collection.Iterat

[jira] [Commented] (FLINK-20255) Nested decorrelate failed

2021-04-29 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17336020#comment-17336020
 ] 

Flink Jira Bot commented on FLINK-20255:


This issue was labeled "stale-major" 7 days ago and has not received any 
updates, so it is being deprioritized. If this ticket is actually Major, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Nested decorrelate failed
> -
>
> Key: FLINK-20255
> URL: https://issues.apache.org/jira/browse/FLINK-20255
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.0, 1.12.0
>Reporter: godfrey he
>Priority: Major
>  Labels: stale-major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (FLINK-20255) Nested decorrelate failed

2021-04-22 Thread Flink Jira Bot (Jira)


[ 
https://issues.apache.org/jira/browse/FLINK-20255?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17327524#comment-17327524
 ] 

Flink Jira Bot commented on FLINK-20255:


This major issue is unassigned, and neither it nor any of its Sub-Tasks has 
been updated for 30 days, so it has been labeled "stale-major". If this ticket 
is indeed "major", please either assign yourself or give an update, and 
afterwards remove the label. In 7 days the issue will be deprioritized.

> Nested decorrelate failed
> -
>
> Key: FLINK-20255
> URL: https://issues.apache.org/jira/browse/FLINK-20255
> Project: Flink
>  Issue Type: Bug
>  Components: Table SQL / API
>Affects Versions: 1.11.0, 1.12.0
>Reporter: godfrey he
>Priority: Major
>  Labels: stale-major
>