这个对应关系是通过 Factory#factoryIdentifier 来决定的。
比如 DebeziumJsonFormatFactory#factoryIdentifier() 就是返回了 'debezium-json'

Best,
Jark

On Thu, 16 Jul 2020 at 22:29, wangl...@geekplus.com.cn <
wangl...@geekplus.com.cn> wrote:

>
>  谢谢,我理解了。
>
>
>
> wangl...@geekplus.com.cn
>
> Sender: Harold.Miao
> Send Time: 2020-07-16 19:33
> Receiver: user-zh
> Subject: Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
> 我的理解 :  大概就是通过spi机制加载类,然后通过属性去过滤出来需要的class   类似下面的代码
>
> private static <T extends TableFactory> T findSingleInternal(
>       Class<T> factoryClass,
>       Map<String, String> properties,
>       Optional<ClassLoader> classLoader) {
>
>    List<TableFactory> tableFactories = discoverFactories(classLoader);
>    List<T> filtered = filter(tableFactories, factoryClass, properties);
>
>    if (filtered.size() > 1) {
>       throw new AmbiguousTableFactoryException(
>          filtered,
>          factoryClass,
>          tableFactories,
>          properties);
>    } else {
>       return filtered.get(0);
>    }
> }
>
> private static List<TableFactory>
> discoverFactories(Optional<ClassLoader> classLoader) {
>    try {
>       List<TableFactory> result = new LinkedList<>();
>       ClassLoader cl =
> classLoader.orElse(Thread.currentThread().getContextClassLoader());
>       ServiceLoader
>          .load(TableFactory.class, cl)
>          .iterator()
>          .forEachRemaining(result::add);
>       return result;
>    } catch (ServiceConfigurationError e) {
>       LOG.error("Could not load service provider for table factories.", e);
>       throw new TableException("Could not load service provider for
> table factories.", e);
>    }
>
> }
>
>
> wangl...@geekplus.com.cn <wangl...@geekplus.com.cn> 于2020年7月16日周四
> 下午7:04写道:
>
> >
> > 我在
> >
> flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory
> > 找到了 SPI 的配置:
> >
> > org.apache.flink.formats.json.JsonFileSystemFormatFactory
> > org.apache.flink.formats.json.JsonFormatFactory
> > org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory
> > org.apache.flink.formats.json.canal.CanalJsonFormatFactory
> >
> > 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory
> > 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep
> > 代码没找到类似的关系映射配置。
> >
> >
> > 谢谢,
> > 王磊
> >
> >
> >
> > wangl...@geekplus.com.cn
> >
> >
> > Sender: godfrey he
> > Send Time: 2020-07-16 16:38
> > Receiver: user-zh
> > Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢?
> > 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1]
> >
> > [1]
> >
> >
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors
> >
> > Best,
> > Godfrey
> >
> > wangl...@geekplus.com.cn <wangl...@geekplus.com.cn> 于2020年7月16日周四
> > 下午4:02写道:
> >
> > > 比如:
> > >
> > > CREATE TABLE my_table (
> > >   id BIGINT,
> > >  first_name STRING,
> > >  last_name STRING,
> > >  email STRING
> > > ) WITH (
> > >  'connector'='kafka',
> > >  'topic'='user_topic',
> > >  'properties.bootstrap.servers'='localhost:9092',
> > >  'scan.startup.mode'='earliest-offset',
> > >  'format'='debezium-json'
> > > );
> > >
> > > 最终解析 debezium-json 应该是
> > >
> >
> flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium
> > > 下面的代码
> > > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢?
> > >
> > > 谢谢,
> > > 王磊
> > >
> > >
> > > wangl...@geekplus.com.cn
> > >
> > >
> >
>
>
> --
>
> Best Regards,
> Harold Miao
>

回复