flink sql 1.12 writing data to elasticsearch, deployment problem

2020-12-15 Thread cljb...@163.com
hi,
With Flink SQL 1.12, writing data to Elasticsearch works fine when the job runs locally, but after deploying it to the server it fails with the error below.
I checked the fat jar I built and it does contain the relevant classes, and I have also put flink-connector-elasticsearch7_2.11-1.12.0.jar under flink/lib.
I also adjusted the classloading configuration and tried both child-first and parent-first, without success.
Has anyone run into a similar problem?
Thanks!

The error is as follows:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Could not load service provider for table factories.
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:330)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:198)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:743)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:242)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:971)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1047)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1685)
at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1047)
Caused by: org.apache.flink.table.api.TableException: Could not load service provider for table factories.
at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:218)
at org.apache.flink.table.factories.TableFactoryService.findAllInternal(TableFactoryService.java:170)
at org.apache.flink.table.factories.TableFactoryService.findAll(TableFactoryService.java:125)
at org.apache.flink.table.factories.ComponentFactoryService.find(ComponentFactoryService.java:48)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.create(TableEnvironmentImpl.java:263)
at org.apache.flink.table.api.TableEnvironment.create(TableEnvironment.java:90)
at com.searchrec.main.XfkEsIndex.main(XfkEsIndex.java:24)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:316)
... 11 more
Caused by: java.util.ServiceConfigurationError: org.apache.flink.table.factories.TableFactory: Provider org.apache.flink.streaming.connectors.elasticsearch7.Elasticsearch7UpsertTableSinkFactory could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:232)
at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:214)
... 22 more
Caused by: java.lang.NoClassDefFoundError: org/elasticsearch/common/xcontent/XContentType
at org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchUpsertTableSinkFactoryBase.<init>(ElasticsearchUpsertTableSinkFactoryBase.java:105)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
... 26 more
Caused by: java.lang.ClassNotFoundException: org.elasticsearch.common.xcontent.XContentType
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:335)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 33 more
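The root cause at the bottom of the trace is a missing Elasticsearch client class rather than a missing connector factory. As a quick check, a minimal probe along the following lines (the class ClasspathProbe and its placement are illustrative, not part of the original job; the probed class name is taken from the ClassNotFoundException above) reports whether the class is visible and which jar provides it:

import java.security.CodeSource;

public class ClasspathProbe {
    public static void main(String[] args) {
        String probe = "org.elasticsearch.common.xcontent.XContentType";
        try {
            Class<?> clazz = Class.forName(probe, false, Thread.currentThread().getContextClassLoader());
            CodeSource source = clazz.getProtectionDomain().getCodeSource();
            // Shows which jar actually supplies the class (user fat jar vs. flink/lib).
            System.out.println(probe + " loaded from " + source);
        } catch (ClassNotFoundException e) {
            // The Elasticsearch client is not visible from this classloader.
            System.out.println(probe + " is not visible from the current classloader");
        }
    }
}

One hedged observation: the plain flink-connector-elasticsearch7 jar does not bundle the Elasticsearch client itself, so when the factory is loaded from flink/lib its Elasticsearch dependencies must also be resolvable there; the shaded flink-sql-connector-elasticsearch7 uber jar is one way to get the factory and the client classes into the same place.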




cljb...@163.com


flink sql writing to es: username/password authentication not supported

2020-11-30 Thread cljb...@163.com
I checked the official docs, and SQL writes to ES do not yet support username/password authentication. Is there a relatively simple workaround, other than falling back to the API?

Thanks!
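For reference, a minimal sketch of the API-level fallback mentioned above, assuming the flink-connector-elasticsearch7 DataStream API; the host, index name, username and password are placeholders:

import java.util.Collections;
import java.util.List;

import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
import org.apache.http.auth.UsernamePasswordCredentials;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.elasticsearch.client.Requests;

public class AuthedEsSinkSketch {

    // Builds an Elasticsearch 7 sink that authenticates with basic auth.
    public static ElasticsearchSink<String> buildSink() {
        List<HttpHost> hosts = Collections.singletonList(new HttpHost("es-host", 9200, "http"));

        ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(
                hosts,
                new ElasticsearchSinkFunction<String>() {
                    @Override
                    public void process(String element, RuntimeContext ctx, RequestIndexer indexer) {
                        indexer.add(Requests.indexRequest()
                                .index("my-index")
                                .source(Collections.singletonMap("value", element)));
                    }
                });

        // Credentials are handed to the underlying Elasticsearch REST client.
        builder.setRestClientFactory(restClientBuilder ->
                restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> {
                    BasicCredentialsProvider credentialsProvider = new BasicCredentialsProvider();
                    credentialsProvider.setCredentials(
                            AuthScope.ANY, new UsernamePasswordCredentials("user", "password"));
                    return httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider);
                }));

        builder.setBulkFlushMaxActions(1);
        return builder.build();
    }
}

The sink can then be attached with dataStream.addSink(buildSink()); this stays entirely on the DataStream side, so it sidesteps rather than answers the question of a pure-SQL option.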



cljb...@163.com


Re: Re: flink sql cdc writing data to mysql, related class not found

2020-11-27 Thread cljb...@163.com
Thanks for the reply!
I just found the problem: the pom dependency copied from the Maven site had its scope set to test... Changing it to compile fixed it.



cljb...@163.com
 
From: Jark Wu
Date: 2020-11-27 19:14
To: user-zh
Subject: Re: flink sql cdc writing data to mysql, related class not found
My guess is that your flink-json conflicts with the flink-json already bundled in the framework; perhaps the flink-json version you added is not 1.11.x?
 
On Fri, 27 Nov 2020 at 19:03, cljb...@163.com  wrote:
 
> The relevant dependencies have already been added; I don't know what is causing the problem below. Please help!
> The dependencies added are:
> flink-connector-mysql-cdc
> flink-format-changelog-json
> flink-json
>
> The error message is as follows:
>
> java.util.ServiceConfigurationError: org.apache.flink.table.factories.Factory: Provider com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory could not be instantiated
> at java.util.ServiceLoader.fail(ServiceLoader.java:232)
> at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
> at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
> at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
> at java.util.Iterator.forEachRemaining(Iterator.java:116)
> at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:342)
> at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:221)
> at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
> at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
> at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
> at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
> at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
> at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> at com.jfbank.searchrec.main.XfkOsProducts.main(XfkOsProducts.java:31)
> Caused by: java.lang.NoClassDefFoundError: org/apache/flink/formats/json/JsonOptions
> at com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory.<init>(ChangelogJsonFormatFactory.java:53)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
> at java.lang.Class.newInstance(Class.java:442)
> at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
> ... 25 common frames omitted
> Caused by: java.lang.ClassNotFoundException: org.apache.flink.formats.json.JsonOptions
> at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
> ... 32 common frames omitted
>
>
>
> cljb...@163.com
>


flink sql cdc writing data to mysql, related class not found

2020-11-27 Thread cljb...@163.com
The relevant dependencies have already been added; I don't know what is causing the problem below. Please help!
The dependencies added are:
flink-connector-mysql-cdc
flink-format-changelog-json
flink-json

The error message is as follows:

java.util.ServiceConfigurationError: org.apache.flink.table.factories.Factory: Provider com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory could not be instantiated
at java.util.ServiceLoader.fail(ServiceLoader.java:232)
at java.util.ServiceLoader.access$100(ServiceLoader.java:185)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:384)
at java.util.ServiceLoader$LazyIterator.next(ServiceLoader.java:404)
at java.util.ServiceLoader$1.next(ServiceLoader.java:480)
at java.util.Iterator.forEachRemaining(Iterator.java:116)
at org.apache.flink.table.factories.FactoryUtil.discoverFactories(FactoryUtil.java:342)
at org.apache.flink.table.factories.FactoryUtil.discoverFactory(FactoryUtil.java:221)
at org.apache.flink.table.factories.FactoryUtil.getDynamicTableFactory(FactoryUtil.java:326)
at org.apache.flink.table.factories.FactoryUtil.createTableSource(FactoryUtil.java:118)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:135)
at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
at com.jfbank.searchrec.main.XfkOsProducts.main(XfkOsProducts.java:31)
Caused by: java.lang.NoClassDefFoundError: org/apache/flink/formats/json/JsonOptions
at com.alibaba.ververica.cdc.formats.json.ChangelogJsonFormatFactory.<init>(ChangelogJsonFormatFactory.java:53)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at java.util.ServiceLoader$LazyIterator.nextService(ServiceLoader.java:380)
... 25 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.apache.flink.formats.json.JsonOptions
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
... 32 common frames omitted



cljb...@163.com


flink sql cdc: how to process the full snapshot only once

2020-11-26 Thread cljb...@163.com
I have been using the Streaming API all along and started using SQL in the last couple of days.
One question: when Flink SQL CDC reads data from MySQL, it processes the full snapshot plus the incremental changes.
If the job goes live and later needs changes, and I redeploy it after modifying it, I do not want to reprocess the data that was already handled. How should that be done?

Does the CDC connector keep the position of the consumed changelog in state, so that restoring from a savepoint or checkpoint when I redeploy would be enough?

Thanks!
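For context, a minimal sketch of the checkpointing side of this question (the class name CdcCheckpointSketch, the one-minute interval, and the Blink planner bridge API are illustrative assumptions; whether the connector version in use stores its changelog offsets in state is exactly the open question above). Checkpointing has to be enabled for any source state to be included in checkpoints and savepoints, and the modified job can then be resubmitted with bin/flink run -s <savepointPath>:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class CdcCheckpointSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpointing must be on for source state (e.g. consumed changelog offsets)
        // to be snapshotted and later restored from a savepoint or checkpoint.
        env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);

        EnvironmentSettings settings = EnvironmentSettings.newInstance()
                .useBlinkPlanner()
                .inStreamingMode()
                .build();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

        // Register the mysql-cdc source table and the INSERT INTO statements here;
        // the DDL is omitted because its options depend on the connector version in use.
    }
}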


cljb...@163.com


A question about Flink and Hadoop versions

2019-12-06 Thread cljb...@163.com
Hello,

A question about Flink and Hadoop versions: our production environment runs Hadoop 3.0+, but the official site does not offer a pre-bundled Hadoop 3.0+ package for Flink 1.9+.
However, I downloaded Flink 1.9.1 myself, then downloaded the optional component Pre-bundled Hadoop 2.8.3 (asc, sha1), put that jar under flink/lib, and Hadoop operations work normally.
Are there any drawbacks to doing it this way? I have not managed to build the Flink source myself. Please advise!

Thanks!
陈军



cljb...@163.com