flink1.12读mysql问题
'scan.partition.column'='id', 'scan.partition.num'='15', 'scan.partition.lower-bound'='1', 'scan.partition.upper-bound'='680994' 我设置了上面这几个参数给source mysql分区,但是并没有生效,真实情况是只有一个task读的mysql全量数据
Flink SQL UDF 如何自定义Metrics
HI, 如题,想要在Flink SQL中通过自定义UDF增加指标,从而实现自定义告警。那么如何在UDF中获取到RuntimeContext从而修改Metrics呢? -- Sent from: http://apache-flink.147419.n8.nabble.com/
?????? Flink ????????join
benchao??joinrocketmqflink??kafka + rocket mq ??flink?? -- -- ??: "user-zh"
flink sql tumble window 时区问题
hi all: 使用flink sql发现一个时区问题,在flink 1.11.3,flink 1.10 都有发现。 使用eventtime,datestream 转换为table,对times字段使用 rowtime。数据为 161421840,执行完rowtime 后变成 161418960 直接就少了8小时,导致后续的开窗都有问题。 代码参考:https://paste.ubuntu.com/p/xYpWNrR9MT/
Re: 关于flinksql 与维表mysql的关联问题
Hi , 延迟维表关联这个特性我觉得还是一个比较通用的特性,目前我们考虑借助 Timer 来实现的,社区如果有这个功能的话,我觉得对于 Flink 使用方会有很大帮助的。 我看社区有这样的一个 JIRA 再跟踪了[1],我会持续关注。 [1] https://issues.apache.org/jira/browse/FLINK-19063 Best, LakeShen 小屁孩 <932460...@qq.com> 于2020年6月8日周一 上午9:28写道: > hi,目前我就是这样做的 数据在启动时会有数据先后到来的问题 > > > > > -- 原始邮件 -- > 发件人: "Px New"<15701181132mr@gmail.com>; > 发送时间: 2020年6月7日(星期天) 晚上7:02 > 收件人: "user-zh" > 主题: Re: 关于flinksql 与维表mysql的关联问题 > > > > 好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作? > > 1048262223 <1048262...@qq.com>于2020年6月7日 周日下午3:57写道: > > > Hi > > > > > > 可以使用open + broadcast的方式解决~ > > > > > > Best, > > Yichao Yang > > > > > > > > > > > > -- 原始邮件 -- > > 发件人: "Px New"<15701181132mr@gmail.com>; > > 发送时间: 2020年6月6日(星期六) 上午9:50 > > 收件人: "user-zh" > > > 主题: Re: 关于flinksql 与维表mysql的关联问题 > > > > > > > > Hi ,我有一个相关操作的一疑问. > > 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢? > > > > Michael Ran > > > > 放到open 方法里面可以吗? > > > 在 2020-06-04 14:15:05,"小屁孩" <932460...@qq.com> 写道: > > > >dear: > 我有个问题想请教下,关于flinksql与mysql维表关联 > > 关于mysql更新的问题 > > > > > > 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据 > > >
Re: Flink 维表延迟join
Hi , 延迟维表关联这个特性我觉得还是一个比较通用的特性,目前我们这边业务方也有维表延迟关联的述求,比如 HBase 维表的数据关联。 当前的场景是,有一张实时维表,消费 mysql binlog,然后业务方 etl 后,输出到 HBase。然后业务方还有另外一个流,会去关联这张维表,由于存在某些 rowkey 的数据还没有写入到 hbase,而另外一条流就去关联 HBase,却没有数据。所以业务方希望有个延迟维表关联功能,比如 10 分钟后在进行关联,目前我们考虑借助 Timer 来实现的, 社区如果有这个功能的话,我觉得对于 Flink 使用方会有很大帮助的。我看社区有这样的一个 JIRA 再跟踪了[1],我会持续关注。 [1] https://issues.apache.org/jira/browse/FLINK-19063 Best, LakeShen 郑斌斌 于2020年8月27日周四 上午9:23写道: > 小伙伴们: > > 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。 > 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。 > > Thanks&Regards
Re: Flink 维表延迟join
Hi, Benchao,这种发送到另外一个 topic ,再来关联,虽然感觉可以减缓这种没有关联到维表数据对于下游业务的影响,不过还是很难控制到具体的延缓时间,我们现在也考虑怎么优雅的实现。 Benchao Li 于2020年8月27日周四 上午10:08写道: > Hi, > > 我们也遇到过类似场景,我们的做法是修改了一下维表Join算子,让它来支持延迟join。 > > 其实还有个思路,你可以把这种没有join到的数据发送到另外一个topic,然后再消费回来继续join。 > > 郑斌斌 于2020年8月27日周四 上午9:23写道: > > > 小伙伴们: > > > > > 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。 > > 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。 > > > > Thanks&Regards > > > > -- > > Best, > Benchao Li >
关于Flink五分钟统计导致五分钟的CPU尖刺问题
如题,线上集群很多任务依赖五分钟粒度。导致CPU存在五分钟一波的尖刺,正常情况CPU利用为40%,尖刺时候达到90%+。 大家一般类似情况都怎么解决呢?
flink sql 并发数问题
hi all, 用flink sql消费kafka数据,有效并发数是由kafka分区数来决定的,请问有什么方法提高有效并发数吗? 因为有一个UDF是请求python http服务,速度不快,有没有方法单独提高这一块的并发数呢?
关于standalone集群jobmanager在操作时候web-ui卡顿的问题
如题,standalone集群,当有集群操作的时候容易卡顿。 集群操作指:提交任务、触发保存点并停止任务、主动触发保存点(不严重)等。 这些操作执行时候web-ui回出现卡顿转圈,大多数情况转圈10-30s内会结束恢复正常,偶尔情况下会出现jobmanager进程失败。 如上,一个是希望大佬们帮忙分析下原因? 目前Jobmanager和Taskmanager是相同机器部署,20台机器,20个Jm和Tm进程。 不清楚卡顿和JM的“内存”是否有关,还是主要CPU? 我JM目前内存10G+,TM内存70G+。 我当前计划想把JM数量降低,不搞20个,本身也用不着。想着单独出来JM部署,这样可以少部署,但提升JM的内存。当然不清楚内存影响大不大。如果是CPU影响大,可能还需要单独部署JM的机器不部署TM这样。
Re: 社区有人实现过Flink的MongodbSource吗?
Hi Even, 我没有实际使用过,不过根据 Debezium 文档 [1] 和我了解到的用户反馈,存量读取和实时增量读取都是支持的。 [1] https://debezium.io/documentation/reference/connectors/mongodb.html#mongodb-streaming-changes Best, Paul Lam > 2021年2月24日 17:08,Evan 写道: > > 好的,十分感谢,我调研一下,之前网上搜了一些资料,实现的只能批量读取,读完程序就停止了,不能一直实时的增量读取 > > > > > 发件人: Paul Lam > 发送时间: 2021-02-24 17:03 > 收件人: user-zh > 主题: Re: 社区有人实现过Flink的MongodbSource吗? > Hi, > > Debezium 支持 MongoDB CDC[1],可以了解下。 > > [1] https://debezium.io/documentation/reference/connectors/mongodb.html > > Best, > Paul Lam > >> 2021年2月24日 16:23,Evan 写道: >> >> >> 有人完整的实现Flink的MongodbSource吗 >> 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化 >> >> >
Re: 社区有人实现过Flink的MongodbSource吗?
据我所知暂时没有。不过我所在公司内部有很多 mongodb 使用,因此我们也有计划开发 mongodb connector(主要是作为 sink)。 之前因为等 FLIP-143 新接口搁置了一下计划,最近可以重启。 如果顺利的话,我们预计放到 bahir 上或贡献给 mongo 社区(考虑到 flink 社区现在对新增 connector 到主 repo 比较谨慎)。 Best, Paul Lam > 2021年2月24日 18:16,林影 写道: > > 请问flink的mongodb connector这块后续有计划吗 > > Evan 于2021年2月24日周三 下午5:08写道: > >> 好的,十分感谢,我调研一下,之前网上搜了一些资料,实现的只能批量读取,读完程序就停止了,不能一直实时的增量读取 >> >> >> >> >> 发件人: Paul Lam >> 发送时间: 2021-02-24 17:03 >> 收件人: user-zh >> 主题: Re: 社区有人实现过Flink的MongodbSource吗? >> Hi, >> >> Debezium 支持 MongoDB CDC[1],可以了解下。 >> >> [1] https://debezium.io/documentation/reference/connectors/mongodb.html >> >> Best, >> Paul Lam >> >>> 2021年2月24日 16:23,Evan 写道: >>> >>> >>> 有人完整的实现Flink的MongodbSource吗 >>> 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化 >>> >>> >> >>
flink-connector-hbase-2.2 ??????????????HBaseConnectorITCase ????????
hi,all flinkmaster HBase?? HBaseConnectorITCase testHBaseLookupTableSource ?? ?? Formatting using clusterid: testClusterID java.io.IOException: All datanodes [DatanodeInfoWithStorage[127.0.0.1:50245,DS-96ca101a-88dd-4171-84a7-1807d74885bd,DISK]] are bad. Aborting... at org.apache.hadoop.hdfs.DataStreamer.handleBadDatanode(DataStreamer.java:1440) at org.apache.hadoop.hdfs.DataStreamer.setupPipelineForAppendOrRecovery(DataStreamer.java:1383) at org.apache.hadoop.hdfs.DataStreamer.processDatanodeError(DataStreamer.java:1184) at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:611)
UpsertKafka state持续增加问题
大家好: 我在flink1.12.1上,通过SQL API测试upsertKafka,使用hdfs保存checkpoint数据,每30分钟进行一次checkpoint。kafka消息key和value均使用json格式。 持续写入300w不同主键的数据,checkpoint大小持续增加,最终生成save point时,大小接近300M。 请问UpsertKafka模式下,state中是否会一直保存所有的key?未被访问的key是否会被清空呢?
回复: Flink 1.12 On Yarn 作业提交失败问题
说明一下,yarn.ship-files这个配置的文件夹下需要包含flink-yarn的jar包,可以配置成flink home下的lib文件夹 | | 马阳阳 | | ma_yang_y...@163.com | 签名由网易邮箱大师定制 在2021年02月25日 08:59,马阳阳 写道: 可以试试在flink-conf.yaml里添加如下配置: yarn.flink-dist-jar: /opt/flink-1.12/lib/flink-dist_2.11-1.12.0.jar yarn.ship-files: /data/dfl2/lib 这个行为其实很奇怪,在我们的环境里,有的提交任务的机器不需要添加这个配置,有的不加这个配置就会造成那个main class找不到的问题。 Ps: 造成main class找不到的原因还可能是程序依赖的版本和部署的flink版本不一致,这种情况可能发生在flink依赖升级之后,部署的flink没有更新或者没有完全更新 | | 马阳阳 | | ma_yang_y...@163.com | 签名由网易邮箱大师定制 在2021年02月23日 22:36,m183 写道: 你是指提交时所依赖的flink-dist jar包需要是 1.12 版本吗,现在改成1.12 版本还是不行 2021年2月23日 下午9:27,LakeShen 写道: 这个应该你的 flink 本地配置的目录要是 1.12 版本的,也就是 flink-dist 目录 凌战 于2021年2月23日周二 下午7:33写道: 同提交作业到On Yarn集群,客户端的错误也是 org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment. Diagnostics from YARN: Application application_1610671284452_0243 failed 10 times due to AM Container for appattempt_1610671284452_0243_10 exited with exitCode: 1 Failing this attempt.Diagnostics: [2021-02-23 18:51:00.021]Exception from container-launch. Container id: container_e48_1610671284452_0243_10_01 Exit code: 1 [2021-02-23 18:51:00.024]Container exited with a non-zero exit code 1. Error file: prelaunch.err. Last 4096 bytes of prelaunch.err : [2021-02-23 18:51:00.027]Container exited with a non-zero exit code 1. Error file: prelaunch.err. Last 4096 bytes of prelaunch.err : Yarn那边的日志显示:Could not find or load main class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint 不过我是Flink 1.12 的API,然后提交的集群还是Flink1.10.1的,不知道哪里的问题 | | 凌战 | | m18340872...@163.com | 签名由网易邮箱大师定制 在2021年2月23日 18:46,LakeShen 写道: Hi 社区, 最近从 Flink 1.10 升级版本至 Flink 1.12,在提交作业到 Yarn 时,作业一直报错如下: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:365) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:218) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) Caused by: org.apache.flink.table.api.TableException: Failed to execute sql at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666) at com.youzan.bigdata.FlinkStreamSQLDDLJob.lambda$main$0(FlinkStreamSQLDDLJob.java:95) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380) at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580) at com.youzan.bigdata.FlinkStreamSQLDDLJob.main(FlinkStreamSQLDDLJob.java:93) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:348) ... 11 more Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster. at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481) at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905) at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135) at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:681) ... 22 more Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment. Diagnostics from YARN: Application application_16139
回复: Flink 1.12 On Yarn 作业提交失败问题
可以试试在flink-conf.yaml里添加如下配置: yarn.flink-dist-jar: /opt/flink-1.12/lib/flink-dist_2.11-1.12.0.jar yarn.ship-files: /data/dfl2/lib 这个行为其实很奇怪,在我们的环境里,有的提交任务的机器不需要添加这个配置,有的不加这个配置就会造成那个main class找不到的问题。 Ps: 造成main class找不到的原因还可能是程序依赖的版本和部署的flink版本不一致,这种情况可能发生在flink依赖升级之后,部署的flink没有更新或者没有完全更新 | | 马阳阳 | | ma_yang_y...@163.com | 签名由网易邮箱大师定制 在2021年02月23日 22:36,m183 写道: 你是指提交时所依赖的flink-dist jar包需要是 1.12 版本吗,现在改成1.12 版本还是不行 2021年2月23日 下午9:27,LakeShen 写道: 这个应该你的 flink 本地配置的目录要是 1.12 版本的,也就是 flink-dist 目录 凌战 于2021年2月23日周二 下午7:33写道: 同提交作业到On Yarn集群,客户端的错误也是 org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment. Diagnostics from YARN: Application application_1610671284452_0243 failed 10 times due to AM Container for appattempt_1610671284452_0243_10 exited with exitCode: 1 Failing this attempt.Diagnostics: [2021-02-23 18:51:00.021]Exception from container-launch. Container id: container_e48_1610671284452_0243_10_01 Exit code: 1 [2021-02-23 18:51:00.024]Container exited with a non-zero exit code 1. Error file: prelaunch.err. Last 4096 bytes of prelaunch.err : [2021-02-23 18:51:00.027]Container exited with a non-zero exit code 1. Error file: prelaunch.err. Last 4096 bytes of prelaunch.err : Yarn那边的日志显示:Could not find or load main class org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint 不过我是Flink 1.12 的API,然后提交的集群还是Flink1.10.1的,不知道哪里的问题 | | 凌战 | | m18340872...@163.com | 签名由网易邮箱大师定制 在2021年2月23日 18:46,LakeShen 写道: Hi 社区, 最近从 Flink 1.10 升级版本至 Flink 1.12,在提交作业到 Yarn 时,作业一直报错如下: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Failed to execute sql at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:365) at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:218) at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114) at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812) at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246) at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054) at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132) at java.security.AccessController.doPrivileged(Native Method) at javax.security.auth.Subject.doAs(Subject.java:422) at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692) at org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41) at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132) Caused by: org.apache.flink.table.api.TableException: Failed to execute sql at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666) at com.youzan.bigdata.FlinkStreamSQLDDLJob.lambda$main$0(FlinkStreamSQLDDLJob.java:95) at java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380) at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580) at com.youzan.bigdata.FlinkStreamSQLDDLJob.main(FlinkStreamSQLDDLJob.java:93) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:348) ... 11 more Caused by: org.apache.flink.client.deployment.ClusterDeploymentException: Could not deploy Yarn job cluster. at org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481) at org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81) at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905) at org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135) at org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55) at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:681) ... 22 more Caused by: org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The YARN application unexpectedly switched to state FAILED during deployment. Diagnostics from YARN: Application application_1613992328588_4441 failed 2 times due to AM Container for appattempt_1613992328588_4441_02 exited with exitCode: 1 Diagnostics: Exception from containe
UpsertKafka状态保存问题
大家好: 我在flink1.12.1上,通过SQL API测试upsertKafka,使用hdfs保存checkpoint数据,每30分钟进行一次checkpoint。kafka消息key和value均使用json格式。 持续写入300w不同主键的数据,checkpoint大小持续增加,最终生成save point时,大小接近300M。 请问UpsertKafka模式下,state中是否会一直保存所有的key?未被访问的key是否会被清空呢?
Re: Re: 社区有人实现过Flink的MongodbSource吗?
请问flink的mongodb connector这块后续有计划吗 Evan 于2021年2月24日周三 下午5:08写道: > 好的,十分感谢,我调研一下,之前网上搜了一些资料,实现的只能批量读取,读完程序就停止了,不能一直实时的增量读取 > > > > > 发件人: Paul Lam > 发送时间: 2021-02-24 17:03 > 收件人: user-zh > 主题: Re: 社区有人实现过Flink的MongodbSource吗? > Hi, > > Debezium 支持 MongoDB CDC[1],可以了解下。 > > [1] https://debezium.io/documentation/reference/connectors/mongodb.html > > Best, > Paul Lam > > > 2021年2月24日 16:23,Evan 写道: > > > > > > 有人完整的实现Flink的MongodbSource吗 > > 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化 > > > > > >
Re: FLINK 消费kafka 报错java.lang.OutOfMemoryError: Direct buffer memory
Hi . 我也遇到了这个问题, 最后怎么解决的? 版本 Flink 1.12.1 . flink2021 于2021年2月19日周五 下午12:39写道: > 嗯,我猜测也是,估计是我们kafka某些参数需要调整。大佬可以帮忙看看你们一般的kafka配置是什么样的呢? > JVM :export KAFKA_HEAP_OPTS="-Xmx14G -Xms14G -server -XX:+UseG1GC > -XX:MaxDirectMemorySize=8192m" > 其它也就是写常规的配置: > og.segment.bytes=1073741824 > log.retention.check.interval.ms=30 > #broker能接收消息的最大字节数 > message.max.bytes=2 > #broker可复制的消息的最大字节数 > replica.fetch.max.bytes=204857600 > #消费者端的可读取的最大消息 > fetch.message.max.bytes=204857600 > max.poll.records=500 > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Re: Re: 社区有人实现过Flink的MongodbSource吗?
好的,十分感谢,我调研一下,之前网上搜了一些资料,实现的只能批量读取,读完程序就停止了,不能一直实时的增量读取 发件人: Paul Lam 发送时间: 2021-02-24 17:03 收件人: user-zh 主题: Re: 社区有人实现过Flink的MongodbSource吗? Hi, Debezium 支持 MongoDB CDC[1],可以了解下。 [1] https://debezium.io/documentation/reference/connectors/mongodb.html Best, Paul Lam > 2021年2月24日 16:23,Evan 写道: > > > 有人完整的实现Flink的MongodbSource吗 > 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化 > >
Re: Flink On Yarn Per Job 作业提交失败问题
Hi,凌战 看看hadoop环境变量是否正确设置,可以参考文档 https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html#preparation Best, Robin 凌战 wrote > hi,社区 > 在接口端设置用户为 hdfs 用户,在调度执行作业后,发现在/user/hdfs/.flink/application-id 目录下 存在相关包,如 > -rw-r--r-- 3 hdfs supergroup 9402 2021-02-24 11:02 > /user/hdfs/.flink/application_1610671284452_0257/WordCount.jar > -rw-r--r-- 3 hdfs supergroup 1602 2021-02-24 11:09 > /user/hdfs/.flink/application_1610671284452_0257/application_1610671284452_0257-flink-conf.yaml7449295579763306480.tmp > -rw-r--r-- 3 hdfs supergroup 32629 2021-02-24 11:09 > /user/hdfs/.flink/application_1610671284452_0257/application_1610671284452_02573784840919107496826.tmp > -rw-r--r-- 3 hdfs supergroup 110075001 2021-02-24 11:09 > /user/hdfs/.flink/application_1610671284452_0257/flink-dist_2.11-1.10.1.jar > > > 但是报错 Could not find or load main class > org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint > 发现上传文件目录的权限是 -rw-r--r-- ,不知道是不是因为权限问题导致 > > > 希望有人解惑! > | | > 凌战 > | > | > m18340872285@ > | > 签名由网易邮箱大师定制 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 社区有人实现过Flink的MongodbSource吗?
Hi, Debezium 支持 MongoDB CDC[1],可以了解下。 [1] https://debezium.io/documentation/reference/connectors/mongodb.html Best, Paul Lam > 2021年2月24日 16:23,Evan 写道: > > > 有人完整的实现Flink的MongodbSource吗 > 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化 > >
社区有人实现过Flink的MongodbSource吗?
有人完整的实现Flink的MongodbSource吗 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化