Thanks Jark for the update. However, getting back to the original question, can I use a nested column directly for CREATE TABLE PARTITIONED BY like below without declaring an additional column?
> CREATE TABLE output
> PARTITIONED BY (`location.transId`)
> WITH (
>   'connector' = 'filesystem',
>   'path' = 'east-out',
>   'format' = 'json'
> ) LIKE navi (EXCLUDING ALL)

I tried (`location`.transId) as well but it fails with an exception:

> Exception in thread "main" org.apache.flink.table.api.SqlParserException: SQL parse failed. Encountered "." at line 3, column 27.
> Was expecting one of:
>     ")" ...
>     "," ...
>
>     at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:56)
>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:76)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>     at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
> Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered "." at line 3, column 27.
> Was expecting one of:
>     ")" ...
>     "," ...
>
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:416)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:201)
>     at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:148)
>     at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:163)
>     at org.apache.calcite.sql.parser.SqlParser.parseStmt(SqlParser.java:188)
>     at org.apache.flink.table.planner.calcite.CalciteParser.parse(CalciteParser.java:54)
>     ... 3 more
> Caused by: org.apache.flink.sql.parser.impl.ParseException: Encountered "." at line 3, column 27.
> Was expecting one of:
>     ")" ...
>     "," ...
>
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.generateParseException(FlinkSqlParserImpl.java:36086)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.jj_consume_token(FlinkSqlParserImpl.java:35900)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.ParenthesizedSimpleIdentifierList(FlinkSqlParserImpl.java:21398)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateTable(FlinkSqlParserImpl.java:5292)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreateExtended(FlinkSqlParserImpl.java:6269)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlCreate(FlinkSqlParserImpl.java:19047)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmt(FlinkSqlParserImpl.java:3308)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.SqlStmtEof(FlinkSqlParserImpl.java:3800)
>     at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.parseSqlStmtEof(FlinkSqlParserImpl.java:248)
>     at org.apache.calcite.sql.parser.SqlParser.parseQuery(SqlParser.java:161)
>     ... 5 more

Best,
Dongwon

On Wed, Jul 22, 2020 at 12:09 AM Jark Wu <imj...@gmail.com> wrote:

> Hi Dongwon,
>
> I think this is a bug in the Filesystem connector which doesn't exclude the computed columns when building the TableSource.
> I created an issue [1] to track this problem.
>
> Best,
> Jark
>
> [1]: https://issues.apache.org/jira/browse/FLINK-18665
>
> On Tue, 21 Jul 2020 at 17:31, Dongwon Kim <eastcirc...@gmail.com> wrote:
>
>> Hi Danny,
>>
>> Which version did you use
>>
>> I use Flink 1.11.0.
>>
>>
>>> what SQL context throws the error ?
>>
>> I think the declaration itself is not a problem.
>> The exception occurs when I try to execute the following, which I didn't show you in the previous email:
>>
>>> tEnv.sqlQuery("SELECT type, location FROM navi").executeInsert("output")
>>
>>
>> Thanks,
>>
>> Dongwon
>>
>> On Tue, Jul 21, 2020 at 6:16 PM Danny Chan <yuzhao....@gmail.com> wrote:
>>
>>> Hi, I executed the SQL below:
>>>
>>> val sql0 =
>>> """
>>>   |create table navi (
>>>   | a STRING,
>>>   | location ROW<lastUpdateTime BIGINT, transId STRING>
>>>   |) with (
>>>   | 'connector' = 'filesystem',
>>>   | 'path' = 'east-out',
>>>   | 'format' = 'json'
>>>   |)
>>>   |""".stripMargin
>>> tableEnv.executeSql(sql0)
>>> val sql =
>>> """
>>>   |CREATE TABLE output (
>>>   | `partition` AS location.transId
>>>   |) PARTITIONED BY (`partition`)
>>>   |WITH (
>>>   | 'connector' = 'filesystem',
>>>   | 'path' = 'east-out',
>>>   | 'format' = 'json'
>>>   |) LIKE navi (EXCLUDING ALL)
>>>   |""".stripMargin
>>> tableEnv.executeSql(sql)
>>>
>>>
>>> In the master branch, both are correct. Can you share your stack trace in detail? Which version did you use, and what SQL context throws the error?
>>>
>>> Best,
>>> Danny Chan
>>> On Jul 21, 2020, 4:55 PM +0800, Dongwon Kim <eastcirc...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I want to create subdirectories named after values of a nested column, location.transId.
>>>
>>> This is my first attempt:
>>>
>>>> CREATE TABLE output
>>>> PARTITIONED BY (`location.transId`)
>>>> WITH (
>>>>   'connector' = 'filesystem',
>>>>   'path' = 'east-out',
>>>>   'format' = 'json'
>>>> ) LIKE navi (EXCLUDING ALL)
>>>>
>>>
>>> It fails with the following errors:
>>>
>>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: Partition column 'location.transId' not defined in the table schema. Available columns: ['type', 'location']
>>>>     at org.apache.flink.table.planner.operations.SqlCreateTableConverter.verifyPartitioningColumnsExist(SqlCreateTableConverter.java:164)
>>>>     at org.apache.flink.table.planner.operations.SqlCreateTableConverter.createCatalogTable(SqlCreateTableConverter.java:130)
>>>>     at org.apache.flink.table.planner.operations.SqlCreateTableConverter.convertCreateTable(SqlCreateTableConverter.java:76)
>>>>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:190)
>>>>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>>>>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:678)
>>>>     at com.kakaomobility.SQLExecutor.main(SQLExecutor.java:28)
>>>>
>>>
>>> Since nested columns do not seem to be recognized as eligible columns for PARTITIONED BY, I tried the following:
>>>
>>>> CREATE TABLE output (
>>>>   `partition` AS location.transId
>>>> ) PARTITIONED BY (`partition`)
>>>> WITH (
>>>>   'connector' = 'filesystem',
>>>>   'path' = 'east-out',
>>>>   'format' = 'json'
>>>> ) LIKE navi (EXCLUDING ALL)
>>>>
>>> It also fails:
>>>
>>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: The field count of logical schema of the table does not match with the field count of physical schema.
>>>> The logical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>]
>>>> The physical schema: [STRING,ROW<`lastUpdateTime` BIGINT, `transId` STRING>,STRING].
>>>
>>> Thanks in advance,
>>>
>>> Dongwon
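
A possible workaround, sketched under assumptions rather than confirmed anywhere in this thread: the parser trace above goes through ParenthesizedSimpleIdentifierList, which suggests PARTITIONED BY accepts only simple column names, and the computed-column variant runs into FLINK-18665. So one option may be to declare transId as an ordinary top-level column of the sink and fill it in the INSERT query. The class and method names below are hypothetical, and the statements are built as plain strings so the idea can be checked without a Flink cluster; whether LIKE navi (EXCLUDING ALL) combines with the extra column as intended on 1.11.0 is an assumption.

```java
// Sketch only: class and method names are hypothetical, not part of Flink.
// Table names, column names and connector options are taken from the thread.
public class PartitionByNestedFieldSketch {

    // Sink DDL: transId is declared as a top-level physical column, so
    // PARTITIONED BY only has to reference a simple identifier.
    static String sinkDdl() {
        return String.join("\n",
            "CREATE TABLE output (",
            "  transId STRING",
            ") PARTITIONED BY (transId)",
            "WITH (",
            "  'connector' = 'filesystem',",
            "  'path' = 'east-out',",
            "  'format' = 'json'",
            ") LIKE navi (EXCLUDING ALL)");
    }

    // The nested field is projected in the query instead of in the DDL.
    static String insertStatement() {
        return "INSERT INTO output SELECT type, location, location.transId FROM navi";
    }

    public static void main(String[] args) {
        // With a real TableEnvironment, each string would be passed to
        // tEnv.executeSql(...), the same call that appears in the stack traces.
        System.out.println(sinkDdl());
        System.out.println(insertStatement());
    }
}
```

This does declare an additional column, so it does not answer the original question with a "yes"; it only avoids both the dotted identifier in PARTITIONED BY and the computed column in the sink schema.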