Re: StreamAPi Window 在使用 .evictor(TimeEvictor.of(Time.seconds(0))).sum().print 报NullPoint

2021-02-04 Thread HunterXHunter
我认为这可能是一个bug (当然也可能是故意这样设计的):
在 EvictingWindowOperator.emitWindowContents()位置:
userFunction.process(triggerContext.key, triggerContext.window,
processContext, projectedContents, timestampedCollector);
当timestampedCollector的size = 0时;
执行到 ReduceApplyWindowFunction部分:
public void apply(K k, W window, Iterable input, Collector out) throws
Exception {
T curr = null;
for (T val: input) {
if (curr == null) {
curr = val;
} else {
curr = reduceFunction.reduce(curr, val);
}
}
wrappedFunction.apply(k, window, 
Collections.singletonList(curr), out);
}

wrappedFunction.apply(k, window, Collections.singletonList(curr),
out);将会产生一个Collections.singletonList(null)结果。
我认为这里应该需要判断一下, 既然input进来是空的,就不应该输出一个null结果




--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: flink sql

2021-02-04 Thread HunterXHunter
我做了。。
添加了一个sql语法类似

"select " +
"msg," +
"count(1) cnt" +
" from test" +
" where msg = 'hello' " +
" group by TUMBLE(rowtime, INTERVAL '30' SECOND), msg " +
" EMIT \n" +
"  WITH DELAY '10' SECOND BEFORE WATERMARK,\n" +
"  WITHOUT DELAY AFTER WATERMARK";

每10s触发一次窗口计算。
参考阿里云的Emit。



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: getCurrentWatermark

2021-02-04 Thread HunterXHunter
currentMaxTimestamp 只是当前数据流里面最大,但不一定是全部的最大。
当数据出现延迟,或者多流的情况下,lastEmittedWatermark 不一定会比 currentMaxTimestamp 小



--
Sent from: http://apache-flink.147419.n8.nabble.com/

fink on yarn per job container 被杀

2021-02-04 Thread key lou
各位大佬好
 rocksDB state 场景下 state 过大 被杀。 有啥好的解决办法? 为啥在 flink 1.10.1 中
taskmanager.memory.managed.size  限制不住 rocksDB 内存申请?改如何控制上线?
java.lang.Exception: Container
[pid=137231,containerID=container_e118_1611713951789_92045_01_03]
is running beyond physical memory limits. Current usage: 4.1 GB of 4 GB
physical memory used; 8.3 GB of 8.4 GB virtual memory used. Killing
container.
Dump of the process-tree for container_e118_1611713951789_92045_01_03 :
|- PID PPID PGRPID SESSID CMD_NAME USER_MODE_TIME(MILLIS) SYSTEM_TIME(
MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
|- 137259 137231 137231 137231 (java) 3935 488 8764928000 1086082
/app/jdk/bin/java -Xmx2029372037 -Xms2029372037 -XX:MaxDirectMemorySize=
493921243 -XX:MaxMetaspaceSize=268435456 -XX:+HeapDumpOnOutOfMemoryError -
Dlog.file=/HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.log
-Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.
YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=
134217728b -D taskmanager.memory.network.max=359703515b -D
taskmanager.memory.network.min=359703515b -D
taskmanager.memory.framework.heap.size=134217728b -D
taskmanager.memory.managed.size=1073741824b -D taskmanager.cpu.cores=1.0 -D
taskmanager.memory.task.heap.size=1895154309b -D
taskmanager.memory.task.off-heap.size=0b --configDir .
-Djobmanager.rpc.address=cnsz22pl377
-Dweb.port=0 -Dweb.tmpdir=/tmp/flink-web-8a1097fe-7fbc-4a8b-ad06-8701ba8262bc
-Djobmanager.rpc.port=18918 -Drest.address=CNSZ22PL377
|- 137231 137229 137231 137231 (bash) 0 0 115855360 356 /bin/bash -c
/app/jdk/bin/java -Xmx2029372037 -Xms2029372037 -XX:MaxDirectMemorySize=
493921243 -XX:MaxMetaspaceSize=268435456 -XX:+HeapDumpOnOutOfMemoryError -
Dlog.file=/HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.log
-Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.
YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=
134217728b -D taskmanager.memory.network.max=359703515b -D
taskmanager.memory.network.min=359703515b -D
taskmanager.memory.framework.heap.size=134217728b -D
taskmanager.memory.managed.size=1073741824b -D taskmanager.cpu.cores=1.0 -D
taskmanager.memory.task.heap.size=1895154309b -D
taskmanager.memory.task.off-heap.size=0b --configDir . -Djobmanager
.rpc.address='cnsz22pl377' -Dweb.port='0' -Dweb.tmpdir=
'/tmp/flink-web-8a1097fe-7fbc-4a8b-ad06-8701ba8262bc' -Djobmanager.rpc.port=
'18918' -Drest.address='CNSZ22PL377' 1>
/HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.out
2> 
/HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.err


*state.backend.rocksdb.memory.fixed-per-slot* 1024M
*state.backend.rocksdb.memory.managed* true
*taskmanager.memory.managed.size* 1024M


pyflink??py4j??????????????????????????java???? ??

2021-02-04 Thread ??????
pyflink??py4j??java ??

Flink SQL Hive 使用 partition.include 结果跟文档不一致

2021-02-04 Thread macia kk
Flink 1.12.1

streaming-source.partition.includeOption to set the partitions to read, the
supported option are `all` and `latest`, the `all` means read all
partitions; the `latest` means read latest partition in order of
'streaming-source.partition.order', the `latest` only works` when the
streaming hive source table used as temporal table. By default the option
is `all`.


报错
Flink SQL> SELECT * FROM exrate_table /*+
OPTIONS('streaming-source.enable'='true','streaming-source.partition.include'
= 'latest') */; [ERROR] Could not execute SQL statement. Reason:
java.lang.IllegalArgumentException: The only supported
'streaming-source.partition.include' is 'all' in hive table scan, but is
'latest'


Re: Flink SQL Hive 使用 partition.include 结果跟文档不一致

2021-02-04 Thread Leonard Xu
Hi

> 在 2021年2月5日,09:47,macia kk  写道:
> 
> the `latest` only works` when the
> streaming hive source table used as temporal table. 

只能用在temporal(时态)表中,时态表只能在 temporal join(也就是我们常说的维表join) 中使用

祝好

Re: pyflink的py4j里是不是可以调用我自己写的java程序 ?

2021-02-04 Thread Xingbo Huang
Hi,

你是想使用java写的udfs吗,你可以调用register_java_function或者create_java_temporary_function来注册你用java写的udfs,具体可以参考文档[1]

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions

Best,
Xingbo


瞿叶奇 <389243...@qq.com> 于2021年2月4日周四 下午5:53写道:

> 请问如何实现pyflink的py4j调用我自己写的java程序 ?


?????? pyflink??py4j??????????????????????????java???? ??

2021-02-04 Thread ??????
java 
jvmkerberos??pyflink??




--  --
??: 
   "user-zh"

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/python/table-api-users-guide/udfs/python_udfs.html#scalar-functions

Best,
Xingbo


?? <389243...@qq.com> ??2021??2??4?? 5:53??

> pyflink??py4j??java ??

????????????????????????????????????????

2021-02-04 Thread ??????
super.getRuntimeContext().getAttemptNumber()??0??1


| |
??
|
|
??xiongyun...@163.com
|

??  

??2021??02??04?? 11:42??op ??





--  --
??: 
   "user-zh"



Re: 如何在程序里面判断作业是否是重启了

2021-02-04 Thread tison
目前想到的是加一个调度器插件,在重启事件那边 hook 一下。

正常的重启流程貌似没有其他 hook 点了,抄送一下这方面的专家(in cc)看看有没有其他意见。

Best,
tison.


熊云昆  于2021年2月5日周五 上午11:30写道:

>
> super.getRuntimeContext().getAttemptNumber()试试这个方法获取重启次数试试,如果没有重启过是0,反之每重启一次就会加1
>
>
> | |
> 熊云昆
> |
> |
> 邮箱:xiongyun...@163.com
> |
>
> 签名由 网易邮箱大师 定制
>
> 在2021年02月04日 11:42,op 写道:
> 你好,我们下游不是所有需求都会去重,开销有点大。。。
>
>
>
>
> -- 原始邮件 --
> 发件人:
>   "user-zh"
> <
> zapj...@163.com>;
> 发送时间: 2021年2月4日(星期四) 中午11:31
> 收件人: "user-zh"
> 主题: Re:回复: 如何在程序里面判断作业是否是重启了
>
>
>
>
>
>
> 下游数据做好幂等操作,就不怕重复操作了。。
>
>
>
>
>
>
>
>
>
>
>
>
>
>
> 在 2021-02-04 11:26:56,"op" <520075...@qq.com> 写道:
> >重启可能会导致数据重发,想加个告警
> >
> >
> >
> >
> >-- 原始邮件 --
> >发件人:   
> "user-zh"   
>  >发送时间: 2021年2月4日(星期四) 中午11:11
> >收件人: "user-zh" >
> >主题: Re: 如何在程序里面判断作业是否是重启了
> >
> >
> >
> >业务上的需求是什么?
> >
> >Best,
> >tison.
> >
> >
> >op <520075...@qq.com> 于2021年2月4日周四 上午11:04写道:
> >
> >> 大家好:
> >> &nbsp;
> >>
> &nbsp;我在程序里通过RestartStrategies设置了重启策略,现在想在算子里面判断是否是触发了Restart,请问有哪些方法能实现呢?


??????fink on yarn per job container ????

2021-02-04 Thread zhiyezou
Hi
  
 ??jvm-overheadhttps://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ==&mid=2247490197&;idx=1&sn=b0893a9bf12fbcae76852a156302de95




--  --
??: 
   "user-zh"



Re: 如何在程序里面判断作业是否是重启了

2021-02-04 Thread Zhu Zhu
RuntimeContext 有 getAttemptNumber() 接口,可以看出任务是第几次重跑了。
但是一般来说,我们都是通过外部系统监控 Flink 作业的 numRestarts metric 来判断作业是不是发生了 failover,进行报警。

Thanks,
Zhu

tison  于2021年2月5日周五 下午12:10写道:

> 目前想到的是加一个调度器插件,在重启事件那边 hook 一下。
>
> 正常的重启流程貌似没有其他 hook 点了,抄送一下这方面的专家(in cc)看看有没有其他意见。
>
> Best,
> tison.
>
>
> 熊云昆  于2021年2月5日周五 上午11:30写道:
>
>>
>> super.getRuntimeContext().getAttemptNumber()试试这个方法获取重启次数试试,如果没有重启过是0,反之每重启一次就会加1
>>
>>
>> | |
>> 熊云昆
>> |
>> |
>> 邮箱:xiongyun...@163.com
>> |
>>
>> 签名由 网易邮箱大师 定制
>>
>> 在2021年02月04日 11:42,op 写道:
>> 你好,我们下游不是所有需求都会去重,开销有点大。。。
>>
>>
>>
>>
>> -- 原始邮件 --
>> 发件人:
>>   "user-zh"
>> <
>> zapj...@163.com>;
>> 发送时间: 2021年2月4日(星期四) 中午11:31
>> 收件人: "user-zh">
>> 主题: Re:回复: 如何在程序里面判断作业是否是重启了
>>
>>
>>
>>
>>
>>
>> 下游数据做好幂等操作,就不怕重复操作了。。
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2021-02-04 11:26:56,"op" <520075...@qq.com> 写道:
>> >重启可能会导致数据重发,想加个告警
>> >
>> >
>> >
>> >
>> >-- 原始邮件 --
>> >发件人:   
>> "user-zh"   
>> > >发送时间: 2021年2月4日(星期四) 中午11:11
>> >收件人: "user-zh"> >
>> >主题: Re: 如何在程序里面判断作业是否是重启了
>> >
>> >
>> >
>> >业务上的需求是什么?
>> >
>> >Best,
>> >tison.
>> >
>> >
>> >op <520075...@qq.com> 于2021年2月4日周四 上午11:04写道:
>> >
>> >> 大家好:
>> >> &nbsp;
>> >>
>> &nbsp;我在程序里通过RestartStrategies设置了重启策略,现在想在算子里面判断是否是触发了Restart,请问有哪些方法能实现呢?
>
>


Re: fink on yarn per job container 被杀

2021-02-04 Thread key lou
谢谢 回答.
是指的这个参数 taskmanager.memory.jvm-overhead 调整吗(微信连接有点问题)?
我看邮件列表很多大佬的回答基本上都是要调大堆外内存。
难道 rocksdb 就申请内存的时候就控制不住上限吗?我的state 是会一直增长,但是我希望rocksDB 内存能在我设置的范围内,不至于被
yarn  kill . 这块要能实现吗?

zhiyezou <1530130...@qq.com> 于2021年2月5日周五 下午1:25写道:

> Hi
>    可以先jvm-overhead相关配置,具体原理及参数请参考这篇文章,
> https://mp.weixin.qq.com/s?__biz=MzU3Mzg4OTMyNQ==&mid=2247490197&;idx=1&sn=b0893a9bf12fbcae76852a156302de95
>
>
>
>
> -- 原始邮件 --
> 发件人:
>   "user-zh"
> <
> louke...@gmail.com>;
> 发送时间: 2021年2月4日(星期四) 下午5:46
> 收件人: "user-zh"
> 主题: fink on yarn per job container 被杀
>
>
>
> 各位大佬好
>  rocksDB state 场景下 state 过大 被杀。 有啥好的解决办法? 为啥在 flink 1.10.1 中
> taskmanager.memory.managed.size  限制不住 rocksDB 内存申请?改如何控制上线?
> java.lang.Exception: Container
> [pid=137231,containerID=container_e118_1611713951789_92045_01_03]
> is running beyond physical memory limits. Current usage: 4.1 GB of 4 GB
> physical memory used; 8.3 GB of 8.4 GB virtual memory used. Killing
> container.
> Dump of the process-tree for container_e118_1611713951789_92045_01_03 :
>     |- PID PPID PGRPID SESSID CMD_NAME
> USER_MODE_TIME(MILLIS) SYSTEM_TIME(
> MILLIS) VMEM_USAGE(BYTES) RSSMEM_USAGE(PAGES) FULL_CMD_LINE
>     |- 137259 137231 137231 137231 (java) 3935 488
> 8764928000 1086082
> /app/jdk/bin/java -Xmx2029372037 -Xms2029372037 -XX:MaxDirectMemorySize=
> 493921243 -XX:MaxMetaspaceSize=268435456 -XX:+HeapDumpOnOutOfMemoryError -
>
> Dlog.file=/HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.log
> -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.
> YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=
> 134217728b -D taskmanager.memory.network.max=359703515b -D
> taskmanager.memory.network.min=359703515b -D
> taskmanager.memory.framework.heap.size=134217728b -D
> taskmanager.memory.managed.size=1073741824b -D taskmanager.cpu.cores=1.0 -D
> taskmanager.memory.task.heap.size=1895154309b -D
> taskmanager.memory.task.off-heap.size=0b --configDir .
> -Djobmanager.rpc.address=cnsz22pl377
> -Dweb.port=0
> -Dweb.tmpdir=/tmp/flink-web-8a1097fe-7fbc-4a8b-ad06-8701ba8262bc
> -Djobmanager.rpc.port=18918 -Drest.address=CNSZ22PL377
>     |- 137231 137229 137231 137231 (bash) 0 0 115855360 356
> /bin/bash -c
> /app/jdk/bin/java -Xmx2029372037 -Xms2029372037 -XX:MaxDirectMemorySize=
> 493921243 -XX:MaxMetaspaceSize=268435456 -XX:+HeapDumpOnOutOfMemoryError -
>
> Dlog.file=/HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.log
> -Dlog4j.configuration=file:./log4j.properties org.apache.flink.yarn.
> YarnTaskExecutorRunner -D taskmanager.memory.framework.off-heap.size=
> 134217728b -D taskmanager.memory.network.max=359703515b -D
> taskmanager.memory.network.min=359703515b -D
> taskmanager.memory.framework.heap.size=134217728b -D
> taskmanager.memory.managed.size=1073741824b -D taskmanager.cpu.cores=1.0 -D
> taskmanager.memory.task.heap.size=1895154309b -D
> taskmanager.memory.task.off-heap.size=0b --configDir . -Djobmanager
> .rpc.address='cnsz22pl377' -Dweb.port='0' -Dweb.tmpdir=
> '/tmp/flink-web-8a1097fe-7fbc-4a8b-ad06-8701ba8262bc'
> -Djobmanager.rpc.port=
> '18918' -Drest.address='CNSZ22PL377' 1>
>
> /HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.out
> 2>
> /HDATA/4/yarn/logs/application_1611713951789_92045/container_e118_1611713951789_92045_01_03/taskmanager.err
>
>
> *state.backend.rocksdb.memory.fixed-per-slot* 1024M
> *state.backend.rocksdb.memory.managed* true
> *taskmanager.memory.managed.size* 1024M