Re:用代码执行flink sql 报错 Multiple factories for identifier 'jdbc' that implement

2020-12-03 Thread hailongwang
Hi,
 你是用哪个版本的呢?有没有自己继承了 DynamicTableSinkFactory 实现的 factoryIdentifier 方法返回 `JDBC` 的 
Connector?


Best,
Hailong
在 2020-12-03 14:44:18,"xuzh"  写道:
>错误:
>
>
>Caused by: org.apache.flink.table.api.ValidationException: Multiple factories 
>for identifier 'jdbc' that implement 
>'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the 
>classpath
>
>
>看意思是找到了两个一样的类:DynamicTableSinkFactory
>
>
>代码如下:
>package org.apache.flink.examples;
>
>
>import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>import org.apache.flink.table.api.EnvironmentSettings;
>import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>import org.apache.flink.table.factories.DynamicTableSinkFactory;
>
>
>public class CDC2ss2 {
>    public static void main(String[] args) throws Exception {
>
>
>        // set up execution environment
>        StreamExecutionEnvironment env = 
>StreamExecutionEnvironment.getExecutionEnvironment();
>        StreamTableEnvironment tEnv;
>
>
>        EnvironmentSettings settings = 
>EnvironmentSettings.newInstance()
>                .useBlinkPlanner()
>                .inStreamingMode()
>                .build();
>        tEnv = StreamTableEnvironment.create(env, 
>settings);
>        String src_sql = "CREATE TABLE userss (\n" +
>                "    
> user_id INT,\n" +
>                "    
> user_nm STRING\n" +
>                ") WITH (\n" +
>                "      
>'connector' = 'mysql-cdc',\n" +
>                "      
>'hostname' = '10.12.5.37',\n" +
>                "      
>'port' = '3306',\n" +
>                "      
>'username' = 'dps',\n" +
>                "      
>'password' = 'dps1234',\n" +
>                "      
>'database-name' = 'rpt',\n" +
>                "      
>'table-name' = 'users'\n" +
>                "      
>)";
>
>
>        tEnv.executeSql(src_sql); // 创建表
>
>
>        String sink="CREATE TABLE sink (\n" +
>                "    
> user_id INT,\n" +
>                "    
> user_nm STRING,\n" +
>                "    
> primary key(user_id)  NOT ENFORCED \n" +
>                ") WITH (\n" +
>                "      
>'connector' = 'jdbc',\n" +
>                "      
>'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
>                "      
>'username' = 'dps',\n" +
>                "      
>'password' = 'dps1234',\n" +
>                "      
>'table-name' = 'sink'\n" +
>                "      
>)";
>        String to_print_sql="insert into sink select 
>user_id  ,user_nm   from userss";
>         tEnv.executeSql(sink);
>        tEnv.executeSql(to_print_sql);
>        env.execute();
>    }
>
>
>}
>
>
>
>
>
>详细错误:
>
>
>Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>Unable to create a sink for writing table 
>'default_catalog.default_database.sink'.
>
>
>Table options are:
>
>
>'connector'='jdbc'
>'password'='dps1234'
>'table-name'='sink'
>'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
>'username'='dps'
>   at 
> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>   at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>   at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>   at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>   at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>   at 
> org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:163)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1264)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:700)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:787)
>   at 
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:690)
>   at org.apache.flink.examples.CDC2ss2.main(CDC2ss2.java:50)
>Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a 
>connector using option ''connector'='jdbc''.
>   at 
> org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(Factor

Re: 用代码执行flink sql 报错 Multiple factories for identifier 'jdbc' that implement

2020-12-03 Thread Wei Zhong
Hi,

现在的查找TableFactory的代码在错误信息显示上似乎存在问题,看不到真实的类名,可以先手动执行一下以下代码查看到底是哪些类被判定为JDBC的DynamicTableSinkFactory了:

List<Factory> result = new LinkedList<>();
ServiceLoader
   .load(Factory.class, Thread.currentThread().getContextClassLoader())
   .iterator()
   .forEachRemaining(result::add);
List<Factory> jdbcResult = result.stream().filter(f ->
   DynamicTableSinkFactory.class.isAssignableFrom(f.getClass())).filter(
   f -> f.factoryIdentifier().equals("jdbc")).collect(Collectors.toList());
System.out.println(jdbcResult);


> 在 2020年12月3日,19:50,hailongwang <18868816...@163.com> 写道:
> 
> Hi,
> 你是用哪个版本的呢?有没有自己继承了 DynamicTableSinkFactory 实现的 factoryIdentifier 方法返回 `JDBC` 
> 的 Connector?
> 
> 
> Best,
> Hailong
> 在 2020-12-03 14:44:18,"xuzh"  写道:
>> 错误:
>> 
>> 
>> Caused by: org.apache.flink.table.api.ValidationException: Multiple 
>> factories for identifier 'jdbc' that implement 
>> 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the 
>> classpath
>> 
>> 
>> 看意思是找到了两个一样的类:DynamicTableSinkFactory
>> 
>> 
>> 代码如下:
>> package org.apache.flink.examples;
>> 
>> 
>> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>> import org.apache.flink.table.api.EnvironmentSettings;
>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>> import org.apache.flink.table.factories.DynamicTableSinkFactory;
>> 
>> 
>> public class CDC2ss2 {
>>     public static void main(String[] args) throws Exception {
>> 
>> 
>>         // set up execution environment
>>         StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>>         StreamTableEnvironment tEnv;
>> 
>> 
>>         EnvironmentSettings settings = 
>> EnvironmentSettings.newInstance()
>>                 .useBlinkPlanner()
>>                 .inStreamingMode()
>>                 .build();
>>         tEnv = StreamTableEnvironment.create(env, 
>> settings);
>>         String src_sql = "CREATE TABLE userss (\n" +
>>                 "    
>>  user_id INT,\n" +
>>                 "    
>>  user_nm STRING\n" +
>>                 ") WITH (\n" +
>>                 "    
>>   'connector' = 'mysql-cdc',\n" +
>>                 "    
>>   'hostname' = '10.12.5.37',\n" +
>>                 "    
>>   'port' = '3306',\n" +
>>                 "    
>>   'username' = 'dps',\n" +
>>                 "    
>>   'password' = 'dps1234',\n" +
>>                 "    
>>   'database-name' = 'rpt',\n" +
>>                 "    
>>   'table-name' = 'users'\n" +
>>                 "    
>>   )";
>> 
>> 
>>         tEnv.executeSql(src_sql); // 创建表
>> 
>> 
>>         String sink="CREATE TABLE sink (\n" +
>>                 "    
>>  user_id INT,\n" +
>>                 "    
>>  user_nm STRING,\n" +
>>                 "    
>>  primary key(user_id)  NOT ENFORCED \n" +
>>                 ") WITH (\n" +
>>                 "    
>>   'connector' = 'jdbc',\n" +
>>                 "    
>>   'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n" +
>>                 "    
>>   'username' = 'dps',\n" +
>>                 "    
>>   'password' = 'dps1234',\n" +
>>                 "    
>>   'table-name' = 'sink'\n" +
>>                 "    
>>   )";
>>         String to_print_sql="insert into sink select 
>> user_id  ,user_nm   from userss";
>>          tEnv.executeSql(sink);
>>         tEnv.executeSql(to_print_sql);
>>         env.execute();
>>     }
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> 详细错误:
>> 
>> 
>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>> Unable to create a sink for writing table 
>> 'default_catalog.default_database.sink'.
>> 
>> 
>> Table options are:
>> 
>> 
>> 'connector'='jdbc'
>> 'password'='dps1234'
>> 'table-name'='sink'
>> 'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false'
>> 'username'='dps'
>>  at 
>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>>  at 
>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>>  at 
>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>>  at 
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>  at 
>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>  at 
>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
>>  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
>>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
>>  at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
>>  at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
>>  at scala.collection.AbstractTraversable.map(Traversable.scala:104)
>>  at 
>> org.apac

Re: 用代码执行flink sql 报错 Multiple factories for identifier 'jdbc' that implement

2020-12-03 Thread Wei Zhong
这个错误信息显示问题在后续版本已经修复,新版本发布后升级版本就能直接从错误信息中看到是哪些TableFactory冲突了:

https://issues.apache.org/jira/browse/FLINK-20186 




> 在 2020年12月3日,20:08,Wei Zhong  写道:
> 
> Hi,
> 
> 现在的查找TableFactory的代码在错误信息显示上似乎存在问题,看不到真实的类名,可以先手动执行一下以下代码查看到底是哪些类被判定为JDBC的DynamicTableSinkFactory了:
> 
> List<Factory> result = new LinkedList<>();
> ServiceLoader
>    .load(Factory.class, Thread.currentThread().getContextClassLoader())
>    .iterator()
>    .forEachRemaining(result::add);
> List<Factory> jdbcResult = result.stream().filter(f ->
>    DynamicTableSinkFactory.class.isAssignableFrom(f.getClass())).filter(
>    f -> f.factoryIdentifier().equals("jdbc")).collect(Collectors.toList());
> System.out.println(jdbcResult);
> 
> 
>> 在 2020年12月3日,19:50,hailongwang <18868816...@163.com 
>> > 写道:
>> 
>> Hi,
>> 你是用哪个版本的呢?有没有自己继承了 DynamicTableSinkFactory 实现的 factoryIdentifier 方法返回 `JDBC` 
>> 的 Connector?
>> 
>> 
>> Best,
>> Hailong
>> 在 2020-12-03 14:44:18,"xuzh" <huazhe...@qq.com> 写道:
>>> 错误:
>>> 
>>> 
>>> Caused by: org.apache.flink.table.api.ValidationException: Multiple 
>>> factories for identifier 'jdbc' that implement 
>>> 'org.apache.flink.table.factories.DynamicTableSinkFactory' found in the 
>>> classpath
>>> 
>>> 
>>> 看意思是找到了两个一样的类:DynamicTableSinkFactory
>>> 
>>> 
>>> 代码如下:
>>> package org.apache.flink.examples;
>>> 
>>> 
>>> import 
>>> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>> import org.apache.flink.table.api.EnvironmentSettings;
>>> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
>>> import org.apache.flink.table.factories.DynamicTableSinkFactory;
>>> 
>>> 
>>> public class CDC2ss2 {
>>>     public static void main(String[] args) throws Exception {
>>> 
>>> 
>>>         // set up execution environment
>>>         StreamExecutionEnvironment env = 
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>         StreamTableEnvironment tEnv;
>>> 
>>> 
>>>         EnvironmentSettings settings = 
>>> EnvironmentSettings.newInstance()
>>>                 .useBlinkPlanner()
>>>                 .inStreamingMode()
>>>                 .build();
>>>         tEnv = StreamTableEnvironment.create(env, 
>>> settings);
>>>         String src_sql = "CREATE TABLE userss (\n" +
>>>                 "    
>>>  user_id INT,\n" +
>>>                 "    
>>>  user_nm STRING\n" +
>>>                 ") WITH (\n" +
>>>                 "    
>>>   'connector' = 'mysql-cdc',\n" +
>>>                 "    
>>>   'hostname' = '10.12.5.37',\n" +
>>>                 "    
>>>   'port' = '3306',\n" +
>>>                 "    
>>>   'username' = 'dps',\n" +
>>>                 "    
>>>   'password' = 'dps1234',\n" +
>>>                 "    
>>>   'database-name' = 'rpt',\n" +
>>>                 "    
>>>   'table-name' = 'users'\n" +
>>>                 "    
>>>   )";
>>> 
>>> 
>>>         tEnv.executeSql(src_sql); // 创建表
>>> 
>>> 
>>>         String sink="CREATE TABLE sink (\n" +
>>>                 "    
>>>  user_id INT,\n" +
>>>                 "    
>>>  user_nm STRING,\n" +
>>>                 "    
>>>  primary key(user_id)  NOT ENFORCED \n" +
>>>                 ") WITH (\n" +
>>>                 "    
>>>   'connector' = 'jdbc',\n" +
>>>                 "    
>>>   'url' = 'jdbc:mysql://10.0.171.171:3306/dps?useSSL=false',\n 
>>> " +
>>>                 "    
>>>   'username' = 'dps',\n" +
>>>                 "    
>>>   'password' = 'dps1234',\n" +
>>>                 "    
>>>   'table-name' = 'sink'\n" +
>>>                 "    
>>>   )";
>>>         String to_print_sql="insert into sink select 
>>> user_id  ,user_nm   from userss";
>>>          tEnv.executeSql(sink);
>>>         tEnv.executeSql(to_print_sql);
>>>         env.execute();
>>>     }
>>> 
>>> 
>>> }
>>> 
>>> 
>>> 
>>> 
>>> 
>>> 详细错误:
>>> 
>>> 
>>> Exception in thread "main" org.apache.flink.table.api.ValidationException: 
>>> Unable to create a sink for writing table 
>>> 'default_catalog.default_database.sink'.
>>> 
>>> 
>>> Table options are:
>>> 
>>> 
>>> 'connector'='jdbc'
>>> 'password'='dps1234'
>>> 'table-name'='sink'
>>> 'url'='jdbc:mysql://10.0.171.171:3306/dps?useSSL=false' 
>>> 
>>> 'username'='dps'
>>> at 
>>> org.apache.flink.table.factories.FactoryUtil.createTableSink(FactoryUtil.java:164)
>>> at 
>>> org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:344)
>>> at 
>>> org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:204)
>>> at 
>>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>> at 
>>> org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:163)
>>> at 
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>> at 
>>> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
>>> at scala.collection.