Kaibo Zhou created FLINK-15419:
----------------------------------

             Summary: Validate SQL syntax not need to depend on connector jar
                 Key: FLINK-15419
                 URL: https://issues.apache.org/jira/browse/FLINK-15419
             Project: Flink
          Issue Type: Improvement
          Components: Table SQL / API
            Reporter: Kaibo Zhou
             Fix For: 1.11.0


As a platform user, I want to integrate Flink SQL in my platform.

The users will register Source/Sink Tables and Functions in the catalog service 
through a UI, and write SQL scripts in a web SQL editor. I want to validate the SQL 
syntax and verify that all referenced catalog objects (tables, fields, UDFs) exist. 

After some investigation, I decided to use the `tEnv.sqlUpdate/sqlQuery` API to 
do this. `SqlParser` and `FlinkSqlParserImpl` are not a good choice, as they do 
not read the catalog.
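For reference, a syntax-only check via Calcite's `SqlParser` (which the Flink planner bundles) might look like the sketch below; it illustrates why the parser alone is insufficient, since it never consults the catalog. The class name is made up for illustration:
{code:java}
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParseException;
import org.apache.calcite.sql.parser.SqlParser;

public class SyntaxOnlyCheck {
    public static void main(String[] args) throws SqlParseException {
        // Parses the statement, but never consults the catalog: unknown
        // tables, fields, or UDFs are NOT detected at this stage.
        SqlNode stmt = SqlParser.create(
                "INSERT INTO sinkTable SELECT f1, f2 FROM sourceTable")
            .parseStmt();
        System.out.println(stmt.getKind());
    }
}
{code}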

The users have registered a *Kafka* source/sink table in the catalog, so the 
validation logic will be:
{code:java}
TableEnvironment tEnv = xxxx

tEnv.registerCatalog(CATALOG_NAME, catalog);
tEnv.useCatalog(CATALOG_NAME);
tEnv.useDatabase(DB_NAME);

tEnv.sqlUpdate("INSERT INTO sinkTable SELECT f1, f2 FROM sourceTable");
// or
tEnv.sqlQuery("SELECT * FROM tableName");
{code}
It throws an exception on Flink 1.9.0 because 
`flink-connector-kafka_2.11-1.9.0.jar` is not on my classpath.
{code:java}
org.apache.flink.table.api.ValidationException: SQL validation failed. findAndCreateTableSource failed.
	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:125)
	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:82)
	at org.apache.flink.table.planner.delegation.PlannerBase.parse(PlannerBase.scala:132)
	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlUpdate(TableEnvironmentImpl.java:335)

The following factories have been considered:
org.apache.flink.formats.json.JsonRowFormatFactory
org.apache.flink.table.planner.delegation.BlinkPlannerFactory
org.apache.flink.table.planner.delegation.BlinkExecutorFactory
org.apache.flink.table.catalog.GenericInMemoryCatalogFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
org.apache.flink.table.sinks.CsvBatchTableSinkFactory
org.apache.flink.table.sinks.CsvAppendTableSinkFactory
	at org.apache.flink.table.factories.TableFactoryService.filterByContext(TableFactoryService.java:283)
	at org.apache.flink.table.factories.TableFactoryService.filter(TableFactoryService.java:191)
	at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:144)
	at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:97)
	at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSource(TableFactoryUtil.java:64)
{code}
For a platform provider, the user's SQL may depend on *ANY* connector, or even a 
custom one. Dynamically loading the connector jar after parsing the connector 
type out of the SQL is complicated, and it requires users to upload their custom 
connector jars before a syntax check can run.
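To illustrate the plumbing this forces on a platform, the workaround today looks roughly like the sketch below (jar path and class name are hypothetical): load the connector jar into a `URLClassLoader` and install it as the context classloader before validating, since that is where factory discovery looks.
{code:java}
import java.net.URL;
import java.net.URLClassLoader;

public class ConnectorJarLoading {
    public static void main(String[] args) throws Exception {
        // Hypothetical jar location; in practice it must first be derived
        // from the connector type in the user's DDL or catalog entry.
        URL connectorJar =
            new URL("file:///tmp/flink-connector-kafka_2.11-1.9.0.jar");

        URLClassLoader loader = new URLClassLoader(
                new URL[]{connectorJar},
                Thread.currentThread().getContextClassLoader());

        // Factory discovery goes through the context classloader, so it
        // has to be swapped before calling tEnv.sqlUpdate(...).
        Thread.currentThread().setContextClassLoader(loader);
    }
}
{code}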

I hope that Flink can provide a friendly way to validate the syntax of SQL whose 
tables/functions are already registered in the catalog, *NOT* depending on the 
connector jars. This would make it easier for external platforms to integrate 
Flink SQL.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)