Flink 1.12.0, Versioned Tables

2021-02-18 Posted by guanyq
Looking for a demo of Versioned Tables on Flink 1.12.0. The DDL I have so far:

CREATE TABLE products (
    product_id   STRING,
    product_name STRING,
    price        DECIMAL(32, 2),
    update_time  TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
    PRIMARY KEY (product_id) NOT ENFORCED,
    WATERMARK FOR update_time AS update_time
) WITH (...);
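A minimal sketch of how such a versioned table is typically queried, via an
event-time temporal join; the orders table and its columns below are
illustrative, not part of the original question:

-- Hypothetical probe-side table; names are illustrative only.
CREATE TABLE orders (
    order_id   STRING,
    product_id STRING,
    order_time TIMESTAMP(3),
    WATERMARK FOR order_time AS order_time
) WITH (...);

-- Each order is enriched with the product version valid at order_time.
SELECT o.order_id, p.product_name, p.price
FROM orders AS o
LEFT JOIN products FOR SYSTEM_TIME AS OF o.order_time AS p
    ON o.product_id = p.product_id;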

How to use a full-window aggregation with ProcessWindowFunction in pyflink

2021-02-18 Posted by Hongyuan Ma
Greetings,


I am new to pyflink. I would like to use a processWindowFunction inside a
Tumble Window to compute over the full contents of the window and emit zero
or more rows. I have looked through the pyflink DataStream API and Table API
docs without finding a complete example. The pyflink DataStream API does not
seem to implement window() yet, and I am not yet clear on how to do this
with the Table API.
Suppose I implement "public class MyProcessWindowFunct extends
ProcessWindowFunction {}" in Java, package it as a jar, and register it as a
UDF in pyflink. Could it then be called from a select statement in the Table
API, and would the select statement correctly return zero or more rows? A
simple pyflink processWindowFunction example would be greatly appreciated!




Thanks in advance for your help!
Hongyuan Ma
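A minimal Java sketch of such a function, assuming String elements keyed by
a String key in an event-time tumbling window; all names are illustrative:

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Receives the full window contents and may emit zero or more records
// through the Collector; types and names here are illustrative only.
public class MyProcessWindowFunction
        extends ProcessWindowFunction<String, String, String, TimeWindow> {

    @Override
    public void process(String key,
                        Context context,
                        Iterable<String> elements,
                        Collector<String> out) {
        long count = 0;
        for (String ignored : elements) {
            count++;
        }
        // Emitting is optional: simply skip out.collect(...) to emit zero rows.
        if (count > 0) {
            out.collect("key=" + key + ", count=" + count
                    + ", windowEnd=" + context.window().getEnd());
        }
    }
}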

Unique ID generation for flink on yarn jobs

2021-02-18 Posted by datayangl
We currently use flink 1.11 for ETL and generate unique IDs with the
snowflake algorithm. Each taskmanager has 4 slots and the ETL job runs with
parallelism 16, so on a single node the job actually occupies 4 yarn
containers. Since the snowflake generators on one machine share the same
clock and machine ID, duplicate IDs can occur. Questions: 1. How can the
snowflake algorithm be applied on a single node with multiple containers
without producing duplicates? 2. What other unique-ID algorithms are there
(besides UUID)?
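One common workaround, sketched below: seed the snowflake worker ID from the
operator's subtask index, which is unique across all parallel instances
regardless of how slots are packed into containers. SnowflakeIdGenerator and
Event are assumed user classes here, not Flink API:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

public class AssignIdFunction extends RichMapFunction<Event, Event> {

    // Assumed user class: a snowflake generator seeded with a worker id.
    private transient SnowflakeIdGenerator generator;

    @Override
    public void open(Configuration parameters) {
        // Unique per parallel instance of this operator (0..parallelism-1),
        // so two containers on the same machine never share a worker id.
        int workerId = getRuntimeContext().getIndexOfThisSubtask();
        generator = new SnowflakeIdGenerator(workerId);
    }

    @Override
    public Event map(Event event) {
        event.setId(generator.nextId());
        return event;
    }
}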




Re: FLINK consuming kafka throws java.lang.OutOfMemoryError: Direct buffer memory

2021-02-18 Posted by flink2021
Yes, that is my guess as well; some of our kafka parameters probably need
tuning. Could you share what a typical kafka configuration looks like on
your side?
JVM: export KAFKA_HEAP_OPTS="-Xmx14G -Xms14G -server -XX:+UseG1GC
-XX:MaxDirectMemorySize=8192m"
The rest is just the usual configuration:
log.segment.bytes=1073741824
log.retention.check.interval.ms=30
# Maximum message size the broker accepts
message.max.bytes=2
# Maximum size of a message the broker can replicate
replica.fetch.max.bytes=204857600
# Maximum message size readable on the consumer side
fetch.message.max.bytes=204857600
max.poll.records=500






HDFS restarted while the job was running; how can it recover automatically?

2021-02-18 Posted by amenhub
hi everyone,

Version: Flink-1.12.0
We have a long-running kafka->hdfs flink job. Over the holiday hdfs was
restarted and the flink job failed outright with Connection refused; the
configured restart strategy did not seem to take effect. Checkpoints and
savepoints are both stored on hdfs.

Is the only option in this situation to wait for the hdfs restart to finish
and then manually restore the job from the latest complete checkpoint taken
before the restart? Is there any configuration that enables automatic
recovery?

best,
amenhub
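For reference, a minimal flink-conf.yaml sketch of a more tolerant restart
strategy (values illustrative); whether it helps depends on how long hdfs
stays down, since a restart attempt only succeeds once hdfs is reachable
again:

restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 2147483647
restart-strategy.fixed-delay.delay: 30 s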





Log timestamp timezone issue for flink on k8s

2021-02-18 Posted by casel.chen
The log timestamps are currently in UTC; how can they be set to local time
(UTC+8)? Thanks!


2021-02-19 01:34:21,259 INFO  akka.event.slf4j.Slf4jLogger  [] - Slf4jLogger started
2021-02-19 01:34:22,155 INFO  akka.remote.Remoting          [] - Starting remoting
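One possible fix, assuming the stock log4j2 setup shipped with Flink 1.12:
give the %d pattern an explicit timezone in log4j-console.properties (the
pattern below mirrors the default config; verify against your image).
Setting the container's TZ environment variable, e.g. TZ=Asia/Shanghai, is
another common approach.

appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS}{GMT+8} %-5p %-60c %x - %m%n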

Container is running beyond physical memory limits

2021-02-18 Posted by lian
Hi all:
1. Background: we use Flink SQL to build a retract stream, using Last_Value
and a second-level sum aggregation; the retract stream is what lets us
subtract from results that have already been emitted.
   The functionality is essentially the same as the Cainiao case below.
   https://developer.aliyun.com/article/457392
2. Version and tuning: flink 1.10.1, 2-6g per slot, RocksDB state backend
for state; we have repeatedly tuned flink's managed memory:
   overhead memory adjusted: min 1g, max 2g
   block cache size: raised from the default 8mb to 128mb
   block size: raised from the default 4kb to 32kb
   flush threads: raised from the default 1 to 4
   write-buffer ratio: lowered from the default 0.5 to 0.25
   mini-batch and two-phase aggregation enabled
   incremental checkpoints
   memory preallocation set to false
3. State size: checkpoints fluctuate between 100mb and 25g; savepoints reach 120g
4. Problems:
 1. Memory runs out and the container gets killed
 2. At checkpoint time some states are very large and others very small; what causes this?
5. We have read many articles on containers killed for OOM, adjusted the
overhead parameters, increased TM memory to enlarge managed memory, tuned
the RocksDB parameters, and so on; the container is still killed after
running for a while.
6. An open question: we enabled flink's RocksDB metric options, but it is
not clear how to print and monitor them; it would be nice if a future flink
UI displayed them (see the config sketch after this message). So we still do
not know which part of memory is being exceeded.


   Could anyone take a look and suggest how best to optimize this?
   In the 2020 Flink Forward talks, Yun Tang's sharing on RocksDB mentioned
raising the overhead memory as a first fix, but that has not solved it for
us. User misuse was also mentioned as a possible cause; I am not sure
whether my scenario is unreasonable.


Looking forward to a reply~~~
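Regarding point 6, a flink-conf.yaml sketch of the relevant options as I
understand them in 1.10 (option names should be verified against your
version); the RocksDB native metrics go through Flink's normal metrics
system, so a reporter such as JMX or Prometheus is needed to actually read
them:

# More headroom for RocksDB / JVM native allocations (values illustrative).
taskmanager.memory.jvm-overhead.min: 1g
taskmanager.memory.jvm-overhead.max: 2g

# Expose selected RocksDB native metrics.
state.backend.rocksdb.metrics.block-cache-usage: true
state.backend.rocksdb.metrics.cur-size-all-mem-tables: true
state.backend.rocksdb.metrics.estimate-table-readers-mem: true

# A reporter is required to read the metrics, e.g. JMX:
metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter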




 


Re: FLINK consuming kafka throws java.lang.OutOfMemoryError: Direct buffer memory

2021-02-18 Posted by 主 Gmail
taskmanager.memory.framework.off-heap.size defaults to a fixed value (at
most 256MB); it is not computed as a percentage.
The OOM is most likely caused by downstream backpressure.
Suggestion: increase taskmanager.memory.framework.off-heap.size directly.
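A flink-conf.yaml sketch of that change (the value is illustrative):

# Default is 128m; raise it if the framework's direct memory is exhausted.
taskmanager.memory.framework.off-heap.size: 512m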
On Feb 18, 2021, 16:24 +0800, flink2021 , wrote:
> flink consuming kafka throws an error; the data in kafka is currently not
> large, around 10 GB
> Sometimes it fails after a few hours, sometimes within 3-5 minutes. Could
> some kafka parameter be misconfigured? The jvm is currently set to 16G and
> TM memory is also set fairly high
> Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
> out-of-memory error has occurred. This can mean two things:
> either job(s) require(s) a larger size of JVM direct memory or there is a
> direct memory leak.
> The direct memory can be allocated by user code or some of its
> dependencies. In this case 'taskmanager.memory.task.off-heap.size'
> configuration option should be increased.
> Flink framework and its dependencies also consume the direct memory, mostly
> for network communication.
> The most of network memory is managed by Flink and should not result in
> out-of-memory error. In certain special cases,
> in particular for jobs with high parallelism, the framework may require
> more direct memory which is not managed by Flink.
> In this case 'taskmanager.memory.framework.off-heap.size' configuration
> option should be increased.
> If the error persists then there is probably a direct memory leak in user
> code or some of its dependencies which has to be investigated and fixed.
> The task executor has to be shutdown...
> at java.nio.Bits.reserveMemory(Bits.java:695)
> at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
> at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
> at sun.nio.ch.IOUtil.read(IOUtil.java:195)
> at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:483)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
> at
> org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
> at
> org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)
>
>
>


Re: Flink real-time aggregation results fluctuate up and down

2021-02-18 Posted by LakeShen
Hi flink2021,
  Check whether your aggregation is running on top of a retractable stream;
if so, you can add the mini-batch aggregation optimization parameters, see
[1].
  [1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/tuning/streaming_aggregation_optimization.html

Best,
LakeShen
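A minimal Java sketch of the mini-batch settings from [1], assuming the
Blink planner and an existing TableEnvironment tEnv; values are illustrative:

import org.apache.flink.configuration.Configuration;

Configuration conf = tEnv.getConfig().getConfiguration();
conf.setString("table.exec.mini-batch.enabled", "true");      // buffer input records
conf.setString("table.exec.mini-batch.allow-latency", "5 s"); // flush at most every 5 s
conf.setString("table.exec.mini-batch.size", "5000");         // or once 5000 records buffered
// Two-phase aggregation further reduces retractions on hot keys.
conf.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE");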


Robin Zhang  wrote on Thursday, February 18, 2021 at 2:44 PM:

> Hi, flink2021
>    First look at the business scenario: can the order data legitimately
> decrease? If not, the logic or the code has a problem
>
> Best,
> Robin
>
>
> flink2021 wrote
> > My data source is kafka, and the aggregated order counts are written to
> > mysql. While data is backlogged, the results swing up and down. Has
> > anyone run into this? Which settings should be adjusted? (The data
> > pipeline is a bit complex; state uses rocksdb, which errored, and no
> > TTL was set)
> >
> >
> >
>
>
>
>
>


Re: flink1.12 cannot consume jdbc and kafka CDC data in the same project

2021-02-18 Posted by silence


You can try adding a transformer in the shade plugin.
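Concretely, the usual fix for this symptom is the ServicesResourceTransformer,
which merges the META-INF/services files that both connectors ship and that
the DynamicTableFactory lookup reads; a sketch of the shade plugin
configuration:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <transformers>
      <!-- Merge META-INF/services entries from all connector jars so that
           both the 'kafka' and 'jdbc' factories stay discoverable. -->
      <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
    </transformers>
  </configuration>
</plugin>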






flink1.12 cannot consume jdbc and kafka CDC data in the same project

2021-02-18 Posted by nosstock
We use the flink1.12 quickstart mvn project template. One class in the
project consumes CDC data from kafka and another consumes CDC data from
mysql. If the mvn pom declares the dependencies in this order:
flink-connector-kafka_${scala.binary.version} (${flink.version}) first, then
flink-connector-jdbc_${scala.binary.version} (${flink.version}), the
kafka-consuming application runs successfully while the mysql-consuming one
fails with: Caused by:
org.apache.flink.table.api.ValidationException: Could not find any factory
for identifier 'jdbc' that implements
'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.
If the two dependencies are swapped (flink-connector-jdbc first, then
flink-connector-kafka), the mysql-consuming application succeeds and the
kafka-consuming one fails with: Caused by:
org.apache.flink.table.api.ValidationException: Could not find any factory
for identifier 'kafka' that implements
'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.




Re: FLINK consuming kafka throws java.lang.OutOfMemoryError: Direct buffer memory

2021-02-18 Posted by Shuai Xia
Could you check the memory usage on the webUI, specifically these three
pools? Also check whether the code uses any Native methods.
Framework Off-Heap defaults to 128M
Task Off-Heap defaults to 0
Network defaults to 0.1 of TM Total


--
From: flink2021 
Sent: Thursday, February 18, 2021 16:36
To: user-zh 
Subject: FLINK consuming kafka throws java.lang.OutOfMemoryError: Direct buffer memory

flink consuming kafka throws an error; the data in kafka is currently not
large, around 10 GB
Sometimes it fails after a few hours, sometimes within 3-5 minutes. Could
some kafka parameter be misconfigured? The jvm is currently set to 16G and
TM memory is also set fairly high
 Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
out-of-memory error has occurred. This can mean two things: 
 either job(s) require(s) a larger size of JVM direct memory or there is a
direct memory leak. 
 The direct memory can be allocated by user code or some of its
dependencies. In this case 'taskmanager.memory.task.off-heap.size'
configuration option should be increased. 
 Flink framework and its dependencies also consume the direct memory, mostly
for network communication. 
 The most of network memory is managed by Flink and should not result in
out-of-memory error. In certain special cases, 
 in particular for jobs with high parallelism, the framework may require
more direct memory which is not managed by Flink. 
 In this case 'taskmanager.memory.framework.off-heap.size' configuration
option should be increased. 
 If the error persists then there is probably a direct memory leak in user
code or some of its dependencies which has to be investigated and fixed. 
 The task executor has to be shutdown...
 at java.nio.Bits.reserveMemory(Bits.java:695)
 at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
 at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
 at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
 at sun.nio.ch.IOUtil.read(IOUtil.java:195)
 at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378)
 at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
 at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
 at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
 at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
 at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
 at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
 at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:483)
 at
org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
 at
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
 at
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
 at
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
 at
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
 at
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
 at
org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)




Re: flink1.12 on yarn per-job execution issue

2021-02-18 Posted by chaos
The problem is solved.
It was not a resource shortage; it turned out to be a dependency conflict.
Fix: manually upload all dependencies to the lib directory and package only
our own code.




FLINK consuming kafka throws java.lang.OutOfMemoryError: Direct buffer memory

2021-02-18 Posted by flink2021
flink consuming kafka throws an error; the data in kafka is currently not
large, around 10 GB
Sometimes it fails after a few hours, sometimes within 3-5 minutes. Could
some kafka parameter be misconfigured? The jvm is currently set to 16G and
TM memory is also set fairly high
 Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct
out-of-memory error has occurred. This can mean two things: 
 either job(s) require(s) a larger size of JVM direct memory or there is a
direct memory leak. 
 The direct memory can be allocated by user code or some of its
dependencies. In this case 'taskmanager.memory.task.off-heap.size'
configuration option should be increased. 
 Flink framework and its dependencies also consume the direct memory, mostly
for network communication. 
 The most of network memory is managed by Flink and should not result in
out-of-memory error. In certain special cases, 
 in particular for jobs with high parallelism, the framework may require
more direct memory which is not managed by Flink. 
 In this case 'taskmanager.memory.framework.off-heap.size' configuration
option should be increased. 
 If the error persists then there is probably a direct memory leak in user
code or some of its dependencies which has to be investigated and fixed. 
 The task executor has to be shutdown...
at java.nio.Bits.reserveMemory(Bits.java:695)
at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:311)
at sun.nio.ch.Util.getTemporaryDirectBuffer(Util.java:241)
at sun.nio.ch.IOUtil.read(IOUtil.java:195)
at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:378)
at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.PlaintextTransportLayer.read(PlaintextTransportLayer.java:103)
at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:117)
at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:424)
at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:385)
at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.attemptRead(Selector.java:651)
at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:572)
at
org.apache.flink.kafka.shaded.org.apache.kafka.common.network.Selector.poll(Selector.java:483)
at
org.apache.flink.kafka.shaded.org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:547)
at
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
at
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
at
org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
at
org.apache.flink.streaming.connectors.kafka.internals.KafkaConsumerThread.run(KafkaConsumerThread.java:249)


