flink sql 1.12 writing data to Elasticsearch: deployment problem
Hi,

With Flink SQL 1.12, writing data to Elasticsearch works when I run the job locally, but after deploying it to the server it fails with the error below. I checked the jar I built and it does contain the relevant classes, and I have also placed flink-connector-elasticsearch7_2.11-1.12.0.jar under flink lib. I adjusted the class-loading order as well and tried both child-first and parent-first; neither works.

Has anyone run into a similar problem? Thanks!

The error is as follows:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not load service provider for table factories.
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
    at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
    at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
    at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
    at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1685)
    at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
    at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.table.api.TableException: Could not load service provider for table factories.
    at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:218)
    at org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170)
    at org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125)
    at org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:263)
    at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:90)
    at com.searchrec.main.XfkEsIndex.main(XfkEsIndex.java:24)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
    ... 11 more
Caused by: java.util.ServiceConfigurationError: org.apache.flink.table.factories.TableFactory: Provider org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSinkFactory could not be instantiated
    at java.util.ServiceLoader.fail(ServiceLoader.java:232)
    at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
    ... 22 more
Caused by: java.lang.NoClassDefFoundError: org/elasticsearch/common/xcontent/XContentType
    at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase.<init>(ElasticsearchUpsertTableSinkFactoryBase.java:105)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
    ... 26 more
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.common.xcontent.XContentType
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 33 more

cljb...@163.com
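
Note on the trace above: the final ClassNotFoundException is raised from sun.misc.Launcher$AppClassLoader, which suggests the lookup for XContentType happened in the class loader that serves flink lib rather than in the one that loads the user jar. One way to check this is to ask a given class loader directly whether it can see a class and, if so, which jar it comes from. The following is only a minimal sketch under that assumption; ClasspathProbe and locate are hypothetical names introduced for illustration and are not part of Flink or Elasticsearch.

```java
import java.security.CodeSource;
import java.util.Optional;

final class ClasspathProbe {

    /** Returns the jar or directory a class is loaded from, or empty if the loader cannot find it. */
    static Optional<String> locate(String className, ClassLoader loader) {
        try {
            // initialize=false: resolve the class without running its static initializers.
            Class<?> clazz = Class.forName(className, false, loader);
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            // Classes loaded by the JDK bootstrap loader have no code source.
            return Optional.of(source == null ? "(JDK runtime)" : String.valueOf(source.getLocation()));
        } catch (ClassNotFoundException | LinkageError e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // The system class loader is the AppClassLoader that appears in the trace.
        ClassLoader system = ClassLoader.getSystemClassLoader();
        String[] names = {
            "org.elasticsearch.common.xcontent.XContentType",
            "org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSinkFactory"
        };
        for (String name : names) {
            System.out.println(name + " -> " + locate(name, system).orElse("NOT FOUND"));
        }
    }
}
```

Running it with the same classpath as the Flink client (for example, every jar under flink lib) shows whether the Elasticsearch client classes are visible at that level or only inside the user jar.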
flink sql writing to ES: username/password authentication not supported
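I looked at the official documentation, and username/password authentication is not yet supported when writing to ES from SQL. Is there a reasonably simple workaround, other than using the API?

Thanks!

cljb...@163.com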
Re: Re: flink sql cdc writing data to MySQL: related class not found
Thanks for the reply!

I just found the problem: the pom dependency I copied from the Maven website had its scope set to test... Changing it to compile fixed it.

cljb...@163.com

From: Jark Wu
Sent: 2020-11-27 19:14
To: user-zh
Subject: Re: flink sql cdc writing data to MySQL: related class not found

Your flink-json probably conflicts with the flink-json that is already bundled into the framework. Perhaps the flink-json you added is not version 1.11.x?

On Fri, 27 Nov 2020 at 19:03, cljb...@163.com wrote:

> The relevant dependencies have already been added, and I do not know what is causing the problem below. Please help!
> The dependencies I added are:
> flink-connector-mysql-cdc
> flink-format-changelog-json
> flink-json
>
> The error is as follows:
>
> java.util.ServiceConfigurationError: org.apache.flink.table.factories.Factory: Provider com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory could not be instantiated
>     at java.util.ServiceLoader.fail(ServiceLoader.java:232)
>     at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
>     at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
>     at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
>     at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
>     at java.util.Iterator.forEachRemaining(Iterator.java:116)
>     at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:342)
>     at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:221)
>     at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
>     at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
>     at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
>     at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
>     at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
>     at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
>     at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
>     at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
>     at com.jfbank.searchrec.main.XfkOsProducts.main(XfkOsProducts.java:31)
> Caused by: java.lang.NoClassDefFoundError: org/apache/flink/formats/json/JsonOptions
>     at com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory.<init>(ChangelogJsonFormatFactory.java:53)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
>     at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
>     at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
>     at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
>     at java.lang.Class.newInstance(Class.java:442)
>     at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
>     ... 25 common frames omitted
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.formats.json.JsonOptions
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>     ... 32 common frames omitted
>
> cljb...@163.com
flink sql cdc writing data to MySQL: related class not found
The relevant dependencies have already been added, and I do not know what is causing the problem below. Please help!

The dependencies I added are:
flink-connector-mysql-cdc
flink-format-changelog-json
flink-json

The error is as follows:

java.util.ServiceConfigurationError: org.apache.flink.table.factories.Factory: Provider com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory could not be instantiated
    at java.util.ServiceLoader.fail(ServiceLoader.java:232)
    at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
    at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
    at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:342)
    at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:221)
    at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
    at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
    at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
    at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
    at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
    at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
    at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
    at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
    at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
    at com.jfbank.searchrec.main.XfkOsProducts.main(XfkOsProducts.java:31)
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/formats/json/JsonOptions
    at com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory.<init>(ChangelogJsonFormatFactory.java:53)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at java.lang.Class.newInstance(Class.java:442)
    at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
    ... 25 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.apache.flink.formats.json.JsonOptions
    at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    ... 32 common frames omitted

cljb...@163.com
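
Note on the trace above: the failure surfaces while java.util.ServiceLoader instantiates the registered factories, so a single provider with a missing dependency is enough to break discovery. To see which providers load and which fail on a given classpath, the providers can be iterated one at a time while catching ServiceConfigurationError. This is only a rough sketch; ServiceProbe and probe are hypothetical names, and the default service used in main is a JDK interface chosen purely so that the example runs without Flink on the classpath.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.ServiceConfigurationError;
import java.util.ServiceLoader;

final class ServiceProbe {

    /** Tries to instantiate every registered provider of a service and reports each outcome. */
    static <T> List<String> probe(Class<T> service, ClassLoader loader) {
        List<String> report = new ArrayList<>();
        Iterator<T> providers = ServiceLoader.load(service, loader).iterator();
        int failures = 0;
        // Recovery after an error is best effort, so cap the number of failures we tolerate.
        while (failures < 100) {
            try {
                if (!providers.hasNext()) {
                    break;
                }
                T provider = providers.next();
                report.add("OK      " + provider.getClass().getName());
            } catch (ServiceConfigurationError e) {
                failures++;
                // The cause usually names the class that could not be found.
                report.add("FAILED  " + e.getMessage() + " (cause: " + e.getCause() + ")");
            }
        }
        return report;
    }

    public static void main(String[] args) throws ClassNotFoundException {
        // Pass the service interface as the first argument; the default is a JDK service.
        String serviceName = args.length > 0 ? args[0] : "java.nio.file.spi.FileSystemProvider";
        ClassLoader loader = ClassLoader.getSystemClassLoader();
        Class<?> service = Class.forName(serviceName, false, loader);
        probe(service, loader).forEach(System.out::println);
    }
}
```

Run with the job's runtime classpath and org.apache.flink.table.factories.Factory as the argument, it should print one line per factory; a FAILED line whose cause is a NoClassDefFoundError points at the dependency that is absent at run time.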
flink sql cdc: how to process the full snapshot data only once
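I have been using the streaming API until now and started using SQL a couple of days ago.

One question: when Flink SQL CDC reads data from MySQL, it processes the full snapshot plus the incremental changes. If the same job is later modified and redeployed after it has gone live, I do not want it to process data it has already handled. How should this be done?

Does the CDC source keep the position of the consumed changelog in state? If so, would it be enough to restore from a savepoint or checkpoint when I redeploy?

Thanks!

cljb...@163.com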
Question about Flink and Hadoop versions
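Hello,

I have a question about Flink and Hadoop versions. Our production environment currently runs Hadoop 3.0+, and the official website does not offer a pre-bundled Hadoop 3.0+ package for Flink 1.9+.

However, I downloaded Flink 1.9.1 myself, then downloaded Pre-bundled Hadoop 2.8.3 (asc, sha1) from the optional components and put that jar under Flink's lib directory, and Flink can work with Hadoop normally.

Does this setup have any side effects? I ask because I have not managed to build Flink from source successfully. Please advise.

Thanks!

陈军
cljb...@163.com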