Terry Wang created FLINK-15552:
----------------------------------

             Summary: SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory
                 Key: FLINK-15552
                 URL: https://issues.apache.org/jira/browse/FLINK-15552
             Project: Flink
          Issue Type: Bug
          Components: Table SQL / Client, Table SQL / Runtime
            Reporter: Terry Wang

How to Reproduce:

First, I start the SQL Client, using `-l` to point to a Kafka connector directory.

`
bin/sql-client.sh embedded -l /xx/connectors/kafka/
`

Then I create a Kafka table as follows:

`
Flink SQL> CREATE TABLE MyUserTable (
>   content String
> ) WITH (
>   'connector.type' = 'kafka',
>   'connector.version' = 'universal',
>   'connector.topic' = 'test',
>   'connector.properties.zookeeper.connect' = 'localhost:2181',
>   'connector.properties.bootstrap.servers' = 'localhost:9092',
>   'connector.properties.group.id' = 'testGroup',
>   'connector.startup-mode' = 'earliest-offset',
>   'format.type' = 'csv'
> );
[INFO] Table has been created.
`

Then I select from the newly created table, and an exception is thrown:

`
Flink SQL> select * from MyUserTable;
[ERROR] Could not execute SQL statement. Reason:
org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSourceFactory' in the classpath.

Reason: Required context properties mismatch.

The matching candidates:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
Mismatched properties:
'connector.type' expects 'filesystem', but is 'kafka'

The following properties are requested:
connector.properties.bootstrap.servers=localhost:9092
connector.properties.group.id=testGroup
connector.properties.zookeeper.connect=localhost:2181
connector.startup-mode=earliest-offset
connector.topic=test
connector.type=kafka
connector.version=universal
format.type=csv
schema.0.data-type=VARCHAR(2147483647)
schema.0.name=content

The following factories have been considered:
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
`

Potential Reasons:

We currently use `TableFactoryUtil#findAndCreateTableSource` to convert a CatalogTable to a TableSource, but when it calls `TableFactoryService.find` we don't pass the current class loader to that method. The default loader will then be the BootStrapClassLoader, which cannot find our factory.
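
To illustrate the suspected cause, here is a minimal sketch that uses only the JDK. It assumes factory discovery works through `java.util.ServiceLoader`. It is not Flink code: `FactoryDiscoverySketch`, `DemoFactory`, `libraryClassLoader` and `discover` are hypothetical names made up for this example. The point is only that a service lookup sees the providers visible to the class loader it is given, so jars added through a separate library directory are found only if that loader is passed along.

```java
import java.io.File;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

final class FactoryDiscoverySketch {

    /** Hypothetical stand-in for a table factory SPI; this is not a Flink interface. */
    interface DemoFactory {
        String connectorType();
    }

    /** Builds a class loader over every *.jar in dir (assumed to resemble what -l provides). */
    static URLClassLoader libraryClassLoader(File dir, ClassLoader parent) {
        File[] jars = dir.listFiles((d, name) -> name.endsWith(".jar"));
        List<URL> urls = new ArrayList<>();
        if (jars != null) { // listFiles returns null if dir is missing or not a directory
            for (File jar : jars) {
                try {
                    urls.add(jar.toURI().toURL());
                } catch (MalformedURLException e) {
                    throw new UncheckedIOException(e);
                }
            }
        }
        return new URLClassLoader(urls.toArray(new URL[0]), parent);
    }

    /** Collects all providers of the given SPI that are visible to the given class loader. */
    static <T> List<T> discover(Class<T> spi, ClassLoader classLoader) {
        List<T> found = new ArrayList<>();
        for (T factory : ServiceLoader.load(spi, classLoader)) {
            found.add(factory);
        }
        return found;
    }

    public static void main(String[] args) {
        File libDir = new File(args.length > 0 ? args[0] : ".");
        ClassLoader defaultLoader = FactoryDiscoverySketch.class.getClassLoader();
        ClassLoader userLoader = libraryClassLoader(libDir, defaultLoader);
        // Jars in libDir are only visible through userLoader, not through defaultLoader.
        System.out.println("default loader: " + discover(DemoFactory.class, defaultLoader).size());
        System.out.println("library loader: " + discover(DemoFactory.class, userLoader).size());
    }
}
```

If the hypothesis above is right, the fix would be the analogue of calling `discover` with `userLoader` instead of the default loader, that is, handing the SQL Client's user class loader to the factory lookup.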