Re:回复: 关于Flink SQL DISTINCT问题

2019-09-04 Thread lvwenyuan
对,肯定是按照窗口去重的。我就想问下,窗口去重时,所采用的方式
在 2019-09-04 14:38:29,"athlon...@gmail.com"  写道:
>在窗口内去重吧,不可能无限保留去重数据的
>
>
>
>athlon...@gmail.com
> 
>发件人: lvwenyuan
>发送时间: 2019-09-04 14:28
>收件人: user-zh
>主题: 关于Flink SQL DISTINCT问题
>各位大佬好:
>   我想问下,关于flink sql的实时去重,就是count(distinct user_id) 
> 。就是Flink内部是如何做到实时去重,如果对于数据量比较大的时候实时去重,是否会有性能问题。用的Blink Planner


Re: Streaming File Sink疑问

2019-09-04 Thread wang jinhai
这个问题我们刚好解决了。在flink任务的resources里加上hdfs-site.xml文件,把集群B的HA相关的配置复制进来,同时把dfs.nameservices的value设定为A,B,关键是加上true。不然被集群的HADOOP_CONF_DIR配置覆盖(代码逻辑在HadoopUtils类)


在 2019/9/4 上午11:47,“周美娜”<15957136...@163.com> 写入:

我的做法是 重新配置 HADOOP_CONF_DIR 环境变量:在flink集群里面配置 core-site.xml 和 
hdfs-site.xml,同时将  HADOOP_CONF_DIR 环境变量 指向这个文件目录
> 在 2019年9月4日,上午11:16,戴嘉诚  写道:
> 
> 大家好:
>   我在看到streamingFileSink 
中看到可以把数据转成文件写入到Flink的fileSystem中,但是这里有个问题就是,我写入的hdfs不是在flink的集群(集群A)上,是在另一个hdfs集群(集群B)上,然后那个集群(集群B)上面配置了namenode的HA,如果只是直接指定namenode感觉不怎么可靠,但是flink默认的flinkSystem中,是指定了flink默认的hdfs集群(集群A),请问这个连接器中,能单独指定fs的配置吗?因为flink开启了checkpoint,而checkpoint的默认路径是在fink自身的集群上(集群A),所以不能粗暴的直接把默认的fileSystem直接指向集群B。谢谢
> 




Re: Streaming File Sink疑问

2019-09-04 Thread wang jinhai
简单粗暴的方案是修改集群的HADOOP_CONF_DIR,集成A、B的hdfs配置,但是不太友好,如果多个集群,每次都得修改,比较费劲。好的方式就是在flink任务里添加hdfs-site.xml,同时注意dfs.nameservices的设置

在 2019/9/4 上午11:47,“周美娜”<15957136...@163.com> 写入:

我的做法是 重新配置 HADOOP_CONF_DIR 环境变量:在flink集群里面配置 core-site.xml 和 
hdfs-site.xml,同时将  HADOOP_CONF_DIR 环境变量 指向这个文件目录
> 在 2019年9月4日,上午11:16,戴嘉诚  写道:
> 
> 大家好:
>   我在看到streamingFileSink 
中看到可以把数据转成文件写入到Flink的fileSystem中,但是这里有个问题就是,我写入的hdfs不是在flink的集群(集群A)上,是在另一个hdfs集群(集群B)上,然后那个集群(集群B)上面配置了namenode的HA,如果只是直接指定namenode感觉不怎么可靠,但是flink默认的flinkSystem中,是指定了flink默认的hdfs集群(集群A),请问这个连接器中,能单独指定fs的配置吗?因为flink开启了checkpoint,而checkpoint的默认路径是在fink自身的集群上(集群A),所以不能粗暴的直接把默认的fileSystem直接指向集群B。谢谢
> 




Re: 回复: 关于Flink SQL DISTINCT问题

2019-09-04 Thread JingsongLee
一般是按时间(比如天)来group by,state配置了超时过期的时间。
基本的去重方式就是靠state(比如RocksDbState)。
有mini-batch来减少对state的访问。

如果有倾斜,那是解倾斜问题的话题了。

Best,
Jingsong Lee


--
From:lvwenyuan 
Send Time:2019年9月4日(星期三) 15:11
To:user-zh 
Subject:Re:回复: 关于Flink SQL DISTINCT问题

对,肯定是按照窗口去重的。我就想问下,窗口去重时,所采用的方式
在 2019-09-04 14:38:29,"athlon...@gmail.com"  写道:
>在窗口内去重吧,不可能无限保留去重数据的
>
>
>
>athlon...@gmail.com
> 
>发件人: lvwenyuan
>发送时间: 2019-09-04 14:28
>收件人: user-zh
>主题: 关于Flink SQL DISTINCT问题
>各位大佬好:
>   我想问下,关于flink sql的实时去重,就是count(distinct user_id) 
> 。就是Flink内部是如何做到实时去重,如果对于数据量比较大的时候实时去重,是否会有性能问题。用的Blink Planner


回复: 如何使用forward方式向kafka中生产数据

2019-09-04 Thread gaofeilong198...@163.com

有大佬确认过吗


--
高飞龙
手机 +86 18710107193
gaofeilong198...@163.com
 
发件人: gaofeilong198...@163.com
发送时间: 2019-09-02 22:56
收件人: user-zh
主题: 如何使用forward方式向kafka中生产数据
我的kafka有10个分区,现在我希望使用flink程序的forward的方式而不是rebalance的方式向kafka中生产数据,那么是应该用以下哪种方式呢?
ds.map(line => someFunction).setParallelism(10).addSink(myKafkaProducer)
or
ds.map(line => someFunction).addSink(myKafkaProducer).setParallelism(10)



--
高飞龙

gaofeilong198...@163.com


如何优化flink内存?

2019-09-04 Thread Yifei Qi
大家好:



不知道大家在使用flink时遇到过内存消耗过大的问题么?



我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?



具体情况是这样的:

准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.

按照用户进行分组.

计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口. 一个是近24小时, 每1分钟滑动一次的窗口.





flink运行在3个节点后, 内存合计就用了5G.





flink如此消耗内存, 不知道是它本来就这么吃内存, 还是我使用的有问题.





顺祝商祺


-- 


Qi Yifei
[image: https://]about.me/qyf404



Re: 如何优化flink内存?

2019-09-04 Thread Victor Wong
这种情况不建议使用滑动窗口,因为会保存大量的窗口数据(24小时/1分钟);
可以自定义ProcessFunction,参照[1];

[1]. 
https://stackoverflow.com/questions/51977741/flink-performance-issue-with-sliding-time-window





On 04/09/2019, 8:07 PM, "Yifei Qi"  wrote:

>大家好:
>
>
>
>不知道大家在使用flink时遇到过内存消耗过大的问题么?
>
>
>
>我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?
>
>
>
>具体情况是这样的:
>
>准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.
>
>按照用户进行分组.
>
>计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口. 一个是近24小时, 每1分钟滑动一次的窗口.
>
>
>
>
>
>flink运行在3个节点后, 内存合计就用了5G.
>
>
>
>
>
>flink如此消耗内存, 不知道是它本来就这么吃内存, 还是我使用的有问题.
>
>
>
>
>
>顺祝商祺
>
>
>-- 
>
>
>Qi Yifei
>[image: https://]about.me/qyf404
>


Re: 如何优化flink内存?

2019-09-04 Thread Shuo Cheng
如果是使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide
份,像你这种大小设置,肯定会导致内存的大量消耗.

On Wed, Sep 4, 2019 at 8:07 PM Yifei Qi  wrote:

> 大家好:
>
>
>
> 不知道大家在使用flink时遇到过内存消耗过大的问题么?
>
>
>
> 我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?
>
>
>
> 具体情况是这样的:
>
> 准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.
>
> 按照用户进行分组.
>
> 计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口. 一个是近24小时, 每1分钟滑动一次的窗口.
>
>
>
>
>
> flink运行在3个节点后, 内存合计就用了5G.
>
>
>
>
>
> flink如此消耗内存, 不知道是它本来就这么吃内存, 还是我使用的有问题.
>
>
>
>
>
> 顺祝商祺
>
>
> --
>
>
> Qi Yifei
> [image: https://]about.me/qyf404
> <
> https://about.me/qyf404?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=gmail_api
> >
>


Re: 如何优化flink内存?

2019-09-04 Thread 戴嘉诚
这里我建议你是用key process然后在里面用state来管理和聚集数据,这样会节省很大一部分内存

Yifei Qi 于2019年9月4日 周三20:07写道:

> 大家好:
>
>
>
> 不知道大家在使用flink时遇到过内存消耗过大的问题么?
>
>
>
> 我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?
>
>
>
> 具体情况是这样的:
>
> 准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.
>
> 按照用户进行分组.
>
> 计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口. 一个是近24小时, 每1分钟滑动一次的窗口.
>
>
>
>
>
> flink运行在3个节点后, 内存合计就用了5G.
>
>
>
>
>
> flink如此消耗内存, 不知道是它本来就这么吃内存, 还是我使用的有问题.
>
>
>
>
>
> 顺祝商祺
>
>
> --
>
>
> Qi Yifei
> [image: https://]about.me/qyf404
> <
> https://about.me/qyf404?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=gmail_api
> >
>


Re:Re: Re:使用flink 1.8.1 部署yarn集群 , 始终有报错

2019-09-04 Thread Michael Ran
yarn 客户端 没装,或者配置不对吧
在 2019-09-02 02:37:59,"Yun Tang"  写道:
>Hi
>
>向0.0.0.0:8030 尝试提交作业是因为提交作业时找不到正确的YARN配置,就会向默认的本地8030端口提交,检查一下HADOOP_CONF_DIR 
>或者 HADOOP_HOME 这些环境变量有没有设置正确。可以设置一下这些配置文件的目录地址就可以提交作业了。
>
>BTW,这个不是一个Flink的问题,是所有使用YARN管理作业的大数据计算引擎都有可能遇到的问题。
>
>祝好
>唐云
>
>From: 周虓岗 
>Sent: Sunday, September 1, 2019 15:31
>To: user-zh@flink.apache.org 
>Subject: Re:使用flink 1.8.1 部署yarn集群 , 始终有报错
>
>
>比较肯定yarn的配置基本是正确的,不知道为何flink始终在通过0.0.0.0 连接yarn scheduler
>
>
>
>
>
>
>
>在 2019-09-01 13:55:50,"周虓岗"  写道:
>>Retrying connect to server: 0.0.0.0/0.0.0.0:8030. Already tried 0 time(s); 
>>retry policy is RetryUpToMaximumCountWithFixedSleep
>>
>>
>>
>>
>>同样的一台电脑使用1.7.2部署就没有问题,有没有大神帮忙看看哪里有问题


FLink WEB ????????????????

2019-09-04 Thread wanghongquan.sh
FLink WEB ??WEB

Re: FLink WEB 怎么加登录验证?

2019-09-04 Thread Wesley Peng

Hi

on 2019/9/5 10:46, wanghongquan.sh wrote:

FLink WEB 控制台中,没有找到登录验证的配置,请问这个WEB怎么加登录验证?


Flink does not directly support authenticating access to the web UI, but 
you can always put something like Apache's basic_auth in front of it.




Re: 如何优化flink内存?

2019-09-04 Thread 郑 仲尼
大家好,

我对这种大窗口的滑动窗口有一个思路,我主要使用的是Flink SQL,方法不知道对不对,大家讨论下哈:

1.将24小时窗口,1分钟的滑动,转化为1分钟的滚动窗口,将结果输出,然后输出给24小时滑动窗口1分钟输出。

2.直接的滑动窗口,好像是需要将所有的明细数据都保存在内存中。(这一点不知对不对,大神们帮忙看看)

3.使用滚动窗口,进行预聚合,滚动窗口的数据在窗口输出后,不用保存明细数据,减少了内存使用。

4.如果TPS过高,可以减少滚动窗口的大小,但是需要保证输出到的滑动窗口,一次滑动的窗口大小是滚动窗口的N倍。

祝
顺利
———
Johnny.Z

 原始邮件
发件人: 戴嘉诚
收件人: user-zh
发送时间: 2019年9月4日(周三) 22:51
主题: Re: 如何优化flink内存?


这里我建议你是用key process然后在里面用state来管理和聚集数据,这样会节省很大一部分内存

Yifei Qi mailto:qyf...@gmail.com>>于2019年9月4日 周三20:07写道:

> 大家好:
>
>
>
> 不知道大家在使用flink时遇到过内存消耗过大的问题么?
>
>
>
> 我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?
>
>
>
> 具体情况是这样的:
>
> 准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.
>
> 按照用户进行分组.
>
> 计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口. 一个是近24小时, 每1分钟滑动一次的窗口.
>
>
>
>
>
> flink运行在3个节点后, 内存合计就用了5G.
>
>
>
>
>
> flink如此消耗内存, 不知道是它本来就这么吃内存, 还是我使用的有问题.
>
>
>
>
>
> 顺祝商祺
>
>
> --
>
>
> Qi Yifei
> [image: https://]about.me/qyf404
> <
> https://about.me/qyf404?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=gmail_api
> >
>



Re:Re: 如何优化flink内存?

2019-09-04 Thread hb



你这个方法是有限制的吧,对于窗口内去重是不支持的,对于取max,min等应该是可以这样做的.





在 2019-09-05 09:50:51,"郑 仲尼"  写道:
>大家好,
>
>我对这种大窗口的滑动窗口有一个思路,我主要使用的是Flink SQL,方法不知道对不对,大家讨论下哈:
>
>1.将24小时窗口,1分钟的滑动,转化为1分钟的滚动窗口,将结果输出,然后输出给24小时滑动窗口1分钟输出。
>
>2.直接的滑动窗口,好像是需要将所有的明细数据都保存在内存中。(这一点不知对不对,大神们帮忙看看)
>
>3.使用滚动窗口,进行预聚合,滚动窗口的数据在窗口输出后,不用保存明细数据,减少了内存使用。
>
>4.如果TPS过高,可以减少滚动窗口的大小,但是需要保证输出到的滑动窗口,一次滑动的窗口大小是滚动窗口的N倍。
>
>祝
>顺利
>———
>Johnny.Z
>
> 原始邮件
>发件人: 戴嘉诚
>收件人: user-zh
>发送时间: 2019年9月4日(周三) 22:51
>主题: Re: 如何优化flink内存?
>
>
>这里我建议你是用key process然后在里面用state来管理和聚集数据,这样会节省很大一部分内存
>
>Yifei Qi mailto:qyf...@gmail.com>>于2019年9月4日 周三20:07写道:
>
>> 大家好:
>>
>>
>>
>> 不知道大家在使用flink时遇到过内存消耗过大的问题么?
>>
>>
>>
>> 我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?
>>
>>
>>
>> 具体情况是这样的:
>>
>> 准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.
>>
>> 按照用户进行分组.
>>
>> 计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口. 一个是近24小时, 每1分钟滑动一次的窗口.
>>
>>
>>
>>
>>
>> flink运行在3个节点后, 内存合计就用了5G.
>>
>>
>>
>>
>>
>> flink如此消耗内存, 不知道是它本来就这么吃内存, 还是我使用的有问题.
>>
>>
>>
>>
>>
>> 顺祝商祺
>>
>>
>> --
>>
>>
>> Qi Yifei
>> [image: https://]about.me/qyf404
>> <
>> https://about.me/qyf404?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=gmail_api
>> >
>>
>


flink1.9.0对DDL的支持

2019-09-04 Thread pengcheng...@bonc.com.cn
各位大佬:

请教一下, 1.flink1.9.0的table API/sql是不是还没有支持Create view?
   2.BlinkPlanner和flink的planner有什么区别?




pengcheng...@bonc.com.cn


flink1.9??blinkSQL??????udf??TIMESTAMP????????

2019-09-04 Thread ????
??


??flink1.9flinkSQLudfblinkSQLudfTIMESTAMP??udf??+8??


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Not support dataType: TIMESTAMP(9)
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
        at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
        at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
        at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
        at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
        at 
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
        at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
        at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
        at 
java.security.AccessController.doPrivileged(Native Method)
        at 
javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
        at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at 
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: java.lang.RuntimeException: Not support dataType: TIMESTAMP(9)
        at 
org.apache.flink.table.dataformat.DataFormatConverters.getConverterForDataType(DataFormatConverters.java:248)
        at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.isConverterIdentity(CodeGenUtils.scala:661)
        at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:669)
        at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:665)
        at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternalIfNeeded(CodeGenUtils.scala:687)
        at 
org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:79)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:737)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:451)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
        at 
org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
        at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
        at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at 
scala.collection.AbstractTraversable.map(Traversable.scala:104)

请教一下flink sql的问题

2019-09-04 Thread sllence
普通的etl sql是否可以使用窗口
类似于这样:
select * from table_name
where xxx_time =  TUMBLE(rowtime, INTERVAL '1' MINUTE)
定时触发一段sql逻辑的执行
看了下官网只有group by的window,这种没有group by的window是否可以这么定义?
可以的话能否给个demo样例
谢谢各位

Re: flink1.9.0对DDL的支持

2019-09-04 Thread Wesley Peng

Hi

on 2019/9/5 11:23, pengcheng...@bonc.com.cn wrote:

请教一下, 1.flink1.9.0的table API/sql是不是还没有支持Create view?


from the official documentation of flink 1.9:

Views can also be created within a CLI session using the CREATE VIEW 
statement:


CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource;

Views created within a CLI session can also be removed again using the 
DROP VIEW statement:


DROP VIEW MyNewView;

Attention The definition of views in the CLI is limited to the mentioned 
syntax above. Defining a schema for views or escaping whitespaces in 
table names will be supported in future versions.


So create view is supported but has the limits.

regards.


Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

2019-09-04 Thread JingsongLee
你声明了DataType吗?代码怎么写的?
由于目前只支持精度<=3,所以你得用DataTypes.TIMESTAMP(3)来表示。

Best,
Jingsong Lee


--
From:守护 <346531...@qq.com>
Send Time:2019年9月5日(星期四) 11:48
To:user-zh 
Subject:flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

社区的各位大佬好:


使用场景:flink1.9版本使用flinkSQL创建udf函数使用没有问,当切换到blinkSQL使用这个udf就会报错TIMESTAMP类型错误,udf实现的功能也很简单,就是将时间+8小时,报错信息如下


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Not support dataType: TIMESTAMP(9)
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
        at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
        at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
        at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
        at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
        at 
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
        at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
        at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
        at 
java.security.AccessController.doPrivileged(Native Method)
        at 
javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
        at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at 
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: java.lang.RuntimeException: Not support dataType: TIMESTAMP(9)
        at 
org.apache.flink.table.dataformat.DataFormatConverters.getConverterForDataType(DataFormatConverters.java:248)
        at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.isConverterIdentity(CodeGenUtils.scala:661)
        at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:669)
        at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:665)
        at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternalIfNeeded(CodeGenUtils.scala:687)
        at 
org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:79)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:737)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:451)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
        at 
org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
        at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
        at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at 
scala.collection.AbstractTraversable.map(Traversable.scala:104)

?????? Re: flink1.9??blinkSQL??????udf??TIMESTAMP????????

2019-09-04 Thread ????
DataType??udf??


import org.apache.flink.table.functions.ScalarFunction;
import java.sql.Timestamp;




public class UTC2Local extends ScalarFunction {
    public Timestamp eval(Timestamp s) {
        long timestamp = s.getTime() + 2880;
        return new Timestamp(timestamp);
    }


}








--  --
??: "JingsongLee"

Re: 如何优化flink内存?

2019-09-04 Thread Yifei Qi
意思是要自己写方法实现滑动窗口的功能?  自己控制中间数据如何保存?

Victor Wong  于2019年9月4日周三 下午10:36写道:

> 这种情况不建议使用滑动窗口,因为会保存大量的窗口数据(24小时/1分钟);
> 可以自定义ProcessFunction,参照[1];
>
> [1].
> https://stackoverflow.com/questions/51977741/flink-performance-issue-with-sliding-time-window
>
>
>
>
>
> On 04/09/2019, 8:07 PM, "Yifei Qi"  wrote:
>
> >大家好:
> >
> >
> >
> >不知道大家在使用flink时遇到过内存消耗过大的问题么?
> >
> >
> >
> >我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?
> >
> >
> >
> >具体情况是这样的:
> >
> >准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.
> >
> >按照用户进行分组.
> >
> >计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口. 一个是近24小时, 每1分钟滑动一次的窗口.
> >
> >
> >
> >
> >
> >flink运行在3个节点后, 内存合计就用了5G.
> >
> >
> >
> >
> >
> >flink如此消耗内存, 不知道是它本来就这么吃内存, 还是我使用的有问题.
> >
> >
> >
> >
> >
> >顺祝商祺
> >
> >
> >--
> >
> >
> >Qi Yifei
> >[image: https://]about.me/qyf404
> ><
> https://about.me/qyf404?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=gmail_api
> >
>


-- 


Qi Yifei
[image: https://]about.me/qyf404



Re: 如何优化flink内存?

2019-09-04 Thread Yifei Qi
你们遇到这种问题怎么处理的了?

Shuo Cheng  于2019年9月4日周三 下午10:47写道:

> 如果是使用 datastream api 的话,滑动窗口对于每条数据都会在 state 中存 size / slide
> 份,像你这种大小设置,肯定会导致内存的大量消耗.
>
> On Wed, Sep 4, 2019 at 8:07 PM Yifei Qi  wrote:
>
> > 大家好:
> >
> >
> >
> > 不知道大家在使用flink时遇到过内存消耗过大的问题么?
> >
> >
> >
> > 我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?
> >
> >
> >
> > 具体情况是这样的:
> >
> > 准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.
> >
> > 按照用户进行分组.
> >
> > 计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口. 一个是近24小时, 每1分钟滑动一次的窗口.
> >
> >
> >
> >
> >
> > flink运行在3个节点后, 内存合计就用了5G.
> >
> >
> >
> >
> >
> > flink如此消耗内存, 不知道是它本来就这么吃内存, 还是我使用的有问题.
> >
> >
> >
> >
> >
> > 顺祝商祺
> >
> >
> > --
> >
> >
> > Qi Yifei
> > [image: https://]about.me/qyf404
> > <
> >
> https://about.me/qyf404?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=gmail_api
> > >
> >
>


-- 


Qi Yifei
[image: https://]about.me/qyf404



Re: 如何优化flink内存?

2019-09-04 Thread Yifei Qi
你的意思是自己去实现滑动窗口的功能么?

戴嘉诚  于2019年9月4日周三 下午10:51写道:

> 这里我建议你是用key process然后在里面用state来管理和聚集数据,这样会节省很大一部分内存
>
> Yifei Qi 于2019年9月4日 周三20:07写道:
>
> > 大家好:
> >
> >
> >
> > 不知道大家在使用flink时遇到过内存消耗过大的问题么?
> >
> >
> >
> > 我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?
> >
> >
> >
> > 具体情况是这样的:
> >
> > 准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.
> >
> > 按照用户进行分组.
> >
> > 计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口. 一个是近24小时, 每1分钟滑动一次的窗口.
> >
> >
> >
> >
> >
> > flink运行在3个节点后, 内存合计就用了5G.
> >
> >
> >
> >
> >
> > flink如此消耗内存, 不知道是它本来就这么吃内存, 还是我使用的有问题.
> >
> >
> >
> >
> >
> > 顺祝商祺
> >
> >
> > --
> >
> >
> > Qi Yifei
> > [image: https://]about.me/qyf404
> > <
> >
> https://about.me/qyf404?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=gmail_api
> > >
> >
>


-- 


Qi Yifei
[image: https://]about.me/qyf404



答复: 如何优化flink内存?

2019-09-04 Thread 戴嘉诚
对,你可以自己再state中维持一整天的数据,让后根据时间戳来删除过期数据来替换滑动窗口


发件人: Yifei Qi
发送时间: 2019年9月5日 13:42
收件人: user-zh@flink.apache.org
主题: Re: 如何优化flink内存?

你的意思是自己去实现滑动窗口的功能么?

戴嘉诚  于2019年9月4日周三 下午10:51写道:

> 这里我建议你是用key process然后在里面用state来管理和聚集数据,这样会节省很大一部分内存
>
> Yifei Qi 于2019年9月4日 周三20:07写道:
>
> > 大家好:
> >
> >
> >
> > 不知道大家在使用flink时遇到过内存消耗过大的问题么?
> >
> >
> >
> > 我们最近在用flink算一些实时的统计数据, 但是内存消耗很大, 不知道有没有人知道如何优化?
> >
> >
> >
> > 具体情况是这样的:
> >
> > 准备的测试数据模拟一天时间内3万个用户的5万条数据. 原始数据一共是100M.
> >
> > 按照用户进行分组.
> >
> > 计算两个滑动窗口任务:一个是近1小时, 每5秒滑动一次的窗口. 一个是近24小时, 每1分钟滑动一次的窗口.
> >
> >
> >
> >
> >
> > flink运行在3个节点后, 内存合计就用了5G.
> >
> >
> >
> >
> >
> > flink如此消耗内存, 不知道是它本来就这么吃内存, 还是我使用的有问题.
> >
> >
> >
> >
> >
> > 顺祝商祺
> >
> >
> > --
> >
> >
> > Qi Yifei
> > [image: https://]about.me/qyf404
> > <
> >
> https://about.me/qyf404?promo=email_sig&utm_source=product&utm_medium=email_sig&utm_campaign=gmail_api
> > >
> >
>


-- 


Qi Yifei
[image: https://]about.me/qyf404




Re: Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

2019-09-04 Thread JingsongLee
override getResultType方法,返回Types.SQL_TIMESTAMP.
这样应该可以绕过。
1.10会修复这个问题。

Best,
Jingsong Lee


--
From:守护 <346531...@qq.com>
Send Time:2019年9月5日(星期四) 12:11
To:user-zh@flink.apache.org JingsongLee ; user-zh 

Subject:回复: Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

在哪声明DataType,这个要引入什么包吗,求指点,我的udf代码如下:

import org.apache.flink.table.functions.ScalarFunction;
import java.sql.Timestamp;


public class UTC2Local extends ScalarFunction {
public Timestamp eval(Timestamp s) {
long timestamp = s.getTime() + 2880;
return new Timestamp(timestamp);
}

}



-- 原始邮件 --
发件人: "JingsongLee";
发送时间: 2019年9月5日(星期四) 中午11:55
收件人: "user-zh";
主题:  Re: flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

你声明了DataType吗?代码怎么写的?
由于目前只支持精度<=3,所以你得用DataTypes.TIMESTAMP(3)来表示。

Best,
Jingsong Lee


--
From:守护 <346531...@qq.com>
Send Time:2019年9月5日(星期四) 11:48
To:user-zh 
Subject:flink1.9中blinkSQL对定义udf的TIMESTAMP类型报错

社区的各位大佬好:


使用场景:flink1.9版本使用flinkSQL创建udf函数使用没有问,当切换到blinkSQL使用这个udf就会报错TIMESTAMP类型错误,udf实现的功能也很简单,就是将时间+8小时,报错信息如下


org.apache.flink.client.program.ProgramInvocationException: The main method 
caused an error: Not support dataType: TIMESTAMP(9)
        at 
org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:593)
        at 
org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:438)
        at 
org.apache.flink.client.program.OptimizerPlanEnvironment.getOptimizedPlan(OptimizerPlanEnvironment.java:83)
        at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:80)
        at 
org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:122)
        at 
org.apache.flink.client.cli.CliFrontend.runProgram(CliFrontend.java:227)
        at 
org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:205)
        at 
org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:1010)
        at 
org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1083)
        at 
java.security.AccessController.doPrivileged(Native Method)
        at 
javax.security.auth.Subject.doAs(Subject.java:422)
        at 
org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1754)
        at 
org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
        at 
org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1083)
Caused by: java.lang.RuntimeException: Not support dataType: TIMESTAMP(9)
        at 
org.apache.flink.table.dataformat.DataFormatConverters.getConverterForDataType(DataFormatConverters.java:248)
        at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.isConverterIdentity(CodeGenUtils.scala:661)
        at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:669)
        at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternal(CodeGenUtils.scala:665)
        at 
org.apache.flink.table.planner.codegen.CodeGenUtils$.genToInternalIfNeeded(CodeGenUtils.scala:687)
        at 
org.apache.flink.table.planner.codegen.calls.ScalarFunctionCallGen.generate(ScalarFunctionCallGen.scala:79)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateCallExpression(ExprCodeGenerator.scala:737)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:451)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.visitCall(ExprCodeGenerator.scala:51)
        at 
org.apache.calcite.rex.RexCall.accept(RexCall.java:191)
        at 
org.apache.flink.table.planner.codegen.ExprCodeGenerator.generateExpression(ExprCodeGenerator.scala:131)
        at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
        at 
org.apache.flink.table.planner.codegen.CalcCodeGenerator$$anonfun$5.apply(CalcCodeGenerator.scala:150)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at 
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
        at 
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at 
scala.collection.AbstractTraversable.map(Traversable.scala:104)

Flink 1.9 Blink planner 时间字段问题

2019-09-04 Thread hb
代码里定义了kafka connectorDescriptor , 从kafka读取json格式数据, 生成Table
schema
.field("_rowtime", Types.SQL_TIMESTAMP())
.rowtime(
new Rowtime()
.timestampsFromField("eventTime")
.watermarksPeriodicBounded(1000))
kafka输入:  {"eventTime": 10, "id":1,"name":"hb"}  会报错,

输入  {"eventTime": "2019-09-02T09:56:16.484Z", "id":1,"name":"hb"} 结果显示正确,
eventTime 字段怎么不支持数值输入呢.


错误提示:
```
Caused by: java.lang.Exception: java.io.IOException: Failed to deserialize JSON 
object.
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:212)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask.performDefaultAction(SourceStreamTask.java:132)
at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:298)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:403)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: Failed to deserialize JSON object.
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:129)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:72)
at 
org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
at 
org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:146)
at 
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:715)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:100)
at 
org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:63)
at 
org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.run(SourceStreamTask.java:202)
Caused by: java.time.format.DateTimeParseException: Text '10' could not be 
parsed at index 0
at 
java.time.format.DateTimeFormatter.parseResolved0(DateTimeFormatter.java:1949)
at java.time.format.DateTimeFormatter.parse(DateTimeFormatter.java:1777)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$createTimestampConverter$1dee6515$1(JsonRowDeserializationSchema.java:334)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.convertField(JsonRowDeserializationSchema.java:403)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$assembleRowConverter$77f7700$1(JsonRowDeserializationSchema.java:382)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.lambda$wrapIntoNullableConverter$d586c97$1(JsonRowDeserializationSchema.java:232)
at 
org.apache.flink.formats.json.JsonRowDeserializationSchema.deserialize(JsonRowDeserializationSchema.java:127)
... 7 more
```




源码:
```
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val conf = 
EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
  val tEnv = StreamTableEnvironment.create(env, conf)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)


  val kafkaIn = new Kafka()
.version("0.11")
.topic("hbtest111")
.property("bootstrap.servers", "192.168.1.160:19092")
.property("group.id", "test2")


  val json = new Json().deriveSchema()


  val schema = new Schema()
.field("id", Types.INT())
.field("name", Types.STRING())


  schema.field("_proctime", Types.SQL_TIMESTAMP()).proctime()
  schema
.field("_rowtime", Types.SQL_TIMESTAMP())
.rowtime(
  new Rowtime()
.timestampsFromField("eventTime")
.watermarksPeriodicBounded(1000)
)


  
tEnv.connect(kafkaIn).withFormat(json).withSchema(schema).inAppendMode().registerTableSource("table_from_kafka")
  val t = tEnv.sqlQuery("select * from table_from_kafka")
  t.printSchema()


  t.toRetractStream[Row].print()
  tEnv.execute("")
```

Re: Re: flink1.9.0对DDL的支持

2019-09-04 Thread pengcheng...@bonc.com.cn
谢谢你的回答,Wesley Peng.只能在CLI里Create view 还是太不灵活了,期待1.10.


 
发件人: Wesley Peng
发送时间: 2019-09-05 11:52
收件人: user-zh
主题: Re: flink1.9.0对DDL的支持
Hi
 
on 2019/9/5 11:23, pengcheng...@bonc.com.cn wrote:
> 请教一下, 1.flink1.9.0的table API/sql是不是还没有支持Create view?
 
from the official documentation of flink 1.9:
 
Views can also be created within a CLI session using the CREATE VIEW 
statement:
 
CREATE VIEW MyNewView AS SELECT MyField2 FROM MyTableSource;
 
Views created within a CLI session can also be removed again using the 
DROP VIEW statement:
 
DROP VIEW MyNewView;
 
Attention The definition of views in the CLI is limited to the mentioned 
syntax above. Defining a schema for views or escaping whitespaces in 
table names will be supported in future versions.
 
So create view is supported but has the limits.
 
regards.