这个对应关系是通过 Factory#factoryIdentifier 来决定的。 比如 DebeziumJsonFormatFactory#factoryIdentifier() 就是返回了 'debezium-json'
Best, Jark On Thu, 16 Jul 2020 at 22:29, wangl...@geekplus.com.cn < wangl...@geekplus.com.cn> wrote: > > 谢谢,我理解了。 > > > > wangl...@geekplus.com.cn > > Sender: Harold.Miao > Send Time: 2020-07-16 19:33 > Receiver: user-zh > Subject: Re: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢? > 我的理解 : 大概就是通过spi机制加载类,然后通过属性去过滤出来需要的class 类似下面的代码 > > private static <T extends TableFactory> T findSingleInternal( > Class<T> factoryClass, > Map<String, String> properties, > Optional<ClassLoader> classLoader) { > > List<TableFactory> tableFactories = discoverFactories(classLoader); > List<T> filtered = filter(tableFactories, factoryClass, properties); > > if (filtered.size() > 1) { > throw new AmbiguousTableFactoryException( > filtered, > factoryClass, > tableFactories, > properties); > } else { > return filtered.get(0); > } > } > > private static List<TableFactory> > discoverFactories(Optional<ClassLoader> classLoader) { > try { > List<TableFactory> result = new LinkedList<>(); > ClassLoader cl = > classLoader.orElse(Thread.currentThread().getContextClassLoader()); > ServiceLoader > .load(TableFactory.class, cl) > .iterator() > .forEachRemaining(result::add); > return result; > } catch (ServiceConfigurationError e) { > LOG.error("Could not load service provider for table factories.", e); > throw new TableException("Could not load service provider for > table factories.", e); > } > > } > > > wangl...@geekplus.com.cn <wangl...@geekplus.com.cn> 于2020年7月16日周四 > 下午7:04写道: > > > > > 我在 > > > flink-formats/flink-json/src/main/resources/META-INF/services/org.apache.flink.table.factories.Factory > > 找到了 SPI 的配置: > > > > org.apache.flink.formats.json.JsonFileSystemFormatFactory > > org.apache.flink.formats.json.JsonFormatFactory > > org.apache.flink.formats.json.debezium.DebeziumJsonFormatFactory > > org.apache.flink.formats.json.canal.CanalJsonFormatFactory > > > > 还是没有搞清楚 指定 'format'='debezium-json' 怎么就能对应到 DebeziumJsonFormatFactory > > 我的理解肯定要有一个地方指明 debezium-json 要对应到 DebeziumJsonFormatFactory, 但是我 grep > > 代码没找到类似的关系映射配置。 > > > > > > 谢谢, > > 王磊 > > > > > > > > wangl...@geekplus.com.cn > > > > > > Sender: godfrey he > > Send Time: 2020-07-16 16:38 > > Receiver: user-zh > > Subject: Re: FlinkSQL 是通过怎样的机制找到要执行的 Java 代码的呢? > > 通过Java 的 SPI 机制来找到对应的 format,可以参考 [1] > > > > [1] > > > > > https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/connectors/#how-to-use-connectors > > > > Best, > > Godfrey > > > > wangl...@geekplus.com.cn <wangl...@geekplus.com.cn> 于2020年7月16日周四 > > 下午4:02写道: > > > > > 比如: > > > > > > CREATE TABLE my_table ( > > > id BIGINT, > > > first_name STRING, > > > last_name STRING, > > > email STRING > > > ) WITH ( > > > 'connector'='kafka', > > > 'topic'='user_topic', > > > 'properties.bootstrap.servers'='localhost:9092', > > > 'scan.startup.mode'='earliest-offset', > > > 'format'='debezium-json' > > > ); > > > > > > 最终解析 debezium-json 应该是 > > > > > > flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium > > > 下面的代码 > > > 但 flinkSQL 是怎样的机制找到要执行的 Java 代码的呢? > > > > > > 谢谢, > > > 王磊 > > > > > > > > > wangl...@geekplus.com.cn > > > > > > > > > > > -- > > Best Regards, > Harold Miao >