我们还没用到flink sql,有用流API实现的思路吗?
hl9...@126.com
发件人: yang nick
发送时间: 2021-01-26 11:32
收件人: user-zh
主题: Re: 多流join的场景如何优化
flink sql + zeppelin
hl9...@126.com 于2021年1月26日周二 上午11:30写道:
> 请教各位大佬,我现在有个多流join计算的场景,不知道该如何进行优化。
>
> 电商业务有3个kafka消息源,消息结构描述如下(只列举主要字段):
> market_act(营销活动):
market_act 和 new_member 两个维表消息放到redis缓存,
flink接orders 消息,在flatmap中读取redis维表信息,判断当前订单是否属于某个有效的活动,
是则输出{act_id,order_no,amt,member_id},然后sink到db。
我感觉这种做法没有充分发挥flink流的特性,有没有办法在flink里面直接join这3个流,搭配状态,进行计算?
hl9...@126.com
489c40】:初始化状态!
flink-10.2 - [2020-11-16 13:42:00.356] - INFO [Flat Map -> Sink: Print to Std.
Out (2/3)] com.toonyoo.operator.PersonFlatMap -
PersonFlatMap【com.toonyoo.operator.PersonFlatMap@1cd4f5c9】:初始化状态!
...
hl9...@126.com
是flink standalone 集群。
job并行度是在job的java代码中通过 streamExecutionEnvironment.setParallelism(15) 来指定的。
hl9...@126.com
发件人: Xintong Song
发送时间: 2020-11-12 13:18
收件人: user-zh
主题: Re: slot数量与并行度的大小关系
你是部署的 flink standalone 集群吗?目前作业的并行度 15 是通过什么方式指定的?
流处理作业默认是至少要拿到并行度数量的 slot 才能够运行的。可以通过 Shawn 提到的 [3
.
是因为slot数量必须要大于并行度吗?有没有参数可以让flink自动选择可用slot数作为job的并行度?
hl9...@126.com
确实是pom中存在一个kafka的依赖包,冲突了,去掉后问题解决。感谢大佬。
另外,如果我想使用kafka,就必须引入kafka的包,还是会冲突,有什么解决办法吗?
hl9...@126.com
发件人: Jark Wu
发送时间: 2020-11-05 11:55
收件人: user-zh
主题: Re: cdc代码报错
环境中估计有另一个版本的 kafka-connect jar 包,导致依赖冲突了。
On Thu, 5 Nov 2020 at 11:35, hl9...@126.com wrote:
> flink版本1.11.2,有没有大佬遇到这
flink版本1.11.2,有没有大佬遇到这个问题?
hl9...@126.com
发件人: hl9...@126.com
发送时间: 2020-11-04 16:43
收件人: user-zh
主题: cdc代码报错
Hi,all:
我运行ververica/flink-cdc-connectors git上的demo代码,报错:
2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out
(1/1)] W
arallelism 1 for sink to keep message ordering
env.execute();
}
}
hl9...@126.com
感谢。我这个场景中op1和op2是串行的,那只能把op1的状态也发到下游的op2。
hl9...@126.com
发件人: Qi Kang
发送时间: 2020-11-04 14:53
收件人: user-zh
主题: Re: 不同的算子能共用一个状态吗?
Hi,
Flink不支持算子共享状态。如果你的op1和op2是并行的,就只能利用外部存储间接地共享状态数据。如果是串行的(op2在op1的下游),也可以尝试考虑把op1产生的状态数据作为流元素发送到op2中去。希望对你有所帮助。
> On Nov 4, 2020, at 14:48, hl9...@126.
Hi,all:
我定义了两个flatmap算子(op1和op2),op1里面定义了一个MapState变量,我想在op2里面直接用这个状态,可以吗?
我感觉是不行的,没有找到相关的api。请各位大佬帮忙明确一下。
hl9...@126.com
没有修改kafka,就用官方的jar。后来我用1.11.2版本重新尝试了下,成功了,没有任何错误。
这个问题就不纠结了
hl9...@126.com
发件人: Benchao Li
发送时间: 2020-09-29 18:17
收件人: user-zh
主题: Re: Re: sql-cli执行sql报错
这个错误看起来比较奇怪。正常来讲flink-sql-connector-kafka_2.11-1.10.2.jar里面应该都是shaded之后的class了,
但是却报了一个非shaded的ByteArrayDeserializer。
我感觉这个应该是你自己添加了一下比较特殊的
leEnvironmentImpl
at java.net.URLClassLoader.findClass(URLClassLoader.java:381)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
pox table相关依赖:
org.apache.flink
flink-table-api-java-bridge_2.11
${flink.version}
provided
hl9...@126.com
not an instance
of
org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer
附:lib包清单
[test@rcx51101 lib]$ pwd
/opt/flink-1.10.2/lib
flink-csv-1.10.2.jar
flink-dist_2.12-1.10.2.jar
flink-jdbc_2.12-1.10.2.jar
flink-json-1.10.2.jar
flink-shaded-hadoop-2-uber-2.6.5-10.0.ja
我按照下面的步骤尝试了下,依然报同样的错误:
错误信息:java.lang.ClassNotFoundException:
org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
相关lib包:
flink-connector-kafka_2.12-1.10.2.jar
kafka-clients-0.11.0.3.jar (之前是kafka-clients-2.0.0.jar)
hl9...@126.com
发件人: 111
发送时间: 2020-09-28 10:41
收件人: user-zh
fka-clients-2.0.0.jar
hl9...@126.com
发件人: 111
发送时间: 2020-09-28 09:23
收件人: user-zh@flink.apache.org
主题: 回复:sql-cli执行sql报错
你貌似使用的是flink-1.11的语法。
可以修改成flink-1.10的语法试试,参考文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector
| |
xinghalo
|
|
xi
ESTAMP(3)
schema.watermark.0.strategy.expr=`transaction_time` - INTERVAL '5' SECOND
topic=heli01
The following factories have been considered:
org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory
org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory
org.apache.flink.table.sources.CsvBatchTableSourceFactory
org.apache.flink.table.sources.CsvAppendTableSourceFactory
hl9...@126.com
16 matches
Mail list logo