Hi,

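The current TableFactory lookup code seems to have a problem in how it reports this error: the real class names are not shown. For now, you can run the following code manually to see exactly which classes are being identified as the JDBC DynamicTableSinkFactory: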

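// Needs java.util.*, java.util.stream.Collectors, and
// org.apache.flink.table.factories.{Factory, DynamicTableSinkFactory}.
List<Factory> result = new LinkedList<>();
// Load every Factory registered via SPI on the current classpath.
ServiceLoader
   .load(Factory.class, Thread.currentThread().getContextClassLoader())
   .iterator()
   .forEachRemaining(result::add);
// Keep only the sink factories whose identifier is "jdbc".
List<Factory> jdbcResult = result.stream()
   .filter(f -> DynamicTableSinkFactory.class.isAssignableFrom(f.getClass()))
   .filter(f -> f.factoryIdentifier().equals("jdbc"))
   .collect(Collectors.toList());
System.out.println(jdbcResult);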
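
If that list has more than one entry, it may also help to see which jar each class was loaded from. Below is a minimal sketch of that idea, not Flink code: the class name FactoryLocator and the helpers describe/describeAll are hypothetical names of mine, and java.sql.Driver is only a stand-in service so the sketch compiles without Flink on the classpath.

```java
import java.net.URL;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class FactoryLocator {

    /** Returns "className @ location" for the concrete class of the given object. */
    public static String describe(Object instance) {
        Class<?> clazz = instance.getClass();
        CodeSource source = clazz.getProtectionDomain().getCodeSource();
        // JDK bootstrap classes have no CodeSource, so fall back to a marker.
        URL location = (source == null) ? null : source.getLocation();
        return clazz.getName() + " @ " + (location == null ? "<unknown>" : location.toString());
    }

    /** Loads all SPI implementations of the given service and describes each one. */
    public static <T> List<String> describeAll(Class<T> service) {
        List<String> lines = new ArrayList<>();
        ServiceLoader<T> loader =
                ServiceLoader.load(service, Thread.currentThread().getContextClassLoader());
        for (T impl : loader) {
            lines.add(describe(impl));
        }
        return lines;
    }

    public static void main(String[] args) {
        // Stand-in service (assumption): in the Flink job you would pass
        // org.apache.flink.table.factories.Factory.class here instead.
        describeAll(java.sql.Driver.class).forEach(System.out::println);
    }
}
```

In the job itself you could call describe(f) on each element of jdbcResult. Two entries with the same class name but different jar locations would suggest the same connector is on the classpath twice.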


> On Dec 3, 2020, at 19:50, hailongwang <18868816...@163.com> wrote:
> 
> Hi,
> Which version are you using? Have you written your own connector that implements DynamicTableSinkFactory and whose factoryIdentifier method returns `jdbc`?
> 
> 
> Best,
> Hailong
> At 2020-12-03 14:44:18, "xuzh" <huazhe...@qq.com> wrote:
>> Error:
>> 
>> 
>> Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath
>> 
>> 
>> Judging by the message, it found two identical classes: DynamicTableSinkFactory
>> 
>> 
>> The code is as follows:
>> package org.apache.flink.examples;
>> 
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>> import org.apache.flink.table.factories.DynamicTableSinkFactory;
>> 
>> public class CDC2ss2 {
>>     public static void main(String[] args) throws Exception {
>> 
>>         // set up execution environment
>>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>         StreamTableEnvironment tEnv;
>> 
>>         EnvironmentSettings settings = EnvironmentSettings.newInstance()
>>                 .useBlinkPlanner()
>>                 .inStreamingMode()
>>                 .build();
>>         tEnv = StreamTableEnvironment.create(env, settings);
>>         String src_sql = "CREATE TABLE userss (\n" +
>>                 "     user_id INT,\n" +
>>                 "     user_nm STRING\n" +
>>                 ") WITH (\n" +
>>                 "      'connector' = 'mysql-cdc',\n" +
>>                 "      'hostname' = '10.12.5.37',\n" +
>>                 "      'port' = '3306',\n" +
>>                 "      'username' = 'dps',\n" +
>>                 "      'password' = 'dps1234',\n" +
>>                 "      'database-name' = 'rpt',\n" +
>>                 "      'table-name' = 'users'\n" +
>>                 "      )";
>> 
>>         tEnv.executeSql(src_sql); // create the table
>> 
>>         String sink = "CREATE TABLE sink (\n" +
>>                 "     user_id INT,\n" +
>>                 "     user_nm STRING,\n" +
>>                 "     primary key(user_id)  NOT ENFORCED \n" +
>>                 ") WITH (\n" +
>>                 "      'connector' = 'jdbc',\n" +
>>                 "      'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
>>                 "      'username' = 'dps',\n" +
>>                 "      'password' = 'dps1234',\n" +
>>                 "      'table-name' = 'sink'\n" +
>>                 "      )";
>>         String to_print_sql = "insert into sink select user_id, user_nm from userss";
>>         tEnv.executeSql(sink);
>>         tEnv.executeSql(to_print_sql);
>>         env.execute();
>>     }
>> }
>> 
>> 
>> 
>> 
>> 
>> Full error:
>> 
>> 
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: Unable to create a sink for writing table 'default_catalog.default_database.sink'.
>> 
>> Table options are:
>> 
>> 'connector'='jdbc'
>> 'password'='dps1234'
>> 'table-name'='sink'
>> 'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
>> 'username'='dps'
>>      at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>>      at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>>      at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>>      at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>      at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>      at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>>      at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>      at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>      at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>      at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>      at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>>      at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>>      at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>>      at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
>>      at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
>>      at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)
>> Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option ''connector'='jdbc''.
>>      at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:329)
>>      at org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:157)
>>      ... 18 more
>> Caused by: org.apache.flink.table.api.ValidationException: Multiple factories for identifier 'jdbc' that implement 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the classpath.
>> 
>> Ambiguous factory classes are:
>> 
>> java.util.LinkedList
>> java.util.LinkedList
>> java.util.LinkedList
>> java.util.LinkedList
>>      at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:253)
>>      at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>>      ... 19 more
>> 
>> 
>> Process finished with exit code 1
