Re: 如果输入有回撤流的话,group by时间窗口会遇到GroupWindowAggregate doesn't support consuming update and delete changes
flink window doesn't support update stream. HongHuangNeu 于2021年2月4日周四 上午9:24写道: > 如果输入有回撤流的话,group by时间窗口会遇到GroupWindowAggregate doesn't support consuming > update and delete changes,有没有什么替代方案?输入是来自于流式去重,就是 > > SELECT [column_list] > FROM ( >SELECT [column_list], > ROW_NUMBER() OVER ([PARTITION BY col1[, col2...]] >ORDER BY time_attr [asc|desc]) AS rownum >FROM table_name) > WHERE rownum = 1 > > 这样的语句 > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Re: reduce函数的trigger问题
窗口没有结束,所有的数据都还在的 xiaolail...@163.com 于2021年1月29日周五 上午11:27写道: > 您好!最近刚开始学习flink,问一个关于trigger的问题: > > 如下的reduce操作: > env.socketTextStream("localhost", ) > .flatMap(new Splitter()) > .keyBy(value -> value.f0) > .window(TumblingEventTimeWindows.of(Time.seconds(15))) > .reduce(new ReduceFunction>() { > @Override > public Tuple2 reduce(Tuple2 Integer> value1, Tuple2 value2) throws Exception { > return new Tuple2<>(value1.f0, value1.f1 + > value2.f1); > } > }); > > 使用的trigger是: > @Override > public Trigger > getDefaultTrigger(StreamExecutionEnvironment env) { > return EventTimeTrigger.create(); > } > > > 然后的EventTimeTrigger实现是当watermark漫过当前window之后才触发的,我的疑问是reduce函数不是增量做计算的吗?如果等到watermark漫过当前window之后才触发计算,那岂不是要缓着所有的记录? > 多谢指导! > > > > > > xiaolail...@163.com >
Re: Re: 多流join的场景如何优化
两两join吧 hl9...@126.com 于2021年1月26日周二 下午2:28写道: > 我们还没用到flink sql,有用流API实现的思路吗? > > > > hl9...@126.com > > 发件人: yang nick > 发送时间: 2021-01-26 11:32 > 收件人: user-zh > 主题: Re: 多流join的场景如何优化 > flink sql + zeppelin > > hl9...@126.com 于2021年1月26日周二 上午11:30写道: > > > 请教各位大佬,我现在有个多流join计算的场景,不知道该如何进行优化。 > > > > 电商业务有3个kafka消息源,消息结构描述如下(只列举主要字段): > > market_act(营销活动): > > {act_id:营销活动id,start_time:活动开始时间,end_time:活动结束时间,shop_id:活动的门店} > > new_member(新增会员): > {member_id:新会员id,act_id:吸引会员的营销活动id,create_time:新会员生成时间} > > > > > orders(订单):{order_no:订单号,amt:订单金额,member_id:会员id,create_time:订单时间,shop_id:下单的门店} > > > > 需求:按活动统计活动期间新会员产生的订单金额 > > 伪sql: > > select act_id,count(1) as order_num,sum(amt) as order_amt > > from orders t1 > > inner join new_member t2 on t1.member_id=t2.member_id > > inner join market_act t3 on t2.act_id=t3.act_id > > where t1.create_time between t3.start_time and t3.end_time ; > > > > 目前做法: > > 将 market_act 和 new_member 两个维表消息放到redis缓存, > > flink接orders 消息,在flatmap中读取redis维表信息,判断当前订单是否属于某个有效的活动, > > 是则输出{act_id,order_no,amt,member_id},然后sink到db。 > > > > 我感觉这种做法没有充分发挥flink流的特性,有没有办法在flink里面直接join这3个流,搭配状态,进行计算? > > > > > > > > hl9...@126.com > > >
Re: flink-sql-gateway稳定性如何,可以在生产环境使用吗?
建议用zeppelin jinsx 于2021年1月26日周二 上午11:48写道: > > 想在生产环境部署flink-sql-gateway,通过jdbc方式提交sql任务。不知道flink-sql-gateway稳定性如何,有大佬能给点建议吗? > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/
Re: 多流join的场景如何优化
flink sql + zeppelin hl9...@126.com 于2021年1月26日周二 上午11:30写道: > 请教各位大佬,我现在有个多流join计算的场景,不知道该如何进行优化。 > > 电商业务有3个kafka消息源,消息结构描述如下(只列举主要字段): > market_act(营销活动): > {act_id:营销活动id,start_time:活动开始时间,end_time:活动结束时间,shop_id:活动的门店} > new_member(新增会员): {member_id:新会员id,act_id:吸引会员的营销活动id,create_time:新会员生成时间} > > orders(订单):{order_no:订单号,amt:订单金额,member_id:会员id,create_time:订单时间,shop_id:下单的门店} > > 需求:按活动统计活动期间新会员产生的订单金额 > 伪sql: > select act_id,count(1) as order_num,sum(amt) as order_amt > from orders t1 > inner join new_member t2 on t1.member_id=t2.member_id > inner join market_act t3 on t2.act_id=t3.act_id > where t1.create_time between t3.start_time and t3.end_time ; > > 目前做法: > 将 market_act 和 new_member 两个维表消息放到redis缓存, > flink接orders 消息,在flatmap中读取redis维表信息,判断当前订单是否属于某个有效的活动, > 是则输出{act_id,order_no,amt,member_id},然后sink到db。 > > 我感觉这种做法没有充分发挥flink流的特性,有没有办法在flink里面直接join这3个流,搭配状态,进行计算? > > > > hl9...@126.com >
Re: Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V
应该是guava包冲突问题,请参考这篇文章(参考) https://blog.csdn.net/u012121587/article/details/103903162 董海峰(Sharp) 于2021年1月24日周日 上午9:11写道: > Hi,您好啊,我最近遇到一个问题,在社区里发过,但是没人回答,想请教您一下,烦请有空的时候回复一下,谢谢您啦。 > hadoop3.3.0 flink1.12 hive3.12 > I want to integrate hive and flink. After I configure the > sql-client-dqfaults.yaml file, > catalogs: >- name: default_catalog > type: hive > hive-conf-dir: /cdc/apache-hive-3.1.2-bin/conf > > I start the flink sql client, but the following error is reported. > [root@dhf4 bin]# ./sql-client.sh embedded > SLF4J: Class path contains multiple SLF4J bindings. > SLF4J: Found binding in > [jar:file:/cdc/flink-1.12.0/lib/log4j-slf4j-impl-2.12.1.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: Found binding in > [jar:file:/cdc/hadoop-3.3.0/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class] > SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an > explanation. > SLF4J: Actual binding is of type > [org.apache.logging.slf4j.Log4jLoggerFactory] > No default environment specified. > Searching for '/cdc/flink-1.12.0/conf/sql-client-defaults.yaml'...found. > Reading default environment from: > file:/cdc/flink-1.12.0/conf/sql-client-defaults.yaml > No session environment specified. > 2021-01-20 10:12:38,179 INFO org.apache.hadoop.hive.conf.HiveConf > [] - Found configuration file > file:/cdc/apache-hive-3.1.2-bin/conf/hive-site.xml > Exception in thread "main" > org.apache.flink.table.client.SqlClientException: Unexpected exception. > This is a bug. Please consider filing an issue. > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:208) > Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: > Could not create execution context. > at > org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:878) > at > org.apache.flink.table.client.gateway.local.LocalExecutor.openSession(LocalExecutor.java:226) > at org.apache.flink.table.client.SqlClient.start(SqlClient.java:108) > at org.apache.flink.table.client.SqlClient.main(SqlClient.java:196) > Caused by: java.lang.NoSuchMethodError: > com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V > at org.apache.hadoop.conf.Configuration.set(Configuration.java:1380) > at org.apache.hadoop.conf.Configuration.set(Configuration.java:1361) > at org.apache.hadoop.mapred.JobConf.setJar(JobConf.java:536) > at org.apache.hadoop.mapred.JobConf.setJarByClass(JobConf.java:554) > at org.apache.hadoop.mapred.JobConf.(JobConf.java:448) > at org.apache.hadoop.hive.conf.HiveConf.initialize(HiveConf.java:5141) > at org.apache.hadoop.hive.conf.HiveConf.(HiveConf.java:5109) > at > org.apache.flink.table.catalog.hive.HiveCatalog.createHiveConf(HiveCatalog.java:211) > at > org.apache.flink.table.catalog.hive.HiveCatalog.(HiveCatalog.java:164) > at > org.apache.flink.table.catalog.hive.factories.HiveCatalogFactory.createCatalog(HiveCatalogFactory.java:89) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.createCatalog(ExecutionContext.java:384) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$null$5(ExecutionContext.java:634) > at java.util.HashMap.forEach(HashMap.java:1289) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.lambda$initializeCatalogs$6(ExecutionContext.java:633) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.wrapClassLoader(ExecutionContext.java:266) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeCatalogs(ExecutionContext.java:632) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.initializeTableEnvironment(ExecutionContext.java:529) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:185) > at > org.apache.flink.table.client.gateway.local.ExecutionContext.(ExecutionContext.java:138) > at > org.apache.flink.table.client.gateway.local.ExecutionContext$Builder.build(ExecutionContext.java:867) > ... 3 more > > The log content is as follows > [root@dhf4 bin]# cat ../log/flink-root-sql-client-dhf4.log > 2021-01-20 10:12:36,246 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: jobmanager.rpc.address, localhost > 2021-01-20 10:12:36,252 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: jobmanager.rpc.port, 6123 > 2021-01-20 10:12:36,252 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: jobmanager.memory.process.size, 1600m > 2021-01-20 10:12:36,252 INFO > org.apache.flink.configuration.GlobalConfiguration [] - Loading > configuration property: taskmanager.memory.process.size, 1728m > 2021-01-20 10:12:36,252 INFO > org.apache.flink.configuration.Gl
Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。
我感觉你这个用实时很难做,涉及到状态更新的无限流,需要配置 state ttl 徐州州 <25977...@qq.com> 于2021年1月23日周六 上午11:23写道: > > 我遇到的难题是,拒收订单想拿到payAment字段必须扫描全量的order_money表。order_money是下单时候才会产生,我拒收订单根本不知道它的下单时间根本不知道怎么拿,而且order_money没有任何标记,我全量扫描money表程序OOM。我的数据是通过Canal监控过来的,我需要写flink-sql来进行join。 > > > > > -- 原始邮件 -- > 发件人: "yang nick" 发送时间: 2021年1月23日(星期六) 中午11:16 > 收件人: "user-zh" 主题: Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。 > > > > 我觉的既然是做数仓,这里没啥好纠结的,数据已经在数仓里面了,做成宽表就行了 > > 徐州州 <25977...@qq.com> 于2021年1月23日周六 上午11:11写道: > > > > > > 我起初也有这个想法,但是他们业务告诉我,order_reject表是单独存在的。拒收单子只会在order_reject出现,而order_money只在它下单的时候写入,拒收订单数据过来order_money不会有任何变化,所以我即使拿到每天的拒收订单去join也是要读取全量order_money。 > > > > > > > > > > -- 原始邮件 -- > > 发件人: "yang nick" > 发送时间: 2021年1月23日(星期六) 中午11:04 > > 收件人: "user-zh" > 主题: Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。 > > > > > > > > 把拆开的表在join起来嘛 > > > > 徐州州 <25977...@qq.com> 于2021年1月23日周六 上午10:48写道: > > > > > > > > 请教各位大佬一个业务问题,(当天日报业务)版本1:我可以通过拒收时间直接拿到订单主表当天的拒收订单和payAmont金额,版本2:现在公司订单主表重做,拒收订单和payAmont被拆分出两张表,Order_reject和Order_money,现在我的问题是就算我可以从(拒收订单表)中通过时间拿到当天拒收订单,但是PayAmont无法通过时间在Order_money表拿到,PayAmont是在下单的时候才会生成,这样我当天拒收业务都要扫描全量的order_money表拿到对应的PayAmont金额字段,有什么好的处理方法吗 > > > > > > ?我目前的处理方法是强行读取Order_money的createTime-30day来,尽可能的让拒收订单找到对应的payAmont字段,不只是只拆分出一张order_money还有order_extend还有好几张表,我是做数仓的,我就没见过这么干的。强行把度量值字段拆分出来。请各位大神支个招。
Re: flink-sql-gateway支持远程吗
可以试试zeppelin 罗显宴 <15927482...@163.com> 于2021年1月23日周六 上午11:19写道: > > 大佬们,我试过flinksql-client和flink-sql-gateway发现都是把代码提交到本地集群运行,好像不能提交到指定远程集群上执行任务,请问大佬们,怎么提交代码到远程集群运行 > > > | | > 15927482803 > | > | > 邮箱:15927482...@163.com > | > > 签名由 网易邮箱大师 定制
Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。
我觉的既然是做数仓,这里没啥好纠结的,数据已经在数仓里面了,做成宽表就行了 徐州州 <25977...@qq.com> 于2021年1月23日周六 上午11:11写道: > > 我起初也有这个想法,但是他们业务告诉我,order_reject表是单独存在的。拒收单子只会在order_reject出现,而order_money只在它下单的时候写入,拒收订单数据过来order_money不会有任何变化,所以我即使拿到每天的拒收订单去join也是要读取全量order_money。 > > > > > -- 原始邮件 -- > 发件人: "yang nick" 发送时间: 2021年1月23日(星期六) 中午11:04 > 收件人: "user-zh" 主题: Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。 > > > > 把拆开的表在join起来嘛 > > 徐州州 <25977...@qq.com> 于2021年1月23日周六 上午10:48写道: > > > > 请教各位大佬一个业务问题,(当天日报业务)版本1:我可以通过拒收时间直接拿到订单主表当天的拒收订单和payAmont金额,版本2:现在公司订单主表重做,拒收订单和payAmont被拆分出两张表,Order_reject和Order_money,现在我的问题是就算我可以从(拒收订单表)中通过时间拿到当天拒收订单,但是PayAmont无法通过时间在Order_money表拿到,PayAmont是在下单的时候才会生成,这样我当天拒收业务都要扫描全量的order_money表拿到对应的PayAmont金额字段,有什么好的处理方法吗 > > > ?我目前的处理方法是强行读取Order_money的createTime-30day来,尽可能的让拒收订单找到对应的payAmont字段,不只是只拆分出一张order_money还有order_extend还有好几张表,我是做数仓的,我就没见过这么干的。强行把度量值字段拆分出来。请各位大神支个招。
Re: 公司强行拆分Orders主表,遇到事实表join请各位大神们支个招。
把拆开的表在join起来嘛 徐州州 <25977...@qq.com> 于2021年1月23日周六 上午10:48写道: > 请教各位大佬一个业务问题,(当天日报业务)版本1:我可以通过拒收时间直接拿到订单主表当天的拒收订单和payAmont金额,版本2:现在公司订单主表重做,拒收订单和payAmont被拆分出两张表,Order_reject和Order_money,现在我的问题是就算我可以从(拒收订单表)中通过时间拿到当天拒收订单,但是PayAmont无法通过时间在Order_money表拿到,PayAmont是在下单的时候才会生成,这样我当天拒收业务都要扫描全量的order_money表拿到对应的PayAmont金额字段,有什么好的处理方法吗 > ?我目前的处理方法是强行读取Order_money的createTime-30day来,尽可能的让拒收订单找到对应的payAmont字段,不只是只拆分出一张order_money还有order_extend还有好几张表,我是做数仓的,我就没见过这么干的。强行把度量值字段拆分出来。请各位大神支个招。
Re: 请教关于Flink yarnship的使用
这个方法应该是读取本地的文件,但是你放到yarn中执行,就会找不到这个文件。所以建议可以把配置上传到hdfs中试试看 Yan Tang 于2021年1月22日周五 下午4:53写道: > 我把配置和jar包分开了,用-yt option将配置文件Ship到yarn > cluster中,但是在获取配置文件的时候,老是拿不到,有人有相关经验么? > 我的提交命令: > -yt /path/to/conf > > code: > this.getClass().getResourceAsStream("conf/cmp_online.cfg") > 但一直返回null. > > > > > > > > -- > Sent from: http://apache-flink.147419.n8.nabble.com/ >
Re: Flink 并行度问题
如果是 standalone的模式部署在一台机器上,那么据我了解,只会有一个TM,一个TM可以有多个slot Jacob <17691150...@163.com> 于2021年1月22日周五 下午4:18写道: > 使用Flink以来,一直有一个问题困扰着。 > > > Flink 设置n个并行度后,是指占用n个CPU,而不是n个CPU核数。 > > 比如Flink消费kafka > > topic时,并行度数量往往都建议设置topic分区的个数,意在让每个并行度消费一个分区,达到性能最优。那也就是说一个并行度代表一个消费线程,同时也表示一个slot,又由于在Flink中一个并行度表示一个CPU,那么是不是可以理解为一个CPU就是一个线程。 > > > 如果FLink 以standalone的模式部署在一台机器上,这台机器有4个CPU,每个CPU有6个核,那么该集群的最大并行度是不是就是 4 ? > > 在我的认识中,一个核表示一个线程,但在上面的理解中,好像是一个CPU就是一个线程,这是否矛盾呢? > > > > > - > Thanks! > Jacob > -- > Sent from: http://apache-flink.147419.n8.nabble.com/