Re: StreamAPi Window 在使用 .evictor(TimeEvictor.of(Time.seconds(0))).sum().print 报NullPoint
我认为这可能是一个bug (当然也可能是故意这样设计的): 在 EvictingWindowOperator.emitWindowContents()位置: userFunction.process(triggerContext.key, triggerContext.window, processContext, projectedContents, timestampedCollector); 当timestampedCollector的size = 0时; 执行到 ReduceApplyWindowFunction部分: public void apply(K k, W window, Iterable input, Collector out) throws Exception { T curr = null; for (T val: input) { if (curr == null) { curr = val; } else { curr = reduceFunction.reduce(curr, val); } } wrappedFunction.apply(k, window, Collections.singletonList(curr), out); } wrappedFunction.apply(k, window, Collections.singletonList(curr), out);将会产生一个Collections.singletonList(null)结果。 我认为这里应该需要判断一下, 既然input进来是空的,就不应该输出一个null结果 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: flink sql
我做了。。 添加了一个sql语法类似 "select " + "msg," + "count(1) cnt" + " from test" + " where msg = 'hello' " + " group by TUMBLE(rowtime, INTERVAL '30' SECOND), msg " + " EMIT \n" + " WITH DELAY '10' SECOND BEFORE WATERMARK,\n" + " WITHOUT DELAY AFTER WATERMARK"; 每10s触发一次窗口计算。 参考阿里云的Emit。 -- Sent from: http://apache-flink.147419.n8.nabble.com/
Re: getCurrentWatermark
currentMaxTimestamp 只是当前数据流里面最大,但不一定是全部的最大。 当数据出现延迟,或者多流的情况下,lastEmittedWatermark 不一定会比 currentMaxTimestamp 小 -- Sent from: http://apache-flink.147419.n8.nabble.com/
fink on yarn per job container 被杀
各位大佬好 rocksDB state 场景下 state 过大 被杀。 有啥好的解决办法? 为啥在 flink 1.10.1 中 taskmanager.memory.managed.size 限制不住 rocksDB 内存申请?改如何控制上线? java.lang.Exception: Container [pid=137231,containerID=container_e118_1611713951789_92045_01_03] is running beyond physical memory limits. Current usage: 4.1 GB of 4 GB physical memory used; 8.3 GB of 8.4 GB virtual memory used. Killing container. Dump of the process-tree for container_e118_1611713951789_92045_01_03 : |- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME( MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE |- 137259 137231 137231 137231 (java) 3935 488 8764928000 1086082 /app/jdk/bin/java -Xmx2029372037 -Xms2029372037 -XX:MaxDirectMemorySize= 493921243 -XX:MaxMetaspaceSize=268435456 -XX:+HeapDumpOnOutOfMemoryError - Dlog.file=/HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.log -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn. YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size= 134217728b -D taskmanager.memory.network.max=359703515b -D taskmanager.memory.network.min=359703515b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=1073741824b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=1895154309b -D taskmanager.memory.task.off-heap.size=0b --configDir . -Djobmanager.rpc.address=cnsz22pl377 -Dweb.port=0 -Dweb.tmpdir=/tmp/flink-web-8a1097fe-7fbc-4a8b-ad06-8701ba8262bc -Djobmanager.rpc.port=18918 -Drest.address=CNSZ22PL377 |- 137231 137229 137231 137231 (bash) 0 0 115855360 356 /bin/bash -c /app/jdk/bin/java -Xmx2029372037 -Xms2029372037 -XX:MaxDirectMemorySize= 493921243 -XX:MaxMetaspaceSize=268435456 -XX:+HeapDumpOnOutOfMemoryError - Dlog.file=/HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.log -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn. YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size= 134217728b -D taskmanager.memory.network.max=359703515b -D taskmanager.memory.network.min=359703515b -D taskmanager.memory.framework.heap.size=134217728b -D taskmanager.memory.managed.size=1073741824b -D taskmanager.cpu.cores=1.0 -D taskmanager.memory.task.heap.size=1895154309b -D taskmanager.memory.task.off-heap.size=0b --configDir . -Djobmanager .rpc.address='cnsz22pl377' -Dweb.port='0' -Dweb.tmpdir= '/tmp/flink-web-8a1097fe-7fbc-4a8b-ad06-8701ba8262bc' -Djobmanager.rpc.port= '18918' -Drest.address='CNSZ22PL377' 1> /HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.out 2> /HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.err *state.backend.rocksdb.memory.fixed-per-slot* 1024M *state.backend.rocksdb.memory.managed* true *taskmanager.memory.managed.size* 1024M
pyflink??py4j??????????????????????????java???? ??
pyflink??py4j??java ??
Flink SQL Hive 使用 partition.include 结果跟文档不一致
Flink 1.12.1 streaming-source.partition.includeOption to set the partitions to read, the supported option are `all` and `latest`, the `all` means read all partitions; the `latest` means read latest partition in order of 'streaming-source.partition.order', the `latest` only works` when the streaming hive source table used as temporal table. By default the option is `all`. 报错 Flink SQL> SELECT * FROM exrate_table /*+ OPTIONS('streaming-source.enable'='true','streaming-source.partition.include' = 'latest') */; [ERROR] Could not execute SQL statement. Reason: java.lang.IllegalArgumentException: The only supported 'streaming-source.partition.include' is 'all' in hive table scan, but is 'latest'
Re: Flink SQL Hive 使用 partition.include 结果跟文档不一致
Hi > 在 2021年2月5日,09:47,macia kk 写道: > > the `latest` only works` when the > streaming hive source table used as temporal table. 只能用在temporal(时态)表中,时态表只能在 temporal join(也就是我们常说的维表join) 中使用 祝好
Re: pyflink的py4j里是不是可以调用我自己写的java程序 ?
Hi, 你是想使用java写的udfs吗,你可以调用register_java_function或者create_java_temporary_function来注册你用java写的udfs,具体可以参考文档[1] [1] https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions Best, Xingbo 瞿叶奇 <389243...@qq.com> 于2021年2月4日周四 下午5:53写道: > 请问如何实现pyflink的py4j调用我自己写的java程序 ?
?????? pyflink??py4j??????????????????????????java???? ??
java jvmkerberos??pyflink?? -- -- ??: "user-zh" https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions Best, Xingbo ?? <389243...@qq.com> ??2021??2??4?? 5:53?? > pyflink??py4j??java ??
????????????????????????????????????????
super.getRuntimeContext().getAttemptNumber()??0??1 | | ?? | | ??xiongyun...@163.com | ?? ??2021??02??04?? 11:42??op ?? -- -- ??: "user-zh"
Re: 如何在程序里面判断作业是否是重启了
目前想到的是加一个调度器插件,在重启事件那边 hook 一下。 正常的重启流程貌似没有其他 hook 点了,抄送一下这方面的专家(in cc)看看有没有其他意见。 Best, tison. 熊云昆 于2021年2月5日周五 上午11:30写道: > > super.getRuntimeContext().getAttemptNumber()试试这个方法获取重启次数试试,如果没有重启过是0,反之每重启一次就会加1 > > > | | > 熊云昆 > | > | > 邮箱:xiongyun...@163.com > | > > 签名由 网易邮箱大师 定制 > > 在2021年02月04日 11:42,op 写道: > 你好,我们下游不是所有需求都会去重,开销有点大。。。 > > > > > -- 原始邮件 -- > 发件人: > "user-zh" > < > zapj...@163.com>; > 发送时间: 2021年2月4日(星期四) 中午11:31 > 收件人: "user-zh" > 主题: Re:回复: 如何在程序里面判断作业是否是重启了 > > > > > > > 下游数据做好幂等操作,就不怕重复操作了。。 > > > > > > > > > > > > > > > 在 2021-02-04 11:26:56,"op" <520075...@qq.com> 写道: > >重启可能会导致数据重发,想加个告警 > > > > > > > > > >-- 原始邮件 -- > >发件人: > "user-zh" > >发送时间: 2021年2月4日(星期四) 中午11:11 > >收件人: "user-zh" > > >主题: Re: 如何在程序里面判断作业是否是重启了 > > > > > > > >业务上的需求是什么? > > > >Best, > >tison. > > > > > >op <520075...@qq.com> 于2021年2月4日周四 上午11:04写道: > > > >> 大家好: > >> > >> > 我在程序里通过RestartStrategies设置了重启策略,现在想在算子里面判断是否是触发了Restart,请问有哪些方法能实现呢?
??????fink on yarn per job container ????
Hi ??jvm-overheadhttps://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ==&mid=2247490197&;idx=1&sn=b0893a9bf12fbcae76852a156302de95 -- -- ??: "user-zh"
Re: 如何在程序里面判断作业是否是重启了
RuntimeContext 有 getAttemptNumber() 接口,可以看出任务是第几次重跑了。 但是一般来说,我们都是通过外部系统监控 Flink 作业的 numRestarts metric 来判断作业是不是发生了 failover,进行报警。 Thanks, Zhu tison 于2021年2月5日周五 下午12:10写道: > 目前想到的是加一个调度器插件,在重启事件那边 hook 一下。 > > 正常的重启流程貌似没有其他 hook 点了,抄送一下这方面的专家(in cc)看看有没有其他意见。 > > Best, > tison. > > > 熊云昆 于2021年2月5日周五 上午11:30写道: > >> >> super.getRuntimeContext().getAttemptNumber()试试这个方法获取重启次数试试,如果没有重启过是0,反之每重启一次就会加1 >> >> >> | | >> 熊云昆 >> | >> | >> 邮箱:xiongyun...@163.com >> | >> >> 签名由 网易邮箱大师 定制 >> >> 在2021年02月04日 11:42,op 写道: >> 你好,我们下游不是所有需求都会去重,开销有点大。。。 >> >> >> >> >> -- 原始邮件 -- >> 发件人: >> "user-zh" >> < >> zapj...@163.com>; >> 发送时间: 2021年2月4日(星期四) 中午11:31 >> 收件人: "user-zh"> >> 主题: Re:回复: 如何在程序里面判断作业是否是重启了 >> >> >> >> >> >> >> 下游数据做好幂等操作,就不怕重复操作了。。 >> >> >> >> >> >> >> >> >> >> >> >> >> >> >> 在 2021-02-04 11:26:56,"op" <520075...@qq.com> 写道: >> >重启可能会导致数据重发,想加个告警 >> > >> > >> > >> > >> >-- 原始邮件 -- >> >发件人: >> "user-zh" >> > >发送时间: 2021年2月4日(星期四) 中午11:11 >> >收件人: "user-zh"> > >> >主题: Re: 如何在程序里面判断作业是否是重启了 >> > >> > >> > >> >业务上的需求是什么? >> > >> >Best, >> >tison. >> > >> > >> >op <520075...@qq.com> 于2021年2月4日周四 上午11:04写道: >> > >> >> 大家好: >> >> >> >> >> 我在程序里通过RestartStrategies设置了重启策略,现在想在算子里面判断是否是触发了Restart,请问有哪些方法能实现呢? > >
Re: fink on yarn per job container 被杀
谢谢 回答. 是指的这个参数 taskmanager.memory.jvm-overhead 调整吗(微信连接有点问题)? 我看邮件列表很多大佬的回答基本上都是要调大堆外内存。 难道 rocksdb 就申请内存的时候就控制不住上限吗?我的state 是会一直增长,但是我希望rocksDB 内存能在我设置的范围内,不至于被 yarn kill . 这块要能实现吗? zhiyezou <1530130...@qq.com> 于2021年2月5日周五 下午1:25写道: > Hi > 可以先jvm-overhead相关配置,具体原理及参数请参考这篇文章, > https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ==&mid=2247490197&;idx=1&sn=b0893a9bf12fbcae76852a156302de95 > > > > > -- 原始邮件 -- > 发件人: > "user-zh" > < > louke...@gmail.com>; > 发送时间: 2021年2月4日(星期四) 下午5:46 > 收件人: "user-zh" > 主题: fink on yarn per job container 被杀 > > > > 各位大佬好 > rocksDB state 场景下 state 过大 被杀。 有啥好的解决办法? 为啥在 flink 1.10.1 中 > taskmanager.memory.managed.size 限制不住 rocksDB 内存申请?改如何控制上线? > java.lang.Exception: Container > [pid=137231,containerID=container_e118_1611713951789_92045_01_03] > is running beyond physical memory limits. Current usage: 4.1 GB of 4 GB > physical memory used; 8.3 GB of 8.4 GB virtual memory used. Killing > container. > Dump of the process-tree for container_e118_1611713951789_92045_01_03 : > |- PID PPID PGRPID SESSID CMD_NAME > USER_MODE_TIME(MILLIS) SYSTEM_TIME( > MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE > |- 137259 137231 137231 137231 (java) 3935 488 > 8764928000 1086082 > /app/jdk/bin/java -Xmx2029372037 -Xms2029372037 -XX:MaxDirectMemorySize= > 493921243 -XX:MaxMetaspaceSize=268435456 -XX:+HeapDumpOnOutOfMemoryError - > > Dlog.file=/HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.log > -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn. > YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size= > 134217728b -D taskmanager.memory.network.max=359703515b -D > taskmanager.memory.network.min=359703515b -D > taskmanager.memory.framework.heap.size=134217728b -D > taskmanager.memory.managed.size=1073741824b -D taskmanager.cpu.cores=1.0 -D > taskmanager.memory.task.heap.size=1895154309b -D > taskmanager.memory.task.off-heap.size=0b --configDir . > -Djobmanager.rpc.address=cnsz22pl377 > -Dweb.port=0 > -Dweb.tmpdir=/tmp/flink-web-8a1097fe-7fbc-4a8b-ad06-8701ba8262bc > -Djobmanager.rpc.port=18918 -Drest.address=CNSZ22PL377 > |- 137231 137229 137231 137231 (bash) 0 0 115855360 356 > /bin/bash -c > /app/jdk/bin/java -Xmx2029372037 -Xms2029372037 -XX:MaxDirectMemorySize= > 493921243 -XX:MaxMetaspaceSize=268435456 -XX:+HeapDumpOnOutOfMemoryError - > > Dlog.file=/HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.log > -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn. > YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size= > 134217728b -D taskmanager.memory.network.max=359703515b -D > taskmanager.memory.network.min=359703515b -D > taskmanager.memory.framework.heap.size=134217728b -D > taskmanager.memory.managed.size=1073741824b -D taskmanager.cpu.cores=1.0 -D > taskmanager.memory.task.heap.size=1895154309b -D > taskmanager.memory.task.off-heap.size=0b --configDir . -Djobmanager > .rpc.address='cnsz22pl377' -Dweb.port='0' -Dweb.tmpdir= > '/tmp/flink-web-8a1097fe-7fbc-4a8b-ad06-8701ba8262bc' > -Djobmanager.rpc.port= > '18918' -Drest.address='CNSZ22PL377' 1> > > /HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.out > 2> > /HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.err > > > *state.backend.rocksdb.memory.fixed-per-slot* 1024M > *state.backend.rocksdb.memory.managed* true > *taskmanager.memory.managed.size* 1024M