[ 
https://issues.apache.org/jira/browse/FLINK-22082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17312868#comment-17312868
 ] 

Dian Fu commented on FLINK-22082:
---------------------------------

cc [~fsk119]

> Nested projection push down doesn't work for data: row(array(row))
> ------------------------------------------------------------------
>
>                 Key: FLINK-22082
>                 URL: https://issues.apache.org/jira/browse/FLINK-22082
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Planner
>    Affects Versions: 1.12.0, 1.13.0
>            Reporter: Dian Fu
>            Priority: Major
>
> For the following job:
> {code}
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import TableConfig, StreamTableEnvironment
> config = TableConfig()
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(env, config)
> source_ddl = """
>     CREATE TABLE InTable (
>         `ID` STRING,
>         `Timestamp` TIMESTAMP(3),
>         `Result` ROW(
>             `data` ROW(`value` BIGINT) ARRAY),
>         WATERMARK FOR `Timestamp` AS `Timestamp`
>     ) WITH (
>         'connector' = 'filesystem',
>         'format' = 'json',
>         'path' = '/tmp/1.txt'
>     )
> """
> sink_ddl = """
>     CREATE TABLE OutTable (
>         `ID` STRING,
>         `value` BIGINT
>     ) WITH (
>         'connector' = 'print'
>     )
> """
> t_env.execute_sql(source_ddl)
> t_env.execute_sql(sink_ddl)
> table = t_env.from_path('InTable')
> table \
>     .select(
>         table.ID,
>         table.Result.data.at(1).value) \
>     .execute_insert('OutTable') \
>     .wait()
> {code}
> It throws the following exception:
> {code}
> : scala.MatchError: ITEM($2.data, 1) (of class org.apache.calcite.rex.RexCall)
>       at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.internalVisit$1(NestedProjectionUtil.scala:273)
>       at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:283)
>       at org.apache.flink.table.planner.plan.utils.NestedSchemaExtractor.visitFieldAccess(NestedProjectionUtil.scala:269)
>       at org.apache.calcite.rex.RexFieldAccess.accept(RexFieldAccess.java:92)
>       at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:112)
>       at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$$anonfun$build$1.apply(NestedProjectionUtil.scala:111)
>       at scala.collection.Iterator$class.foreach(Iterator.scala:891)
>       at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
>       at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>       at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>       at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil$.build(NestedProjectionUtil.scala:111)
>       at org.apache.flink.table.planner.plan.utils.NestedProjectionUtil.build(NestedProjectionUtil.scala)
>       at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.getUsedFieldsInTopLevelProjectAndWatermarkAssigner(ProjectWatermarkAssignerTransposeRule.java:155)
>       at org.apache.flink.table.planner.plan.rules.logical.ProjectWatermarkAssignerTransposeRule.matches(ProjectWatermarkAssignerTransposeRule.java:65)
> {code}
> See https://stackoverflow.com/questions/66888486/pyflink-extract-nested-fields-from-json-array for more details.
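
One way to read the trace: the expression ITEM($2.data, 1) is a RexCall, and the pattern match in internalVisit apparently has no case for it. The following is a toy Python model of that situation, not Flink's actual code. The classes InputRef, FieldAccess and Call, the exception UnmatchedNode, and the function field_path are all hypothetical stand-ins invented for illustration.

```python
from dataclasses import dataclass
from typing import List


# Hypothetical stand-ins for Calcite's RexInputRef, RexFieldAccess and RexCall.
# They only mirror the shape of the expression seen in the stack trace.
@dataclass
class InputRef:
    index: int  # e.g. $2


@dataclass
class FieldAccess:
    expr: object
    name: str  # e.g. data or value


@dataclass
class Call:
    op: str  # e.g. ITEM
    operands: list


class UnmatchedNode(Exception):
    """Plays the role of scala.MatchError in this toy model."""


def field_path(node) -> List[str]:
    """Collect a nested field path, handling only refs and field accesses."""
    if isinstance(node, InputRef):
        return [f"${node.index}"]
    if isinstance(node, FieldAccess):
        return field_path(node.expr) + [node.name]
    # No branch for Call: an array element access ends up here.
    raise UnmatchedNode(f"{node!r} (of class {type(node).__name__})")


# $2.data resolves to a plain field path.
plain = FieldAccess(InputRef(2), "data")
print(field_path(plain))

# ITEM($2.data, 1).value reaches the missing branch.
nested = FieldAccess(Call("ITEM", [plain, 1]), "value")
try:
    field_path(nested)
except UnmatchedNode as err:
    print("unmatched:", err)
```

In this sketch a field access nested directly under the input reference resolves fine, while a field access whose inner expression is an array element call fails, which matches the row(array(row)) shape in the issue title.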



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
