Re:多流join的场景如何优化

2021-01-25 文章 Ye Chen
双流join或者多流join从技术上是可以实现你这个场景的,网上有很多成熟的案例。
但是要考虑具体的业务需求,比如数据是否能在规定时间到达,未到达如何处理,如果因为多流join造成数据缺失或者延迟,对业务影响比较大的话还不如继续用维表。

















在 2021-01-26 11:30:56,"hl9...@126.com"  写道:
>请教各位大佬,我现在有个多流join计算的场景,不知道该如何进行优化。
>
>电商业务有3个kafka消息源,消息结构描述如下(只列举主要字段):
>market_act(营销活动): 
>{act_id:营销活动id,start_time:活动开始时间,end_time:活动结束时间,shop_id:活动的门店}
>new_member(新增会员): {member_id:新会员id,act_id:吸引会员的营销活动id,create_time:新会员生成时间}
>orders(订单):{order_no:订单号,amt:订单金额,member_id:会员id,create_time:订单时间,shop_id:下单的门店}
>
>需求:按活动统计活动期间新会员产生的订单金额
>伪sql: 
>select act_id,count(1) as order_num,sum(amt) as order_amt 
>from orders t1 
>inner join new_member t2 on t1.member_id=t2.member_id
>inner join market_act t3 on t2.act_id=t3.act_id 
>where t1.create_time between t3.start_time and t3.end_time ;
>
>目前做法:
>将 market_act 和 new_member 两个维表消息放到redis缓存,
>flink接orders 消息,在flatmap中读取redis维表信息,判断当前订单是否属于某个有效的活动,
>是则输出{act_id,order_no,amt,member_id},然后sink到db。
>
>我感觉这种做法没有充分发挥flink流的特性,有没有办法在flink里面直接join这3个流,搭配状态,进行计算?
>
>
>
>hl9...@126.com


Re: 咨询关于flink究竟使用的是log4j1还是log4j2.

2021-01-25 文章 赵一旦
好的。

zilong xiao  于2021年1月26日周二 下午2:13写道:

> Hi
>
>
> flink从1.11开始应该支持log4j,logback,log4j2了,1.11之前的版本只支持前两者,log4j2也是可以用.properties配置的,现在1.12里的默认配置就是log4j2
>
> 祝好~
>
> 赵一旦  于2021年1月26日周二 下午1:27写道:
>
> >
> >
> 网上很多人说log4j2是使用.xml配置。但是flink的conf中只有properties,但是官方文档讲默认使用log4j2?搞蒙了,究竟用的哪个呢。
> >
>


Re: Re: 多流join的场景如何优化

2021-01-25 文章 yang nick
两两join吧

hl9...@126.com  于2021年1月26日周二 下午2:28写道:

> 我们还没用到flink sql,有用流API实现的思路吗?
>
>
>
> hl9...@126.com
>
> 发件人: yang nick
> 发送时间: 2021-01-26 11:32
> 收件人: user-zh
> 主题: Re: 多流join的场景如何优化
> flink sql + zeppelin
>
> hl9...@126.com  于2021年1月26日周二 上午11:30写道:
>
> > 请教各位大佬,我现在有个多流join计算的场景,不知道该如何进行优化。
> >
> > 电商业务有3个kafka消息源,消息结构描述如下(只列举主要字段):
> > market_act(营销活动):
> > {act_id:营销活动id,start_time:活动开始时间,end_time:活动结束时间,shop_id:活动的门店}
> > new_member(新增会员):
> {member_id:新会员id,act_id:吸引会员的营销活动id,create_time:新会员生成时间}
> >
> >
> orders(订单):{order_no:订单号,amt:订单金额,member_id:会员id,create_time:订单时间,shop_id:下单的门店}
> >
> > 需求:按活动统计活动期间新会员产生的订单金额
> > 伪sql:
> > select act_id,count(1) as order_num,sum(amt) as order_amt
> > from orders t1
> > inner join new_member t2 on t1.member_id=t2.member_id
> > inner join market_act t3 on t2.act_id=t3.act_id
> > where t1.create_time between t3.start_time and t3.end_time ;
> >
> > 目前做法:
> > 将 market_act 和 new_member 两个维表消息放到redis缓存,
> > flink接orders 消息,在flatmap中读取redis维表信息,判断当前订单是否属于某个有效的活动,
> > 是则输出{act_id,order_no,amt,member_id},然后sink到db。
> >
> > 我感觉这种做法没有充分发挥flink流的特性,有没有办法在flink里面直接join这3个流,搭配状态,进行计算?
> >
> >
> >
> > hl9...@126.com
> >
>


Re: Re: 多流join的场景如何优化

2021-01-25 文章 hl9...@126.com
我们还没用到flink sql,有用流API实现的思路吗?



hl9...@126.com
 
发件人: yang nick
发送时间: 2021-01-26 11:32
收件人: user-zh
主题: Re: 多流join的场景如何优化
flink sql + zeppelin
 
hl9...@126.com  于2021年1月26日周二 上午11:30写道:
 
> 请教各位大佬,我现在有个多流join计算的场景,不知道该如何进行优化。
>
> 电商业务有3个kafka消息源,消息结构描述如下(只列举主要字段):
> market_act(营销活动):
> {act_id:营销活动id,start_time:活动开始时间,end_time:活动结束时间,shop_id:活动的门店}
> new_member(新增会员): {member_id:新会员id,act_id:吸引会员的营销活动id,create_time:新会员生成时间}
>
> orders(订单):{order_no:订单号,amt:订单金额,member_id:会员id,create_time:订单时间,shop_id:下单的门店}
>
> 需求:按活动统计活动期间新会员产生的订单金额
> 伪sql:
> select act_id,count(1) as order_num,sum(amt) as order_amt
> from orders t1
> inner join new_member t2 on t1.member_id=t2.member_id
> inner join market_act t3 on t2.act_id=t3.act_id
> where t1.create_time between t3.start_time and t3.end_time ;
>
> 目前做法:
> 将 market_act 和 new_member 两个维表消息放到redis缓存,
> flink接orders 消息,在flatmap中读取redis维表信息,判断当前订单是否属于某个有效的活动,
> 是则输出{act_id,order_no,amt,member_id},然后sink到db。
>
> 我感觉这种做法没有充分发挥flink流的特性,有没有办法在flink里面直接join这3个流,搭配状态,进行计算?
>
>
>
> hl9...@126.com
>


Re: 咨询关于flink究竟使用的是log4j1还是log4j2.

2021-01-25 文章 zilong xiao
Hi

flink从1.11开始应该支持log4j,logback,log4j2了,1.11之前的版本只支持前两者,log4j2也是可以用.properties配置的,现在1.12里的默认配置就是log4j2

祝好~

赵一旦  于2021年1月26日周二 下午1:27写道:

>
> 网上很多人说log4j2是使用.xml配置。但是flink的conf中只有properties,但是官方文档讲默认使用log4j2?搞蒙了,究竟用的哪个呢。
>


Re: flink-sql-gateway稳定性如何,可以在生产环境使用吗?

2021-01-25 文章 yang nick
建议用zeppelin

jinsx  于2021年1月26日周二 上午11:48写道:

>
> 想在生产环境部署flink-sql-gateway,通过jdbc方式提交sql任务。不知道flink-sql-gateway稳定性如何,有大佬能给点建议吗?
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/


咨询关于flink究竟使用的是log4j1还是log4j2.

2021-01-25 文章 赵一旦
网上很多人说log4j2是使用.xml配置。但是flink的conf中只有properties,但是官方文档讲默认使用log4j2?搞蒙了,究竟用的哪个呢。


flink-sql-gateway稳定性如何,可以在生产环境使用吗?

2021-01-25 文章 jinsx
想在生产环境部署flink-sql-gateway,通过jdbc方式提交sql任务。不知道flink-sql-gateway稳定性如何,有大佬能给点建议吗?



--
Sent from: http://apache-flink.147419.n8.nabble.com/

flink-sql-gateway稳定性如何,可以在生产环境使用吗?

2021-01-25 文章 jinsx
如上,
 在生产环境部署flink-sql-gateway,通过Jdbc提交sql任务。但是有点担心稳定性问题,有大佬可以给点建议吗。



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: 多流join的场景如何优化

2021-01-25 文章 yang nick
flink sql + zeppelin

hl9...@126.com  于2021年1月26日周二 上午11:30写道:

> 请教各位大佬,我现在有个多流join计算的场景,不知道该如何进行优化。
>
> 电商业务有3个kafka消息源,消息结构描述如下(只列举主要字段):
> market_act(营销活动):
> {act_id:营销活动id,start_time:活动开始时间,end_time:活动结束时间,shop_id:活动的门店}
> new_member(新增会员): {member_id:新会员id,act_id:吸引会员的营销活动id,create_time:新会员生成时间}
>
> orders(订单):{order_no:订单号,amt:订单金额,member_id:会员id,create_time:订单时间,shop_id:下单的门店}
>
> 需求:按活动统计活动期间新会员产生的订单金额
> 伪sql:
> select act_id,count(1) as order_num,sum(amt) as order_amt
> from orders t1
> inner join new_member t2 on t1.member_id=t2.member_id
> inner join market_act t3 on t2.act_id=t3.act_id
> where t1.create_time between t3.start_time and t3.end_time ;
>
> 目前做法:
> 将 market_act 和 new_member 两个维表消息放到redis缓存,
> flink接orders 消息,在flatmap中读取redis维表信息,判断当前订单是否属于某个有效的活动,
> 是则输出{act_id,order_no,amt,member_id},然后sink到db。
>
> 我感觉这种做法没有充分发挥flink流的特性,有没有办法在flink里面直接join这3个流,搭配状态,进行计算?
>
>
>
> hl9...@126.com
>


多流join的场景如何优化

2021-01-25 文章 hl9...@126.com
请教各位大佬,我现在有个多流join计算的场景,不知道该如何进行优化。

电商业务有3个kafka消息源,消息结构描述如下(只列举主要字段):
market_act(营销活动): 
{act_id:营销活动id,start_time:活动开始时间,end_time:活动结束时间,shop_id:活动的门店}
new_member(新增会员): {member_id:新会员id,act_id:吸引会员的营销活动id,create_time:新会员生成时间}
orders(订单):{order_no:订单号,amt:订单金额,member_id:会员id,create_time:订单时间,shop_id:下单的门店}

需求:按活动统计活动期间新会员产生的订单金额
伪sql: 
select act_id,count(1) as order_num,sum(amt) as order_amt 
from orders t1 
inner join new_member t2 on t1.member_id=t2.member_id
inner join market_act t3 on t2.act_id=t3.act_id 
where t1.create_time between t3.start_time and t3.end_time ;

目前做法:
将 market_act 和 new_member 两个维表消息放到redis缓存,
flink接orders 消息,在flatmap中读取redis维表信息,判断当前订单是否属于某个有效的活动,
是则输出{act_id,order_no,amt,member_id},然后sink到db。

我感觉这种做法没有充分发挥flink流的特性,有没有办法在flink里面直接join这3个流,搭配状态,进行计算?



hl9...@126.com


关于1.12新增的initialize阶段时间较长问题

2021-01-25 文章 赵一旦
如上,目前发现以前很快(10-30s)内能从敲命名到running的任务。现在有时候innitialize阶段就得1-2min。不清楚啥情况。


退订

2021-01-25 文章 541122...@qq.com
退订



541122...@qq.com


退订

2021-01-25 文章 纪军伟
退订

Re: python udf 提交到本地节点执行报错

2021-01-25 文章 Xingbo Huang
Hi,

看报错是你的客户端环境所使用的的`python`解释器没有安装pyflink。-pyexec指定的是你udf运行的worker所使用的python环境,但是你在客户端编译作业的时候也需要python环境,那个python环境也需要安装pyflink。

Best,
Xingbo


陈康 <844256...@qq.com> 于2021年1月25日周一 下午9:01写道:

> 你好、请教下配置pyflink、本地运行报错
> [root@hadoop01 ~]# pip list | grep flink
> apache-flink (1.12.0)
>
> [root@hadoop01 ~]# python3 -V
> Python 3.6.5
>
> flink run -m localhost:8081 -py datastream_tutorial.py -pyexec
> /usr/local/python3/bin/python3
>
>  File "datastream_tutorial.py", line 1, in 
> from pyflink.common.serialization import SimpleStringEncoder
> ModuleNotFoundError: No module named 'pyflink.common.serialization'
> ,请问下你是如何配置环境变量的吗?谢谢
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


Re: flink sql 执行limit 很少的语句依然会暴增

2021-01-25 文章 Shengkai Fang
hi,
报错信息: java.lang.UnsupportedOperationException: Currently, a
DynamicTableSource with SupportsLimitPushDown ability is not supported.

如果你当前的版本不是1.12的话,那么你还需要pick下rule[1]。可以关注下这个jira[2],这里包含了所有对于SupportXXX的优化。

如果只是本地测试的话还是建议用发布的1.12 + 之前提到的commit,自己pick可能有点问题。

[1] https://github.com/apache/flink/pull/12964
[2] https://issues.apache.org/jira/browse/FLINK-16987

zhang hao  于2021年1月25日周一 下午3:14写道:

> flink run -py new_jdbc_source.py
> Traceback (most recent call last):
>   File "new_jdbc_source.py", line 66, in 
> st_env.execute_sql("select * from feature_bar_sink").print()
>   File
>
> "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/pyflink.zip/pyflink/table/table_environment.py",
> line 543, in execute_sql
>   File
>
> "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py",
> line 1286, in __call__
>   File
>
> "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/pyflink.zip/pyflink/util/exceptions.py",
> line 147, in deco
>   File
>
> "/Users/derek.bao/dev-utils/flink-1.11.2/opt/python/py4j-0.10.8.1-src.zip/py4j/protocol.py",
> line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling
> o10.executeSql.
> : java.lang.UnsupportedOperationException: Currently, a DynamicTableSource
> with SupportsLimitPushDown ability is not supported.
> at
>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable$$anonfun$validateTableSource$1.apply(CatalogSourceTable.scala:210)
> at
>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable$$anonfun$validateTableSource$1.apply(CatalogSourceTable.scala:208)
> at scala.collection.immutable.List.foreach(List.scala:392)
> at
>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.validateTableSource(CatalogSourceTable.scala:208)
> at
>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.buildTableScan(CatalogSourceTable.scala:142)
> at
>
> org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(CatalogSourceTable.scala:78)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.toRel(SqlToRelConverter.java:3492)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(SqlToRelConverter.java:2415)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2102)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(SqlToRelConverter.java:2051)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(SqlToRelConverter.java:661)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(SqlToRelConverter.java:642)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(SqlToRelConverter.java:3345)
> at
>
> org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(SqlToRelConverter.java:568)
> at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org
>
> $apache$flink$table$planner$calcite$FlinkPlannerImpl$$rel(FlinkPlannerImpl.scala:164)
> at
>
> org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:151)
> at
>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(SqlToOperationConverter.java:789)
> at
>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(SqlToOperationConverter.java:761)
> at
>
> org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:238)
> at
>
> org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
> at
>
> org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(TableEnvironmentImpl.java:684)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
>
> org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
> at
>
> org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
> at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
> at
>
> org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
> at
>
> org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
> at
>
> org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
> at java.lang.Thread.run(Thread.java:748)
>
>
> org.apache.flink.client.program.ProgramAbortException
> at org.apache.flink.client.python.PythonDriver.main(PythonDriver.java:95)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at
>
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> at
>
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> 

Support

2021-01-25 文章 Global Givers Foundation
This is to inform you that you have been selected for a prize donation of Two 
Hundred and Fifty Thousand USD ($250,000.00) from the ongoing Global Givers 
Foundation programs. 

The selection process was carried out through random selection in our 
computerized email selection system (ESS) from a database of over 50,000 email 
addresses drawn from all the continents of the world which you were selected. 

This Donation is approved by Global Givers Foundation and also Licensed by the 
International Association of Charity Organization (IACO). To begin the 
processing of your donation, Kindly contact us via Email for more details: ( 
globalgv.foundation@gmail. com ) 

Thanks. 
Global Givers Foundation. 


Re: Streaming File Sink 不能生成 _SUCCESS 标记文件

2021-01-25 文章 Xavier
Hi highfei,
你的通过Streaming file sink写success 文件的问题解决了吗



--
Sent from: http://apache-flink.147419.n8.nabble.com/


Re: python udf 提交到本地节点执行报错

2021-01-25 文章 陈康
你好、请教下配置pyflink、本地运行报错
[root@hadoop01 ~]# pip list | grep flink
apache-flink (1.12.0)

[root@hadoop01 ~]# python3 -V
Python 3.6.5

flink run -m localhost:8081 -py datastream_tutorial.py -pyexec
/usr/local/python3/bin/python3

 File "datastream_tutorial.py", line 1, in 
from pyflink.common.serialization import SimpleStringEncoder
ModuleNotFoundError: No module named 'pyflink.common.serialization'
,请问下你是如何配置环境变量的吗?谢谢



--
Sent from: http://apache-flink.147419.n8.nabble.com/


flink batch sql使用union all的并行度与-p参数不一致问题

2021-01-25 文章 酷酷的浑蛋
在使用flink batch sql的  union all时,任务并行度跟设置的-p参数不一致


例如  select   a from t1 union all select a from t2……….
如果我-p设置了2,那么我union all了几个表,并行度就在-p基础上乘以几,-p=2  union  
all了3个表,那么并行度就为变为6了,请问这块怎么限制并行度为’2’?





flink1.12用不了flink-sql-gateway

2021-01-25 文章 jinsx
HI 大佬们,
flink1.12用不了flink-sql-gateway,请问为在什么时间支持?



--
Sent from: http://apache-flink.147419.n8.nabble.com/


回复: 根据业务需求选择合适的flink state

2021-01-25 文章 纪军伟
退订


| |
纪军伟
|
|
jjw8610...@163.com
|
签名由网易邮箱大师定制


在2021年01月23日 15:43,徐州州<25977...@qq.com> 写道:
我觉得你可以尝试一下TTL,keyby之后设置key状态的失效时间为1分钟,如果一分钟没数据进来就清空state。




-- 原始邮件 --
发件人: "张锴"https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#session-windows
> >
> > news_...@163.com 

Re: Caused by: java.lang.NoSuchMethodError: com.google.common.base.Preconditions.checkArgument(ZLjava/lang/String;Ljava/lang/Object;)V

2021-01-25 文章 Rui Li
Hi,

估计是Hadoop跟hive的guava版本冲突,Hadoop-3.3依赖的版本是27 [1],hive-3.1.2依赖的版本是19
[2]。另外请注意hive-3.1.2依赖的Hadoop版本是3.1.0 [3],一般不建议runtime的Hadoop版本高于hive依赖的版本。

解决方案一是在hive-exec里对guava做relocation,这个需要自己手动给hive-exec重新打包。
另一个办法是降低Hadoop版本,这里不一定需要降低集群的Hadoop版本,而是仅仅降低flink和hive这边用到的Hadoop版本,相当于用老的Hadoop
client去访问新的Hadoop server,这个小版本的兼容性一般来说是没问题的。

[1] https://issues.apache.org/jira/browse/HADOOP-16210
[2] https://github.com/apache/hive/blob/rel/release-3.1.2/pom.xml#L147
[3] https://github.com/apache/hive/blob/rel/release-3.1.2/pom.xml#L150

On Mon, Jan 25, 2021 at 2:12 PM yujianbo <15205029...@163.com> wrote:

> 请教一下大佬后来如何解决,我的hadoop和hive版本跟您一致。
>
>
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
>


-- 
Best regards!
Rui Li


对于 Flink 可撤回流做聚合,当上游数据撤回时,最终聚合值变小了,怎么解决这种问题。

2021-01-25 文章 LakeShen
Hi 社区,

之前遇到一个问题,在 Flink 里面,对于一个数据流(能够撤回的)进行聚合时,当这个数据流有大量撤回时,最终聚合值会变小。这种情况之前有看到说加一个
mini batch 来减轻这种情况的影响,还有别的方法能够解决这个问题吗?

Best,
LakeShen