liming30 opened a new issue, #731:
URL: https://github.com/apache/incubator-paimon/issues/731

   ### Search before asking
   
   - [X] I searched in the [issues](https://github.com/apache/incubator-paimon/issues) and found nothing similar.
   
   
   ### Paimon version
   
   0.4
   
   ### Compute Engine
   
   flink-1.16
   
   ### Minimal reproduce step
   
   Execute the following SQL in the SQL client:
   ```
   CREATE CATALOG ts_catalog WITH (
       'type' = 'paimon',
       'warehouse' = '/test-data/e1d13ddf-7423-4075-91b2-ca5f7a18b9fd.store'
   );
   USE CATALOG ts_catalog;
   CREATE TABLE IF NOT EXISTS ts_table (
       k VARCHAR,
       v INT,
       PRIMARY KEY (k) NOT ENFORCED
   ) WITH (
       'bucket' = '2',
       'log.consistency' = 'eventual',
       'log.system' = 'kafka',
       'kafka.bootstrap.servers' = 'kafka:9092',
       'scan.mode' = 'from-snapshot',
       'scan.snapshot-id' = '1',
       'kafka.topic' = 'ts-topic-b6a8e1c8-003e-4ef9-a5cc-16979a0ce56b'
   );
   
   INSERT INTO result1 SELECT * FROM ts_table;
   ```
   
   ### What doesn't meet your expectations?
   
   Creating a table with time-travel semantics should not report an error, but I get the following exception:
   ```
   Could not execute SQL statement.
   org.apache.flink.table.client.gateway.SqlExecutionException: Failed to parse statement: SELECT * FROM ts_table
   ;
        at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:174) ~[flink-sql-client-1.16.1.jar:1.16.1]
        at org.apache.flink.table.client.cli.SqlCommandParserImpl.parseCommand(SqlCommandParserImpl.java:45) ~[flink-sql-client-1.16.1.jar:1.16.1]
        at org.apache.flink.table.client.cli.SqlMultiLineParser.parse(SqlMultiLineParser.java:71) ~[flink-sql-client-1.16.1.jar:1.16.1]
        at org.jline.reader.impl.LineReaderImpl.acceptLine(LineReaderImpl.java:2964) ~[flink-sql-client-1.16.1.jar:1.16.1]
        at org.jline.reader.impl.LineReaderImpl$1.apply(LineReaderImpl.java:3778) ~[flink-sql-client-1.16.1.jar:1.16.1]
        at org.jline.reader.impl.LineReaderImpl.readLine(LineReaderImpl.java:679) ~[flink-sql-client-1.16.1.jar:1.16.1]
        at org.apache.flink.table.client.cli.CliClient.getAndExecuteStatements(CliClient.java:295) [flink-sql-client-1.16.1.jar:1.16.1]
        at org.apache.flink.table.client.cli.CliClient.executeInteractive(CliClient.java:280) [flink-sql-client-1.16.1.jar:1.16.1]
        at org.apache.flink.table.client.cli.CliClient.executeInInteractiveMode(CliClient.java:228) [flink-sql-client-1.16.1.jar:1.16.1]
        at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:151) [flink-sql-client-1.16.1.jar:1.16.1]
        at org.apache.flink.table.client.SqlClient.start(SqlClient.java:95) [flink-sql-client-1.16.1.jar:1.16.1]
        at org.apache.flink.table.client.SqlClient.startClient(SqlClient.java:187) [flink-sql-client-1.16.1.jar:1.16.1]
        at org.apache.flink.table.client.SqlClient.main(SqlClient.java:161) [flink-sql-client-1.16.1.jar:1.16.1]
   Caused by: org.apache.flink.table.api.ValidationException: SQL validation failed. Can not create a Path from a null string
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:186) ~[?:?]
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) ~[?:?]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261) ~[?:?]
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) ~[?:?]
        at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) ~[flink-sql-client-1.16.1.jar:1.16.1]
        ... 12 more
   Caused by: java.lang.IllegalArgumentException: Can not create a Path from a null string
        at org.apache.paimon.fs.Path.checkPathArg(Path.java:128) ~[paimon-flink.jar:0.4-SNAPSHOT]
        at org.apache.paimon.fs.Path.<init>(Path.java:142) ~[paimon-flink.jar:0.4-SNAPSHOT]
        at org.apache.paimon.CoreOptions.path(CoreOptions.java:564) ~[paimon-flink.jar:0.4-SNAPSHOT]
        at org.apache.paimon.CoreOptions.path(CoreOptions.java:560) ~[paimon-flink.jar:0.4-SNAPSHOT]
        at org.apache.paimon.AbstractFileStore.snapshotManager(AbstractFileStore.java:73) ~[paimon-flink.jar:0.4-SNAPSHOT]
        at org.apache.paimon.table.AbstractFileStoreTable.snapshotManager(AbstractFileStoreTable.java:178) ~[paimon-flink.jar:0.4-SNAPSHOT]
        at org.apache.paimon.table.AbstractFileStoreTable.tryTimeTravel(AbstractFileStoreTable.java:196) ~[paimon-flink.jar:0.4-SNAPSHOT]
        at org.apache.paimon.table.AbstractFileStoreTable.copy(AbstractFileStoreTable.java:132) ~[paimon-flink.jar:0.4-SNAPSHOT]
        at org.apache.paimon.table.FileStoreTableFactory.create(FileStoreTableFactory.java:86) ~[paimon-flink.jar:0.4-SNAPSHOT]
        at org.apache.paimon.table.FileStoreTableFactory.create(FileStoreTableFactory.java:69) ~[paimon-flink.jar:0.4-SNAPSHOT]
        at org.apache.paimon.catalog.AbstractCatalog.getDataTable(AbstractCatalog.java:62) ~[paimon-flink.jar:0.4-SNAPSHOT]
        at org.apache.paimon.catalog.AbstractCatalog.getTable(AbstractCatalog.java:56) ~[paimon-flink.jar:0.4-SNAPSHOT]
        at org.apache.paimon.flink.FlinkCatalog.getTable(FlinkCatalog.java:163) ~[paimon-flink.jar:0.4-SNAPSHOT]
        at org.apache.paimon.flink.FlinkCatalog.getTable(FlinkCatalog.java:71) ~[paimon-flink.jar:0.4-SNAPSHOT]
        at org.apache.flink.table.catalog.CatalogManager.getPermanentTable(CatalogManager.java:408) ~[flink-table-api-java-uber-1.16.1.jar:1.16.1]
        at org.apache.flink.table.catalog.CatalogManager.getTable(CatalogManager.java:364) ~[flink-table-api-java-uber-1.16.1.jar:1.16.1]
        at org.apache.flink.table.planner.catalog.DatabaseCalciteSchema.getTable(DatabaseCalciteSchema.java:73) ~[?:?]
        at org.apache.calcite.jdbc.SimpleCalciteSchema.getImplicitTable(SimpleCalciteSchema.java:83) ~[?:?]
        at org.apache.calcite.jdbc.CalciteSchema.getTable(CalciteSchema.java:289) ~[?:?]
        at org.apache.calcite.sql.validate.EmptyScope.resolve_(EmptyScope.java:143) ~[?:?]
        at org.apache.calcite.sql.validate.EmptyScope.resolveTable(EmptyScope.java:99) ~[?:?]
        at org.apache.calcite.sql.validate.DelegatingScope.resolveTable(DelegatingScope.java:203) ~[?:?]
        at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:112) ~[?:?]
        at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184) ~[?:?]
        at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) ~[?:?]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997) ~[?:?]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975) ~[?:?]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3085) ~[?:?]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3070) ~[?:?]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3335) ~[?:?]
        at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60) ~[?:?]
        at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84) ~[?:?]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:997) ~[?:?]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:975) ~[?:?]
        at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232) ~[?:?]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:952) ~[?:?]
        at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:704) ~[?:?]
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:182) ~[?:?]
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:113) ~[?:?]
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:261) ~[?:?]
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:106) ~[?:?]
        at org.apache.flink.table.client.gateway.local.LocalExecutor.parseStatement(LocalExecutor.java:172) ~[flink-sql-client-1.16.1.jar:1.16.1]
        ... 12 more
   ```
   
   ### Anything else?
   
   The exception occurs because a table with time-travel semantics needs to check the snapshot when it is copied:
   https://github.com/apache/incubator-paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/table/AbstractFileStoreTable.java#L132
   
   `AbstractFileStoreTable#store` generates the options from the `schema` of the table, which does not contain the `PATH` configuration, so the path resolves to null and this exception is thrown.
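
The failure mode described above can be sketched in isolation. This is a minimal illustration and not Paimon's actual code: the class `PathOptionSketch`, its methods, and the `"path"` key are hypothetical stand-ins, and `withPath` only shows one possible way of carrying the table location with the options.

```java
import java.util.HashMap;
import java.util.Map;

public class PathOptionSketch {

    // Hypothetical stand-in for the PATH option key (assumption, not taken from the issue).
    public static final String PATH_KEY = "path";

    // Mirrors the null check seen in the stack trace (Path.checkPathArg).
    public static String checkPathArg(String path) {
        if (path == null) {
            throw new IllegalArgumentException("Can not create a Path from a null string");
        }
        return path;
    }

    // Resolves the table path from an options map, as the snapshot check needs to do.
    public static String resolvePath(Map<String, String> options) {
        return checkPathArg(options.get(PATH_KEY));
    }

    // Hypothetical fix: copy the schema options and add the table location to the copy.
    public static Map<String, String> withPath(Map<String, String> schemaOptions, String tablePath) {
        Map<String, String> merged = new HashMap<>(schemaOptions);
        merged.put(PATH_KEY, tablePath);
        return merged;
    }

    public static void main(String[] args) {
        // Options as they would come from the table schema: no path entry.
        Map<String, String> schemaOptions = new HashMap<>();
        schemaOptions.put("scan.mode", "from-snapshot");
        schemaOptions.put("scan.snapshot-id", "1");

        try {
            resolvePath(schemaOptions);
        } catch (IllegalArgumentException e) {
            System.out.println("Without path: " + e.getMessage());
        }

        // The table location below is an illustrative value only.
        String path = resolvePath(withPath(schemaOptions, "/tmp/warehouse/default.db/ts_table"));
        System.out.println("With path: " + path);
    }
}
```

Running `main` first prints the null-path error message and then the resolved path once the location has been merged into the options.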
   
   I also found that in many unit tests, configuring the table's `scan.mode` as a time-travel mode triggers the same exception, for example `FlinkCatalogTest#testCreateTable_Streaming`.
   
   Should we always put the `PATH` configuration in the table schema?
   
   ### Are you willing to submit a PR?
   
   - [X] I'm willing to submit a PR!

