As I understand it, you can give KafkaRetractTableSourceSinkFactory parameters that differ from those of KafkaTableSourceSinkFactory, and use that difference to distinguish the two factories. For example, add a property indicating whether the sink is a retract or an append type.
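In the Flink 1.10-era `TableFactory` SPI, each factory declares the properties it matches via `requiredContext()`. A minimal sketch of the suggestion above, assuming a distinguishing property like `update-mode` (the key mirrors Flink's connector descriptor conventions, but the class and methods here are hypothetical stand-ins, not the real Kafka factories):

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: two factories declare different required context,
// so TableFactoryService's filter step matches only one of them for a
// given set of connector properties.
public class FactoryContextSketch {

    // Context an append-style Kafka factory could require.
    static Map<String, String> appendKafkaContext() {
        Map<String, String> context = new HashMap<>();
        context.put("connector.type", "kafka");
        context.put("update-mode", "append");
        return context;
    }

    // Context a retract-style factory could require; differing on
    // "update-mode" removes the ambiguity between the two factories.
    static Map<String, String> retractKafkaContext() {
        Map<String, String> context = new HashMap<>();
        context.put("connector.type", "kafka");
        context.put("update-mode", "retract");
        return context;
    }

    public static void main(String[] args) {
        System.out.println(appendKafkaContext());
        System.out.println(retractKafkaContext());
    }
}
```

With distinct required contexts, both factories can be registered at the same time without triggering an AmbiguousTableFactoryException.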
wangl...@geekplus.com.cn <wangl...@geekplus.com.cn> wrote on Tue, Mar 31, 2020 at 11:17 AM:

> This ends up discovering two tableFactories. I simply copied the whole
> KafkaTableSourceSinkFactory hierarchy in parallel into a
> KafkaRetractTableSourceSinkFactory. How should this be changed properly?
>
> 137    private static <T extends TableFactory> T findSingleInternal(
> 138            Class<T> factoryClass,
> 139            Map<String, String> properties,
> 140            Optional<ClassLoader> classLoader) {
> 141
> 142        List<TableFactory> tableFactories = discoverFactories(classLoader);
> 143        List<T> filtered = filter(tableFactories, factoryClass, properties);
> 144
> 145        if (filtered.size() > 1) {
> 146            throw new AmbiguousTableFactoryException(
> 147                    filtered,
> 148                    factoryClass,
> 149                    tableFactories,
> 150                    properties);
> 151        } else {
> 152            return filtered.get(0);
> 153        }
> 154    }
>
> Thanks,
> Wang Lei
>
> wangl...@geekplus.com.cn
>
> Sender: wangl...@geekplus.com.cn
> Send Time: 2020-03-31 10:50
> Receiver: user-zh
> Subject: Re: RE: Implementing KafkaUpsertTableSink
>
> I implemented a KafkaRetractTableSink in the same way, packaged it into a
> jar, put it under the lib directory, and started sql-client:
>
> org.apache.flink.table.planner.delegation.BlinkExecutorFactory
>     at org.apache.flink.table.factories.TableFactoryService.findSingleInternal(TableFactoryService.java:146)
>     at org.apache.flink.table.factories.TableFactoryService.find(TableFactoryService.java:113)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.createTableSource(ExecutionContext.java:377)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:559)
>     at java.util.LinkedHashMap.forEach(LinkedHashMap.java:684)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:557)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:494)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:159)
>     at
> org.apache.flink.table.client.gateway.local.ExecutionContext.<init>(ExecutionContext.java:118)
>     at org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:742)
>     ... 3 more
>
> How can this be resolved?
>
> Thanks,
> Wang Lei
>
> wangl...@geekplus.com.cn
>
> Sender: wxchunj...@163.com
> Send Time: 2020-03-29 10:32
> Receiver: user-zh@flink.apache.org
> Subject: RE: Implementing KafkaUpsertTableSink
>
> Benchao, thanks a lot. I didn't know the Factory had to be added to that
> file; after adding it, everything runs correctly.
>
> -----Original Message-----
> From: user-zh-return-2640-wxchunjhyy=163....@flink.apache.org
> <user-zh-return-2640-wxchunjhyy=163....@flink.apache.org> On Behalf Of
> Benchao Li
> Sent: Saturday, March 28, 2020 6:28 PM
> To: user-zh <user-zh@flink.apache.org>
> Subject: Re: Implementing KafkaUpsertTableSink
>
> Hi,
> You need to add your new Factory to the
> META-INF/services/org.apache.flink.table.factories.TableFactory file under
> resources. Have you already done that step?
>
> <wxchunj...@163.com> wrote on Sat, Mar 28, 2020 at 5:38 PM:
>
> > Hi all,
> >
> > Since the current KafkaTableSink does not support SQL with GROUP BY,
> > I implemented a KafkaUpsertTableSink following the logic of
> > KafkaTableSink and HbaseUpsertTableSink:
> >
> > KafkaUpsertTableSink
> > KafkaUpsertTableSinkBase
> > KafkaUpsertTableSourceSinkFactory
> > KafkaUpsertTableSourceSinkFactoryBase
> > MyKafkaValidator
> >
> > But the TableFactoryService.discoverFactories method cannot load my
> > KafkaUpsertTableSourceSinkFactory. Does it need to be registered
> > somewhere?
> >
> > /**
> >  * Searches for factories using Java service providers.
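The registration Benchao describes is the standard Java SPI mechanism: the jar must contain a provider-configuration file named after the SPI interface, listing one fully qualified implementation class per line. Assuming the factory lives in a package like `com.example.connectors` (a hypothetical example, since the actual package is not shown in the thread), the file would look like:

```
# src/main/resources/META-INF/services/org.apache.flink.table.factories.TableFactory
com.example.connectors.KafkaUpsertTableSourceSinkFactory
```

When the jar is on the classpath (for example under Flink's lib directory), `ServiceLoader` reads this file and the factory shows up in `discoverFactories` without any code change.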
> >  *
> >  * @return all factories in the classpath
> >  */
> > private static List<TableFactory> discoverFactories(Optional<ClassLoader> classLoader) {
> >     try {
> >         List<TableFactory> result = new LinkedList<>();
> >         ClassLoader cl = classLoader.orElse(Thread.currentThread().getContextClassLoader());
> >         ServiceLoader
> >                 .load(TableFactory.class, cl)
> >                 .iterator()
> >                 .forEachRemaining(result::add);
> >         // todo add
> >         result.add(new KafkaUpsertTableSourceSinkFactory());
> >         return result;
> >     } catch (ServiceConfigurationError e) {
> >         LOG.error("Could not load service provider for table factories.", e);
> >         throw new TableException("Could not load service provider for table factories.", e);
> >     }
> > }
> >
> > Adding the KafkaUpsertTableSourceSinkFactory directly to the returned
> > result like this does run successfully.
> >
> > Many thanks
> >
> > ------------------
> > Thanks
> > venn

--
Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenc...@gmail.com; libenc...@pku.edu.cn
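The manual `result.add(...)` works because it bypasses discovery entirely: `ServiceLoader` only returns implementations that are listed in a `META-INF/services` provider file on the classpath, which is why an unregistered factory is invisible to `discoverFactories`. A small self-contained demonstration of that behavior (`MyFactory` is a made-up interface standing in for Flink's `TableFactory`):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class ServiceLoaderDemo {

    // Stand-in SPI interface; Flink's real one is
    // org.apache.flink.table.factories.TableFactory.
    public interface MyFactory {}

    public static void main(String[] args) {
        // ServiceLoader scans the classpath for files named
        // META-INF/services/<fully.qualified.InterfaceName> and
        // instantiates the classes listed inside them.
        List<MyFactory> found = new ArrayList<>();
        ServiceLoader.load(MyFactory.class)
                .iterator()
                .forEachRemaining(found::add);

        // No provider file exists for MyFactory, so nothing is found --
        // the same reason an unregistered KafkaUpsertTableSourceSinkFactory
        // never appears in the discovered list.
        System.out.println("factories found: " + found.size());
    }
}
```

Editing `discoverFactories` itself means patching and rebuilding Flink, so the provider file is the intended extension point.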