Re: 如果输入有回撤流的话,group by时间窗口会遇到GroupWindowAggregate doesn't support consuming update and delete changes

2021-02-03 文章 yang nick
flink window  doesn't support update stream.

HongHuangNeu  于2021年2月4日周四 上午9:24写道:

> 如果输入有回撤流的话,group by时间窗口会遇到GroupWindowAggregate doesn't support consuming
> update and delete changes,有没有什么替代方案?输入是来自于流式去重,就是
>
> SELECT [column_list]
> FROM (
>SELECT [column_list],
>  ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]]
>ORDER BY time_attr [asc|desc]) AS rownum
>FROM table_name)
> WHERE rownum = 1
>
> 这样的语句
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: reduce函数的trigger问题

2021-01-28 文章 yang nick
窗口没有结束,所有的数据都还在的

xiaolail...@163.com  于2021年1月29日周五 上午11:27写道:

> 您好!最近刚开始学习flink,问一个关于trigger的问题:
>
> 如下的reduce操作:
> env.socketTextStream("localhost", )
> .flatMap(new Splitter())
> .keyBy(value -> value.f0)
> .window(TumblingEventTimeWindows.of(Time.seconds(15)))
> .reduce(new ReduceFunction>() {
> @Override
> public Tuple2 reduce(Tuple2 Integer> value1, Tuple2 value2) throws Exception {
> return new Tuple2<>(value1.f0, value1.f1 +
> value2.f1);
> }
> });
>
> 使用的trigger是:
> @Override
> public Trigger
> getDefaultTrigger(StreamExecutionEnvironment env) {
> return EventTimeTrigger.create();
> }
>
>
> 然后的EventTimeTrigger实现是当watermark漫过当前window之后才触发的,我的疑问是reduce函数不是增量做计算的吗?如果等到watermark漫过当前window之后才触发计算,那岂不是要缓着所有的记录?
> 多谢指导!
>
>
>
>
>
> xiaolail...@163.com
>


Re: Re: 多流join的场景如何优化

2021-01-25 文章 yang nick
两两join吧

hl9...@126.com  于2021年1月26日周二 下午2:28写道:

> 我们还没用到flink sql,有用流API实现的思路吗?
>
>
>
> hl9...@126.com
>
> 发件人: yang nick
> 发送时间: 2021-01-26 11:32
> 收件人: user-zh
> 主题: Re: 多流join的场景如何优化
> flink sql + zeppelin
>
> hl9...@126.com  于2021年1月26日周二 上午11:30写道:
>
> > 请教各位大佬,我现在有个多流join计算的场景,不知道该如何进行优化。
> >
> > 电商业务有3个kafka消息源,消息结构描述如下(只列举主要字段):
> > market_act(营销活动):
> > {act_id:营销活动id,start_time:活动开始时间,end_time:活动结束时间,shop_id:活动的门店}
> > new_member(新增会员):
> {member_id:新会员id,act_id:吸引会员的营销活动id,create_time:新会员生成时间}
> >
> >
> orders(订单):{order_no:订单号,amt:订单金额,member_id:会员id,create_time:订单时间,shop_id:下单的门店}
> >
> > 需求:按活动统计活动期间新会员产生的订单金额
> > 伪sql:
> > select act_id,count(1) as order_num,sum(amt) as order_amt
> > from orders t1
> > inner join new_member t2 on t1.member_id=t2.member_id
> > inner join market_act t3 on t2.act_id=t3.act_id
> > where t1.create_time between t3.start_time and t3.end_time ;
> >
> > 目前做法:
> > 将 market_act 和 new_member 两个维表消息放到redis缓存,
> > flink接orders 消息,在flatmap中读取redis维表信息,判断当前订单是否属于某个有效的活动,
> > 是则输出{act_id,order_no,amt,member_id},然后sink到db。
> >
> > 我感觉这种做法没有充分发挥flink流的特性,有没有办法在flink里面直接join这3个流,搭配状态,进行计算?
> >
> >
> >
> > hl9...@126.com
> >
>


Re: flink-sql-gateway稳定性如何,可以在生产环境使用吗?

2021-01-25 文章 yang nick
建议用zeppelin

jinsx  于2021年1月26日周二 上午11:48写道:

>
> 想在生产环境部署flink-sql-gateway,通过jdbc方式提交sql任务。不知道flink-sql-gateway稳定性如何,有大佬能给点建议吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


Re: 多流join的场景如何优化

2021-01-25 文章 yang nick
flink sql + zeppelin

hl9...@126.com  于2021年1月26日周二 上午11:30写道:

> 请教各位大佬,我现在有个多流join计算的场景,不知道该如何进行优化。
>
> 电商业务有3个kafka消息源,消息结构描述如下(只列举主要字段):
> market_act(营销活动):
> {act_id:营销活动id,start_time:活动开始时间,end_time:活动结束时间,shop_id:活动的门店}
> new_member(新增会员): {member_id:新会员id,act_id:吸引会员的营销活动id,create_time:新会员生成时间}
>
> orders(订单):{order_no:订单号,amt:订单金额,member_id:会员id,create_time:订单时间,shop_id:下单的门店}
>
> 需求:按活动统计活动期间新会员产生的订单金额
> 伪sql:
> select act_id,count(1) as order_num,sum(amt) as order_amt
> from orders t1
> inner join new_member t2 on t1.member_id=t2.member_id
> inner join market_act t3 on t2.act_id=t3.act_id
> where t1.create_time between t3.start_time and t3.end_time ;
>
> 目前做法:
> 将 market_act 和 new_member 两个维表消息放到redis缓存,
> flink接orders 消息,在flatmap中读取redis维表信息,判断当前订单是否属于某个有效的活动,
> 是则输出{act_id,order_no,amt,member_id},然后sink到db。
>
> 我感觉这种做法没有充分发挥flink流的特性,有没有办法在flink里面直接join这3个流,搭配状态,进行计算?
>
>
>
> hl9...@126.com
>


Re: Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V

2021-01-24 文章 yang nick
应该是guava包冲突问题,请参考这篇文章(参考)
https://blog.csdn.net/u012121587/article/details/103903162


董海峰(Sharp)  于2021年1月24日周日 上午9:11写道:

> Hi,您好啊,我最近遇到一个问题,在社区里发过,但是没人回答,想请教您一下,烦请有空的时候回复一下,谢谢您啦。
> hadoop3.3.0 flink1.12 hive3.12
> I want to integrate hive and flink. After I configure the
> sql-client-dqfaults.yaml file,
> catalogs:
>- name: default_catalog
>  type: hive
>  hive-conf-dir: /cdc/apache-hive-3.1.2-bin/conf
>
> I start the flink sql client, but the following error is reported.
> [root@dhf4 bin]# ./sql-client.sh embedded
> SLF4J: Class path contains multiple SLF4J bindings.
> SLF4J: Found binding in
> [jar:file:/cdc/flink-1.12.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: Found binding in
> [jar:file:/cdc/hadoop-3.3.0/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
> SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an
> explanation.
> SLF4J: Actual binding is of type
> [org.apache.logging.slf4j.Log4jLoggerFactory]
> No default environment specified.
> Searching for '/cdc/flink-1.12.0/conf/sql-client-defaults.yaml'...found.
> Reading default environment from:
> file:/cdc/flink-1.12.0/conf/sql-client-defaults.yaml
> No session environment specified.
> 2021-01-20 10:12:38,179 INFO  org.apache.hadoop.hive.conf.HiveConf
>  [] - Found configuration file
> file:/cdc/apache-hive-3.1.2-bin/conf/hive-site.xml
> Exception in thread "main"
> org.apache.flink.table.client.SqlClientException: Unexpected exception.
> This is a bug. Please consider filing an issue.
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208)
> Caused by: org.apache.flink.table.client.gateway.SqlExecutionException:
> Could not create execution context.
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878)
> at
> org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226)
> at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108)
> at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196)
> Caused by: java.lang.NoSuchMethodError:
> com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
> at org.apache.hadoop.conf.Configuration.set(Configuration.java:1380)
> at org.apache.hadoop.conf.Configuration.set(Configuration.java:1361)
> at org.apache.hadoop.mapred.JobConf.setJar(JobConf.java:536)
> at org.apache.hadoop.mapred.JobConf.setJarByClass(JobConf.java:554)
> at org.apache.hadoop.mapred.JobConf.(JobConf.java:448)
> at org.apache.hadoop.hive.conf.HiveConf.initialize(HiveConf.java:5141)
> at org.apache.hadoop.hive.conf.HiveConf.(HiveConf.java:5109)
> at
> org.apache.flink.table.catalog.hive.HiveCatalog.createHiveConf(HiveCatalog.java:211)
> at
> org.apache.flink.table.catalog.hive.HiveCatalog.(HiveCatalog.java:164)
> at
> org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory.createCatalog(HiveCatalogFactory.java:89)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:384)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$5(ExecutionContext.java:634)
> at java.util.HashMap.forEach(HashMap.java:1289)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:633)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:266)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:632)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:185)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:138)
> at
> org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867)
> ... 3 more
>
> The log content is as follows
> [root@dhf4 bin]# cat ../log/flink-root-sql-client-dhf4.log
> 2021-01-20 10:12:36,246 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.rpc.address, localhost
> 2021-01-20 10:12:36,252 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.rpc.port, 6123
> 2021-01-20 10:12:36,252 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: jobmanager.memory.process.size, 1600m
> 2021-01-20 10:12:36,252 INFO
> org.apache.flink.configuration.GlobalConfiguration   [] - Loading
> configuration property: taskmanager.memory.process.size, 1728m
> 2021-01-20 10:12:36,252 INFO
> org.apache.flink.configuration.Gl

Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。

2021-01-22 文章 yang nick
我感觉你这个用实时很难做,涉及到状态更新的无限流,需要配置 state ttl

徐州州 <25977...@qq.com> 于2021年1月23日周六 上午11:23写道:

>
> 我遇到的难题是,拒收订单想拿到payAment字段必须扫描全量的order_money表。order_money是下单时候才会产生,我拒收订单根本不知道它的下单时间根本不知道怎么拿,而且order_money没有任何标记,我全量扫描money表程序OOM。我的数据是通过Canal监控过来的,我需要写flink-sql来进行join。
>
>
>
>
> -- 原始邮件 --
> 发件人: "yang nick" 发送时间: 2021年1月23日(星期六) 中午11:16
> 收件人: "user-zh" 主题: Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。
>
>
>
> 我觉的既然是做数仓,这里没啥好纠结的,数据已经在数仓里面了,做成宽表就行了
>
> 徐州州 <25977...@qq.com> 于2021年1月23日周六 上午11:11写道:
>
> >
> >
> 我起初也有这个想法,但是他们业务告诉我,order_reject表是单独存在的。拒收单子只会在order_reject出现,而order_money只在它下单的时候写入,拒收订单数据过来order_money不会有任何变化,所以我即使拿到每天的拒收订单去join也是要读取全量order_money。
> >
> >
> >
> >
> > --&nbsp;原始邮件&nbsp;--
> > 发件人: "yang nick" > 发送时间: 2021年1月23日(星期六) 中午11:04
> > 收件人: "user-zh" > 主题: Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。
> >
> >
> >
> > 把拆开的表在join起来嘛
> >
> > 徐州州 <25977...@qq.com&gt; 于2021年1月23日周六 上午10:48写道:
> >
> > &gt;
> >
> 请教各位大佬一个业务问题,(当天日报业务)版本1:我可以通过拒收时间直接拿到订单主表当天的拒收订单和payAmont金额,版本2:现在公司订单主表重做,拒收订单和payAmont被拆分出两张表,Order_reject和Order_money,现在我的问题是就算我可以从(拒收订单表)中通过时间拿到当天拒收订单,但是PayAmont无法通过时间在Order_money表拿到,PayAmont是在下单的时候才会生成,这样我当天拒收业务都要扫描全量的order_money表拿到对应的PayAmont金额字段,有什么好的处理方法吗
> > &gt;
> >
> ?我目前的处理方法是强行读取Order_money的createTime-30day来,尽可能的让拒收订单找到对应的payAmont字段,不只是只拆分出一张order_money还有order_extend还有好几张表,我是做数仓的,我就没见过这么干的。强行把度量值字段拆分出来。请各位大神支个招。


Re: flink-sql-gateway支持远程吗

2021-01-22 文章 yang nick
可以试试zeppelin

罗显宴 <15927482...@163.com> 于2021年1月23日周六 上午11:19写道:

>
> 大佬们,我试过flinksql-client和flink-sql-gateway发现都是把代码提交到本地集群运行,好像不能提交到指定远程集群上执行任务,请问大佬们,怎么提交代码到远程集群运行
>
>
> | |
> 15927482803
> |
> |
> 邮箱:15927482...@163.com
> |
>
> 签名由 网易邮箱大师 定制


Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。

2021-01-22 文章 yang nick
我觉的既然是做数仓,这里没啥好纠结的,数据已经在数仓里面了,做成宽表就行了

徐州州 <25977...@qq.com> 于2021年1月23日周六 上午11:11写道:

>
> 我起初也有这个想法,但是他们业务告诉我,order_reject表是单独存在的。拒收单子只会在order_reject出现,而order_money只在它下单的时候写入,拒收订单数据过来order_money不会有任何变化,所以我即使拿到每天的拒收订单去join也是要读取全量order_money。
>
>
>
>
> -- 原始邮件 --
> 发件人: "yang nick" 发送时间: 2021年1月23日(星期六) 中午11:04
> 收件人: "user-zh" 主题: Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。
>
>
>
> 把拆开的表在join起来嘛
>
> 徐州州 <25977...@qq.com> 于2021年1月23日周六 上午10:48写道:
>
> >
> 请教各位大佬一个业务问题,(当天日报业务)版本1:我可以通过拒收时间直接拿到订单主表当天的拒收订单和payAmont金额,版本2:现在公司订单主表重做,拒收订单和payAmont被拆分出两张表,Order_reject和Order_money,现在我的问题是就算我可以从(拒收订单表)中通过时间拿到当天拒收订单,但是PayAmont无法通过时间在Order_money表拿到,PayAmont是在下单的时候才会生成,这样我当天拒收业务都要扫描全量的order_money表拿到对应的PayAmont金额字段,有什么好的处理方法吗
> >
> ?我目前的处理方法是强行读取Order_money的createTime-30day来,尽可能的让拒收订单找到对应的payAmont字段,不只是只拆分出一张order_money还有order_extend还有好几张表,我是做数仓的,我就没见过这么干的。强行把度量值字段拆分出来。请各位大神支个招。


Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。

2021-01-22 文章 yang nick
把拆开的表在join起来嘛

徐州州 <25977...@qq.com> 于2021年1月23日周六 上午10:48写道:

> 请教各位大佬一个业务问题,(当天日报业务)版本1:我可以通过拒收时间直接拿到订单主表当天的拒收订单和payAmont金额,版本2:现在公司订单主表重做,拒收订单和payAmont被拆分出两张表,Order_reject和Order_money,现在我的问题是就算我可以从(拒收订单表)中通过时间拿到当天拒收订单,但是PayAmont无法通过时间在Order_money表拿到,PayAmont是在下单的时候才会生成,这样我当天拒收业务都要扫描全量的order_money表拿到对应的PayAmont金额字段,有什么好的处理方法吗
> ?我目前的处理方法是强行读取Order_money的createTime-30day来,尽可能的让拒收订单找到对应的payAmont字段,不只是只拆分出一张order_money还有order_extend还有好几张表,我是做数仓的,我就没见过这么干的。强行把度量值字段拆分出来。请各位大神支个招。


Re: 请教关于Flink yarnship的使用

2021-01-22 文章 yang nick
这个方法应该是读取本地的文件,但是你放到yarn中执行,就会找不到这个文件。所以建议可以把配置上传到hdfs中试试看

Yan Tang  于2021年1月22日周五 下午4:53写道:

> 我把配置和jar包分开了,用-yt option将配置文件Ship到yarn
> cluster中,但是在获取配置文件的时候,老是拿不到,有人有相关经验么?
> 我的提交命令:
> -yt /path/to/conf
>
> code:
> this.getClass().getResourceAsStream("conf/cmp_online.cfg")
> 但一直返回null.
>
>
>
>
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: Flink 并行度问题

2021-01-22 文章 yang nick
如果是 standalone的模式部署在一台机器上,那么据我了解,只会有一个TM,一个TM可以有多个slot

Jacob <17691150...@163.com> 于2021年1月22日周五 下午4:18写道:

> 使用Flink以来,一直有一个问题困扰着。
>
>
> Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。
>
> 比如Flink消费kafka
>
> topic时,并行度数量往往都建议设置topic分区的个数,意在让每个并行度消费一个分区,达到性能最优。那也就是说一个并行度代表一个消费线程,同时也表示一个slot,又由于在Flink中一个并行度表示一个CPU,那么是不是可以理解为一个CPU就是一个线程。
>
>
> 如果FLink 以standalone的模式部署在一台机器上,这台机器有4个CPU,每个CPU有6个核,那么该集群的最大并行度是不是就是 4 ?
>
> 在我的认识中,一个核表示一个线程,但在上面的理解中,好像是一个CPU就是一个线程,这是否矛盾呢?
>
>
>
>
> -
> Thanks!
> Jacob
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/