[ https://issues.apache.org/jira/browse/FLINK-18076?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Dawid Wysakowicz closed FLINK-18076.
------------------------------------
    Resolution: Fixed

Fixed in:
master: 61e6f70dba3e724c479b8cd7753b314ebb1d5517
1.11: e7902bb4d1329833870ee53c782c6431cfc8cb80

> Sql client uses wrong class loader when parsing queries
> -------------------------------------------------------
>
>                 Key: FLINK-18076
>                 URL: https://issues.apache.org/jira/browse/FLINK-18076
>             Project: Flink
>          Issue Type: Bug
>          Components: Table SQL / Client
>    Affects Versions: 1.11.0, 1.12.0
>            Reporter: Dawid Wysakowicz
>            Assignee: Leonard Xu
>            Priority: Blocker
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>
> When parsing queries, the SQL client does not use the user class loader from
> {{ExecutionContext}}. This makes it impossible to query any source whose
> dependencies are added with the {{-j}} flag.
> To reproduce, try querying e.g. a KafkaDynamicSource with:
> {code}
> CREATE TABLE MyUserTable (
>   f0 BIGINT
> ) WITH (
>   'connector' = 'kafka',
>   'topic' = 'topic_name', -- required: topic name from which the table is read
>   -- required: specify the Kafka server connection string
>   'properties.bootstrap.servers' = 'localhost:9092',
>   -- required for Kafka source, optional for Kafka sink, specify consumer group
>   'properties.group.id' = 'testGroup',
>   -- optional: valid modes are "earliest-offset", "latest-offset", "group-offsets", "specific-offsets" or "timestamp"
>   'scan.startup.mode' = 'earliest-offset',
>   'format' = 'avro'
> );
> SELECT * FROM MyUserTable;
> {code}
> It gives the following exception:
> {code}
> Exception in thread "main" org.apache.flink.table.client.SqlClientException: Unexpected exception. This is a bug. Please consider filing an issue.
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:213)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Invalidate SQL statement.
>     at org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:95)
>     at org.apache.flink.table.client.cli.SqlCommandParser.parse(SqlCommandParser.java:79)
>     at org.apache.flink.table.client.cli.CliClient.parseCommand(CliClient.java:256)
>     at org.apache.flink.table.client.cli.CliClient.open(CliClient.java:212)
>     at org.apache.flink.table.client.SqlClient.openCli(SqlClient.java:142)
>     at org.apache.flink.table.client.SqlClient.start(SqlClient.java:114)
>     at org.apache.flink.table.client.SqlClient.main(SqlClient.java:201)
> Caused by: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.MyUserTable'.
> Table options are:
> 'connector'='kafka'
> 'format'='avro'
> 'properties.bootstrap.servers'='localhost:9092'
> 'properties.group.id'='testGroup'
> 'scan.startup.mode'='earliest-offset'
> 'topic'='topic_name'
>     at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:125)
>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:773)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:745)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:66)
>     at org.apache.flink.table.client.cli.SqlCommandParser.parseBySqlParser(SqlCommandParser.java:90)
>     ... 6 more
> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='kafka''.
>     at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
>     at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
>     ... 23 more
> Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kafka' that implements 'org.apache.flink.table.factories.DynamicTableSourceFactory' in the classpath.
> Available factory identifiers are:
> datagen
>     at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:240)
>     at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>     ... 24 more
> Shutting down the session...
> done.
> {code}
> This happens because the factories are present only in the user class loader.
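
For readers unfamiliar with the failure mode described above, the following is a minimal sketch of the general Java pattern for running a piece of code with a specific class loader installed as the thread context class loader, and restoring the previous one afterwards. It is not taken from the fix commits. The class name ContextClassLoaderSketch, the helper withContextClassLoader, and the empty URLClassLoader standing in for the loader built from the {{-j}} jars are all hypothetical names chosen for illustration.

```java
import java.net.URL;
import java.net.URLClassLoader;
import java.util.function.Supplier;

public class ContextClassLoaderSketch {

    /**
     * Runs the action with the given class loader installed as the thread
     * context class loader, then restores the previous loader even if the
     * action throws. Hypothetical helper, not a Flink API.
     */
    public static <T> T withContextClassLoader(ClassLoader userClassLoader, Supplier<T> action) {
        Thread thread = Thread.currentThread();
        ClassLoader original = thread.getContextClassLoader();
        thread.setContextClassLoader(userClassLoader);
        try {
            return action.get();
        } finally {
            thread.setContextClassLoader(original);
        }
    }

    public static void main(String[] args) {
        // Stand-in for a class loader built from user jars; it has no URLs here.
        ClassLoader userClassLoader =
                new URLClassLoader(new URL[0], ContextClassLoaderSketch.class.getClassLoader());

        // Code that looks up classes or services through the context class
        // loader would see the user loader while inside the action.
        ClassLoader seenInside = withContextClassLoader(
                userClassLoader,
                () -> Thread.currentThread().getContextClassLoader());

        System.out.println(seenInside == userClassLoader);
        // After the call, the previous context class loader is back in place.
        System.out.println(Thread.currentThread().getContextClassLoader() != userClassLoader);
    }
}
```

Under this assumption, any lookup that relies on the context class loader (for example service discovery) would need to run inside such a wrapper to see classes that exist only in the user jars.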