[ https://issues.apache.org/jira/browse/FLINK-15552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17014069#comment-17014069 ]
Jark Wu commented on FLINK-15552:
---------------------------------

Hi [~Terry1897], are you sure there is a Kafka SQL jar under {{/xx/connectors/kafka/}}?

I ask because, AFAIK, the SQL CLI e2e tests also use --library and --jars.

> SQL Client can not correctly create kafka table using --library to indicate a kafka connector directory
> -------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-15552
>                 URL: https://issues.apache.org/jira/browse/FLINK-15552
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Client, Table SQL / Runtime
>            Reporter: Terry Wang
>            Priority: Major
>
> How to Reproduce:
> First, I start a SQL client, using `-l` to point to a Kafka connector directory.
> `
> bin/sql-client.sh embedded -l /xx/connectors/kafka/
> `
> Then I create a Kafka table as follows:
> `
> Flink SQL> CREATE TABLE MyUserTable (
> >   content String
> > ) WITH (
> >   'connector.type' = 'kafka',
> >   'connector.version' = 'universal',
> >   'connector.topic' = 'test',
> >   'connector.properties.zookeeper.connect' = 'localhost:2181',
> >   'connector.properties.bootstrap.servers' = 'localhost:9092',
> >   'connector.properties.group.id' = 'testGroup',
> >   'connector.startup-mode' = 'earliest-offset',
> >   'format.type' = 'csv'
> > );
> [INFO] Table has been created.
> `
> Then I select from the newly created table and an exception is thrown:
> `
> Flink SQL> select * from MyUserTable;
> [ERROR] Could not execute SQL statement. Reason:
> org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for
> 'org.apache.flink.table.factories.TableSourceFactory' in
> the classpath.
> Reason: Required context properties mismatch.
> The matching candidates:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> Mismatched properties:
> 'connector.type' expects 'filesystem', but is 'kafka'
> The following properties are requested:
> connector.properties.bootstrap.servers=localhost:9092
> connector.properties.group.id=testGroup
> connector.properties.zookeeper.connect=localhost:2181
> connector.startup-mode=earliest-offset
> connector.topic=test
> connector.type=kafka
> connector.version=universal
> format.type=csv
> schema.0.data-type=VARCHAR(2147483647)
> schema.0.name=content
> The following factories have been considered:
> org.apache.flink.table.sources.CsvBatchTableSourceFactory
> org.apache.flink.table.sources.CsvAppendTableSourceFactory
> `
> Potential Reasons:
> We now use `TableFactoryUtil#findAndCreateTableSource` to convert a
> CatalogTable to a TableSource, but when calling `TableFactoryService.find` we
> don't pass the current classloader to this method, so the default loader will be
> the BootStrapClassLoader, which cannot find our factory.
> I verified on my machine that this behavior is indeed the cause.

--
This message was sent by Atlassian Jira
(v8.3.4#803005)