flink1.12读mysql问题

2021-02-24 文章 酷酷的浑蛋
'scan.partition.column'='id',

'scan.partition.num'='15',

'scan.partition.lower-bound'='1',

'scan.partition.upper-bound'='680994'


我设置了上面这几个参数给source mysql分区,但是并没有生效,真实情况是只有一个task读的mysql全量数据

Flink SQL UDF 如何自定义Metrics

2021-02-24 文章 xingoo
HI,
 如题,想要在Flink
SQL中通过自定义UDF增加指标,从而实现自定义告警。那么如何在UDF中获取到RuntimeContext从而修改Metrics呢?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

?????? Flink ????????join

2021-02-24 文章 Suhan
benchao??joinrocketmqflink??kafka
 + rocket mq
??flink??
 




--  --
??: 
   "user-zh"



flink sql tumble window 时区问题

2021-02-24 文章 xuhaiLong
hi all:

使用flink sql发现一个时区问题,在flink 1.11.3,flink 1.10 都有发现。


使用eventtime,datestream 转换为table,对times字段使用 rowtime。数据为 161421840,执行完rowtime 
后变成 161418960 直接就少了8小时,导致后续的开窗都有问题。


代码参考:https://paste.ubuntu.com/p/xYpWNrR9MT/





Re: 关于flinksql 与维表mysql的关联问题

2021-02-24 文章 LakeShen
Hi ,
延迟维表关联这个特性我觉得还是一个比较通用的特性,目前我们考虑借助 Timer 来实现的,社区如果有这个功能的话,我觉得对于 Flink
使用方会有很大帮助的。
我看社区有这样的一个 JIRA 再跟踪了[1],我会持续关注。

[1] https://issues.apache.org/jira/browse/FLINK-19063

Best,
LakeShen

小屁孩 <932460...@qq.com> 于2020年6月8日周一 上午9:28写道:

> hi,目前我就是这样做的 数据在启动时会有数据先后到来的问题
>
>
>
>
> -- 原始邮件 --
> 发件人: "Px New"<15701181132mr@gmail.com>;
> 发送时间: 2020年6月7日(星期天) 晚上7:02
> 收件人: "user-zh"
> 主题: Re: 关于flinksql 与维表mysql的关联问题
>
>
>
> 好的 我可以理解为是: 通过env.addsouce创建一个广播流。下游connect后 在process方法中操作?
>
> 1048262223 <1048262...@qq.com>于2020年6月7日 周日下午3:57写道:
>
> > Hi
> >
> >
> > 可以使用open + broadcast的方式解决~
> >
> >
> > Best,
> > Yichao Yang
> >
> >
> >
> >
> >
> > -- 原始邮件 --
> > 发件人: "Px New"<15701181132mr@gmail.com>;
> > 发送时间: 2020年6月6日(星期六) 上午9:50
> > 收件人: "user-zh" >
> > 主题: Re: 关于flinksql 与维表mysql的关联问题
> >
> >
> >
> > Hi ,我有一个相关操作的一疑问.
> > 疑问: 如果我放在open 中的规则 有可能发生改变,需要重新更新的话呢?
> >
> > Michael Ran  >
> > > 放到open 方法里面可以吗?
> > > 在 2020-06-04 14:15:05,"小屁孩" <932460...@qq.com> 写道:
> > > >dear:&nbsp; &nbsp;
> 我有个问题想请教下,关于flinksql与mysql维表关联
> > 关于mysql更新的问题
> > >
> >
> 有没有好的方法?我现在使用的广播mysql但是有个问题,广播mysql时我的流数据已经到了但是mysql的数据还没有到导致我现在数据会再启动的时候丢失一部分数据
> > >


Re: Flink 维表延迟join

2021-02-24 文章 LakeShen
Hi ,
延迟维表关联这个特性我觉得还是一个比较通用的特性,目前我们这边业务方也有维表延迟关联的述求,比如 HBase 维表的数据关联。
当前的场景是,有一张实时维表,消费 mysql binlog,然后业务方 etl 后,输出到
HBase。然后业务方还有另外一个流,会去关联这张维表,由于存在某些 rowkey
的数据还没有写入到 hbase,而另外一条流就去关联 HBase,却没有数据。所以业务方希望有个延迟维表关联功能,比如 10
分钟后在进行关联,目前我们考虑借助 Timer 来实现的,
社区如果有这个功能的话,我觉得对于 Flink 使用方会有很大帮助的。我看社区有这样的一个 JIRA 再跟踪了[1],我会持续关注。

[1] https://issues.apache.org/jira/browse/FLINK-19063

Best,
LakeShen


郑斌斌  于2020年8月27日周四 上午9:23写道:

> 小伙伴们:
>
> 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。
> 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。
>
> Thanks&Regards


Re: Flink 维表延迟join

2021-02-24 文章 LakeShen
Hi,

Benchao,这种发送到另外一个 topic
,再来关联,虽然感觉可以减缓这种没有关联到维表数据对于下游业务的影响,不过还是很难控制到具体的延缓时间,我们现在也考虑怎么优雅的实现。

Benchao Li  于2020年8月27日周四 上午10:08写道:

> Hi,
>
> 我们也遇到过类似场景,我们的做法是修改了一下维表Join算子,让它来支持延迟join。
>
> 其实还有个思路,你可以把这种没有join到的数据发送到另外一个topic,然后再消费回来继续join。
>
> 郑斌斌  于2020年8月27日周四 上午9:23写道:
>
> > 小伙伴们:
> >
> >
> 大家好,请教一个问题,流表和维表在JOIN时,如果流表的数据没在维表中时,能否进行延迟join,比如,每10分钟进行match一下,连续match6次都没有match上的话,丢弃该数据。
> > 这个场景怎么通过flink SQL或UDF实现,目前是通过timer来实现的,感觉有些麻烦。
> >
> > Thanks&Regards
>
>
>
> --
>
> Best,
> Benchao Li
>


关于Flink五分钟统计导致五分钟的CPU尖刺问题

2021-02-24 文章 yidan zhao
如题,线上集群很多任务依赖五分钟粒度。导致CPU存在五分钟一波的尖刺,正常情况CPU利用为40%,尖刺时候达到90%+。
大家一般类似情况都怎么解决呢?


flink sql 并发数问题

2021-02-24 文章 Jeff
hi all,


用flink sql消费kafka数据,有效并发数是由kafka分区数来决定的,请问有什么方法提高有效并发数吗? 因为有一个UDF是请求python 
http服务,速度不快,有没有方法单独提高这一块的并发数呢?  

关于standalone集群jobmanager在操作时候web-ui卡顿的问题

2021-02-24 文章 yidan zhao
如题,standalone集群,当有集群操作的时候容易卡顿。
集群操作指:提交任务、触发保存点并停止任务、主动触发保存点(不严重)等。
这些操作执行时候web-ui回出现卡顿转圈,大多数情况转圈10-30s内会结束恢复正常,偶尔情况下会出现jobmanager进程失败。

如上,一个是希望大佬们帮忙分析下原因?
目前Jobmanager和Taskmanager是相同机器部署,20台机器,20个Jm和Tm进程。
不清楚卡顿和JM的“内存”是否有关,还是主要CPU? 我JM目前内存10G+,TM内存70G+。
我当前计划想把JM数量降低,不搞20个,本身也用不着。想着单独出来JM部署,这样可以少部署,但提升JM的内存。当然不清楚内存影响大不大。如果是CPU影响大,可能还需要单独部署JM的机器不部署TM这样。


Re: 社区有人实现过Flink的MongodbSource吗?

2021-02-24 文章 Paul Lam
Hi Even,

我没有实际使用过,不过根据 Debezium 文档 [1] 和我了解到的用户反馈,存量读取和实时增量读取都是支持的。

[1] 
https://debezium.io/documentation/reference/connectors/mongodb.html#mongodb-streaming-changes

Best,
Paul Lam

> 2021年2月24日 17:08,Evan  写道:
> 
> 好的,十分感谢,我调研一下,之前网上搜了一些资料,实现的只能批量读取,读完程序就停止了,不能一直实时的增量读取
> 
> 
> 
> 
> 发件人: Paul Lam
> 发送时间: 2021-02-24 17:03
> 收件人: user-zh
> 主题: Re: 社区有人实现过Flink的MongodbSource吗?
> Hi,
> 
> Debezium 支持 MongoDB CDC[1],可以了解下。
> 
> [1] https://debezium.io/documentation/reference/connectors/mongodb.html
> 
> Best,
> Paul Lam
> 
>> 2021年2月24日 16:23,Evan  写道:
>> 
>> 
>> 有人完整的实现Flink的MongodbSource吗
>> 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化
>> 
>> 
> 



Re: 社区有人实现过Flink的MongodbSource吗?

2021-02-24 文章 Paul Lam
据我所知暂时没有。不过我所在公司内部有很多 mongodb 使用,因此我们也有计划开发 mongodb connector(主要是作为 sink)。
之前因为等 FLIP-143 新接口搁置了一下计划,最近可以重启。

如果顺利的话,我们预计放到 bahir 上或贡献给 mongo 社区(考虑到 flink 社区现在对新增 connector 到主 repo 比较谨慎)。

Best,
Paul Lam

> 2021年2月24日 18:16,林影  写道:
> 
> 请问flink的mongodb connector这块后续有计划吗
> 
> Evan  于2021年2月24日周三 下午5:08写道:
> 
>> 好的,十分感谢,我调研一下,之前网上搜了一些资料,实现的只能批量读取,读完程序就停止了,不能一直实时的增量读取
>> 
>> 
>> 
>> 
>> 发件人: Paul Lam
>> 发送时间: 2021-02-24 17:03
>> 收件人: user-zh
>> 主题: Re: 社区有人实现过Flink的MongodbSource吗?
>> Hi,
>> 
>> Debezium 支持 MongoDB CDC[1],可以了解下。
>> 
>> [1] https://debezium.io/documentation/reference/connectors/mongodb.html
>> 
>> Best,
>> Paul Lam
>> 
>>> 2021年2月24日 16:23,Evan  写道:
>>> 
>>> 
>>> 有人完整的实现Flink的MongodbSource吗
>>> 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化
>>> 
>>> 
>> 
>> 



flink-connector-hbase-2.2 ??????????????HBaseConnectorITCase ????????

2021-02-24 文章 Anlen
hi,all
flinkmaster
HBase?? 
HBaseConnectorITCase 
testHBaseLookupTableSource ??
??
Formatting using clusterid: testClusterID
java.io.IOException: All datanodes 
[DatanodeInfoWithStorage[127.0.0.1:50245,DS-96ca101a-88dd-4171-84a7-1807d74885bd,DISK]]
 are bad. Aborting...
at 
org.apache.hadoop.hdfs.DataStreamer.handleBadDatanode(DataStreamer.java:1440)
at 
org.apache.hadoop.hdfs.DataStreamer.setupPipelineForAppendOrRecovery(DataStreamer.java:1383)
at 
org.apache.hadoop.hdfs.DataStreamer.processDatanodeError(DataStreamer.java:1184)
at org.apache.hadoop.hdfs.DataStreamer.run(DataStreamer.java:611)

UpsertKafka state持续增加问题

2021-02-24 文章 xiaohui zhang
大家好:
我在flink1.12.1上,通过SQL
API测试upsertKafka,使用hdfs保存checkpoint数据,每30分钟进行一次checkpoint。kafka消息key和value均使用json格式。
持续写入300w不同主键的数据,checkpoint大小持续增加,最终生成save point时,大小接近300M。
请问UpsertKafka模式下,state中是否会一直保存所有的key?未被访问的key是否会被清空呢?


回复: Flink 1.12 On Yarn 作业提交失败问题

2021-02-24 文章 马阳阳
说明一下,yarn.ship-files这个配置的文件夹下需要包含flink-yarn的jar包,可以配置成flink home下的lib文件夹


| |
马阳阳
|
|
ma_yang_y...@163.com
|
签名由网易邮箱大师定制


在2021年02月25日 08:59,马阳阳 写道:
可以试试在flink-conf.yaml里添加如下配置:
yarn.flink-dist-jar: /opt/flink-1.12/lib/flink-dist_2.11-1.12.0.jar
yarn.ship-files: /data/dfl2/lib


这个行为其实很奇怪,在我们的环境里,有的提交任务的机器不需要添加这个配置,有的不加这个配置就会造成那个main class找不到的问题。


Ps: 造成main 
class找不到的原因还可能是程序依赖的版本和部署的flink版本不一致,这种情况可能发生在flink依赖升级之后,部署的flink没有更新或者没有完全更新


| |
马阳阳
|
|
ma_yang_y...@163.com
|
签名由网易邮箱大师定制


在2021年02月23日 22:36,m183 写道:
你是指提交时所依赖的flink-dist jar包需要是 1.12 版本吗,现在改成1.12 版本还是不行

2021年2月23日 下午9:27,LakeShen  写道:

这个应该你的 flink 本地配置的目录要是 1.12 版本的,也就是 flink-dist 目录



凌战  于2021年2月23日周二 下午7:33写道:

同提交作业到On Yarn集群,客户端的错误也是


org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1610671284452_0243 failed
10 times due to AM Container for appattempt_1610671284452_0243_10
exited with  exitCode: 1
Failing this attempt.Diagnostics: [2021-02-23 18:51:00.021]Exception from
container-launch.
Container id: container_e48_1610671284452_0243_10_01
Exit code: 1


[2021-02-23 18:51:00.024]Container exited with a non-zero exit code 1.
Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :


[2021-02-23 18:51:00.027]Container exited with a non-zero exit code 1.
Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :


Yarn那边的日志显示:Could not find or load main class
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint


不过我是Flink 1.12 的API,然后提交的集群还是Flink1.10.1的,不知道哪里的问题


| |
凌战
|
|
m18340872...@163.com
|
签名由网易邮箱大师定制
在2021年2月23日 18:46,LakeShen 写道:
Hi 社区,

最近从 Flink 1.10 升级版本至 Flink 1.12,在提交作业到 Yarn 时,作业一直报错如下:


org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Failed to execute sql

at

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:365)

at

org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:218)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

at

org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

at

org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at

org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)

at

org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Caused by: org.apache.flink.table.api.TableException: Failed to execute sql

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)

at

com.youzan.bigdata.FlinkStreamSQLDDLJob.lambda$main$0(FlinkStreamSQLDDLJob.java:95)

at

java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)

at
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)

at
com.youzan.bigdata.FlinkStreamSQLDDLJob.main(FlinkStreamSQLDDLJob.java:93)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:348)

... 11 more

Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
Could not deploy Yarn job cluster.

at

org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)

at

org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)

at

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)

at

org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)

at

org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:681)

... 22 more

Caused by:
org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_16139

回复: Flink 1.12 On Yarn 作业提交失败问题

2021-02-24 文章 马阳阳
可以试试在flink-conf.yaml里添加如下配置:
yarn.flink-dist-jar: /opt/flink-1.12/lib/flink-dist_2.11-1.12.0.jar
yarn.ship-files: /data/dfl2/lib


这个行为其实很奇怪,在我们的环境里,有的提交任务的机器不需要添加这个配置,有的不加这个配置就会造成那个main class找不到的问题。


Ps: 造成main 
class找不到的原因还可能是程序依赖的版本和部署的flink版本不一致,这种情况可能发生在flink依赖升级之后,部署的flink没有更新或者没有完全更新


| |
马阳阳
|
|
ma_yang_y...@163.com
|
签名由网易邮箱大师定制


在2021年02月23日 22:36,m183 写道:
你是指提交时所依赖的flink-dist jar包需要是 1.12 版本吗,现在改成1.12 版本还是不行

2021年2月23日 下午9:27,LakeShen  写道:

这个应该你的 flink 本地配置的目录要是 1.12 版本的,也就是 flink-dist 目录



凌战  于2021年2月23日周二 下午7:33写道:

同提交作业到On Yarn集群,客户端的错误也是


org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1610671284452_0243 failed
10 times due to AM Container for appattempt_1610671284452_0243_10
exited with  exitCode: 1
Failing this attempt.Diagnostics: [2021-02-23 18:51:00.021]Exception from
container-launch.
Container id: container_e48_1610671284452_0243_10_01
Exit code: 1


[2021-02-23 18:51:00.024]Container exited with a non-zero exit code 1.
Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :


[2021-02-23 18:51:00.027]Container exited with a non-zero exit code 1.
Error file: prelaunch.err.
Last 4096 bytes of prelaunch.err :


Yarn那边的日志显示:Could not find or load main class
org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint


不过我是Flink 1.12 的API,然后提交的集群还是Flink1.10.1的,不知道哪里的问题


| |
凌战
|
|
m18340872...@163.com
|
签名由网易邮箱大师定制
在2021年2月23日 18:46,LakeShen 写道:
Hi 社区,

最近从 Flink 1.10 升级版本至 Flink 1.12,在提交作业到 Yarn 时,作业一直报错如下:


org.apache.flink.client.program.ProgramInvocationException: The main method
caused an error: Failed to execute sql

at

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:365)

at

org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:218)

at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)

at

org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:812)

at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:246)

at
org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1054)

at

org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1132)

at java.security.AccessController.doPrivileged(Native Method)

at javax.security.auth.Subject.doAs(Subject.java:422)

at

org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1692)

at

org.apache.flink.runtime.security.contexts.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)

at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1132)

Caused by: org.apache.flink.table.api.TableException: Failed to execute sql

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:699)

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeOperation(TableEnvironmentImpl.java:767)

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:666)

at

com.youzan.bigdata.FlinkStreamSQLDDLJob.lambda$main$0(FlinkStreamSQLDDLJob.java:95)

at

java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1380)

at
java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:580)

at
com.youzan.bigdata.FlinkStreamSQLDDLJob.main(FlinkStreamSQLDDLJob.java:93)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at

sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at

sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at

org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:348)

... 11 more

Caused by: org.apache.flink.client.deployment.ClusterDeploymentException:
Could not deploy Yarn job cluster.

at

org.apache.flink.yarn.YarnClusterDescriptor.deployJobCluster(YarnClusterDescriptor.java:481)

at

org.apache.flink.client.deployment.executors.AbstractJobClusterExecutor.execute(AbstractJobClusterExecutor.java:81)

at

org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1905)

at

org.apache.flink.client.program.StreamContextEnvironment.executeAsync(StreamContextEnvironment.java:135)

at

org.apache.flink.table.planner.delegation.ExecutorBase.executeAsync(ExecutorBase.java:55)

at

org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:681)

... 22 more

Caused by:
org.apache.flink.yarn.YarnClusterDescriptor$YarnDeploymentException: The
YARN application unexpectedly switched to state FAILED during deployment.
Diagnostics from YARN: Application application_1613992328588_4441 failed 2
times due to AM Container for appattempt_1613992328588_4441_02 exited
with  exitCode: 1
Diagnostics: Exception from containe

UpsertKafka状态保存问题

2021-02-24 文章 xiaohui zhang
大家好:
我在flink1.12.1上,通过SQL 
API测试upsertKafka,使用hdfs保存checkpoint数据,每30分钟进行一次checkpoint。kafka消息key和value均使用json格式。
持续写入300w不同主键的数据,checkpoint大小持续增加,最终生成save point时,大小接近300M。
请问UpsertKafka模式下,state中是否会一直保存所有的key?未被访问的key是否会被清空呢?

Re: Re: 社区有人实现过Flink的MongodbSource吗?

2021-02-24 文章 林影
请问flink的mongodb connector这块后续有计划吗

Evan  于2021年2月24日周三 下午5:08写道:

> 好的,十分感谢,我调研一下,之前网上搜了一些资料,实现的只能批量读取,读完程序就停止了,不能一直实时的增量读取
>
>
>
>
> 发件人: Paul Lam
> 发送时间: 2021-02-24 17:03
> 收件人: user-zh
> 主题: Re: 社区有人实现过Flink的MongodbSource吗?
> Hi,
>
> Debezium 支持 MongoDB CDC[1],可以了解下。
>
> [1] https://debezium.io/documentation/reference/connectors/mongodb.html
>
> Best,
> Paul Lam
>
> > 2021年2月24日 16:23,Evan  写道:
> >
> >
> > 有人完整的实现Flink的MongodbSource吗
> > 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化
> >
> >
>
>


Re: FLINK 消费kafka 报错java.lang.OutOfMemoryError: Direct buffer memory

2021-02-24 文章 Qishang
Hi .
我也遇到了这个问题, 最后怎么解决的? 版本 Flink 1.12.1 .

flink2021  于2021年2月19日周五 下午12:39写道:

> 嗯,我猜测也是,估计是我们kafka某些参数需要调整。大佬可以帮忙看看你们一般的kafka配置是什么样的呢?
> JVM :export KAFKA_HEAP_OPTS="-Xmx14G -Xms14G -server -XX:+UseG1GC
> -XX:MaxDirectMemorySize=8192m"
> 其它也就是写常规的配置:
> og.segment.bytes=1073741824
> log.retention.check.interval.ms=30
> #broker能接收消息的最大字节数
> message.max.bytes=2
> #broker可复制的消息的最大字节数
> replica.fetch.max.bytes=204857600
> #消费者端的可读取的最大消息
> fetch.message.max.bytes=204857600
> max.poll.records=500
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Re: 社区有人实现过Flink的MongodbSource吗?

2021-02-24 文章 Evan
好的,十分感谢,我调研一下,之前网上搜了一些资料,实现的只能批量读取,读完程序就停止了,不能一直实时的增量读取



 
发件人: Paul Lam
发送时间: 2021-02-24 17:03
收件人: user-zh
主题: Re: 社区有人实现过Flink的MongodbSource吗?
Hi,
 
Debezium 支持 MongoDB CDC[1],可以了解下。
 
[1] https://debezium.io/documentation/reference/connectors/mongodb.html
 
Best,
Paul Lam
 
> 2021年2月24日 16:23,Evan  写道:
> 
> 
> 有人完整的实现Flink的MongodbSource吗
> 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化
> 
> 
 


Re: Flink On Yarn Per Job 作业提交失败问题

2021-02-24 文章 Robin Zhang
Hi,凌战
看看hadoop环境变量是否正确设置,可以参考文档
https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/resource-providers/yarn.html#preparation

Best,
Robin


凌战 wrote
> hi,社区
> 在接口端设置用户为 hdfs 用户,在调度执行作业后,发现在/user/hdfs/.flink/application-id 目录下 存在相关包,如
> -rw-r--r--   3 hdfs supergroup   9402 2021-02-24 11:02
> /user/hdfs/.flink/application_1610671284452_0257/WordCount.jar
> -rw-r--r--   3 hdfs supergroup   1602 2021-02-24 11:09
> /user/hdfs/.flink/application_1610671284452_0257/application_1610671284452_0257-flink-conf.yaml7449295579763306480.tmp
> -rw-r--r--   3 hdfs supergroup  32629 2021-02-24 11:09 
> /user/hdfs/.flink/application_1610671284452_0257/application_1610671284452_02573784840919107496826.tmp
> -rw-r--r--   3 hdfs supergroup  110075001 2021-02-24 11:09
> /user/hdfs/.flink/application_1610671284452_0257/flink-dist_2.11-1.10.1.jar
> 
> 
> 但是报错 Could not find or load main class
> org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint
> 发现上传文件目录的权限是  -rw-r--r-- ,不知道是不是因为权限问题导致
> 
> 
> 希望有人解惑!
> | |
> 凌战
> |
> |

> m18340872285@

> |
> 签名由网易邮箱大师定制





--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 社区有人实现过Flink的MongodbSource吗?

2021-02-24 文章 Paul Lam
Hi,

Debezium 支持 MongoDB CDC[1],可以了解下。

[1] https://debezium.io/documentation/reference/connectors/mongodb.html

Best,
Paul Lam

> 2021年2月24日 16:23,Evan  写道:
> 
> 
> 有人完整的实现Flink的MongodbSource吗
> 如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化
> 
> 



社区有人实现过Flink的MongodbSource吗?

2021-02-24 文章 Evan

有人完整的实现Flink的MongodbSource吗
如题,现在有一个需求,需要Flink能实时读取MongoDB中数据变化