[ https://issues.apache.org/jira/browse/FLINK-29837?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Eric Xiao updated FLINK-29837:
------------------------------
    Description:
When working with {{ChangeLog}} data in the SQL API, it was a bit misleading to see that the {{op}} column appears ^[1]^ in the table schema of the printed results but is not available for use in the SQL API:
{code:java}
val tableEnv = StreamTableEnvironment.create(env)

val dataStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
  Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))

// interpret the DataStream as a Table
val table =
  tableEnv.fromChangelogStream(dataStream,
    Schema.newBuilder().primaryKey("f0").build(), ChangelogMode.upsert())

// register the table under a name and query it, filtering on the op column
tableEnv.createTemporaryView("InputTable", table)
tableEnv
  .sqlQuery("SELECT * FROM InputTable where op = '+I'")
  .execute()
  .print()
{code}
The error log:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 32 to line 1, column 33: Column 'op' not found in any table
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:184)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:109)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:237)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:675)
{code}
It would be nice to expose the {{op}} column so that it is usable in the Flink SQL APIs, as it is in the DataStream APIs.
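For comparison, the DataStream-side access that the last sentence alludes to might look roughly like the sketch below. This is not part of the original report: the object name {{RowKindFilterSketch}} and the helper {{isInsert}} are hypothetical, and the sketch assumes the Flink 1.16 Scala bridge API is on the classpath. It converts the table back with {{toChangelogStream}} and filters on {{Row.getKind}}, which is where the change flag shown as {{op}} in printed results is actually carried.

```scala
import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.Schema
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment
import org.apache.flink.table.connector.ChangelogMode
import org.apache.flink.types.{Row, RowKind}

// Hypothetical sketch: filter on the change flag in the DataStream API,
// since the SQL query in the report cannot reference it as a column.
object RowKindFilterSketch {

  // Hypothetical helper: true when the row carries the INSERT (+I) flag.
  def isInsert(row: Row): Boolean = row.getKind == RowKind.INSERT

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tableEnv = StreamTableEnvironment.create(env)

    // Same input as in the report.
    val dataStream = env.fromElements(
      Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
      Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
      Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
    )(Types.ROW(Types.STRING, Types.INT))

    // Interpret the DataStream as an upsert changelog table keyed on f0.
    val table = tableEnv.fromChangelogStream(
      dataStream,
      Schema.newBuilder().primaryKey("f0").build(),
      ChangelogMode.upsert())

    // Convert back to a changelog stream and keep only INSERT rows.
    tableEnv
      .toChangelogStream(table)
      .filter(row => isInsert(row))
      .print()

    env.execute()
  }
}
```

The filter runs outside the SQL planner, so it is a workaround rather than the SQL-level access this issue asks for.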
[1] [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-fromchangelogstream]

  was:
When working with {{ChangeLog}} data in the SQL API, it was a bit misleading to see that the {{op}} column appears ^[1]^ in the table schema of the printed results but is not available for use in the SQL API:
{code:java}
val tableEnv = StreamTableEnvironment.create(env)

val dataStream = env.fromElements(
  Row.ofKind(RowKind.INSERT, "Alice", Int.box(12)),
  Row.ofKind(RowKind.INSERT, "Bob", Int.box(5)),
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice", Int.box(100))
)(Types.ROW(Types.STRING, Types.INT))

// interpret the DataStream as a Table
val table =
  tableEnv.fromChangelogStream(dataStream,
    Schema.newBuilder().primaryKey("f0").build(), ChangelogMode.upsert())

// register the table under a name and query it, filtering on the op column
tableEnv.createTemporaryView("InputTable", table)
tableEnv
  .sqlQuery("SELECT * FROM InputTable where op = '+I'")
  .execute()
  .print()
{code}
The error log:
{code:java}
Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 32 to line 1, column 33: Column 'op' not found in any table
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:184)
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:109)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:237)
	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:105)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:675)
	at com.shopify.trickle.pipelines.IteratorPipeline$.delayedEndpoint$com$shopify$trickle$pipelines$IteratorPipeline$1(IteratorPipeline.scala:32)
	at com.shopify.trickle.pipelines.IteratorPipeline$delayedInit$body.apply(IteratorPipeline.scala:11)
	at scala.Function0.apply$mcV$sp(Function0.scala:39)
	at scala.Function0.apply$mcV$sp$(Function0.scala:39)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:17)
	at scala.App.$anonfun$main$1$adapted(App.scala:80)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at scala.App.main(App.scala:80)
	at scala.App.main$(App.scala:78)
	at com.shopify.trickle.pipelines.IteratorPipeline$.main(IteratorPipeline.scala:11)
	at com.shopify.trickle.pipelines.IteratorPipeline.main(IteratorPipeline.scala)
{code}
It would be nice to expose the {{op}} column so that it is usable in the Flink SQL APIs, as it is in the DataStream APIs.
[1] [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-fromchangelogstream]

> SQL API does not expose the RowKind of the Row for processing Changelogs
> ------------------------------------------------------------------------
>
>                 Key: FLINK-29837
>                 URL: https://issues.apache.org/jira/browse/FLINK-29837
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / API
>   Affects Versions: 1.16.0
>            Reporter: Eric Xiao
>            Priority: Major

--
This message was sent by Atlassian Jira
(v8.20.10#820010)