Re: Re: 多流join的场景如何优化

2021-01-25 文章 hl9...@126.com
我们还没用到flink sql,有用流API实现的思路吗? hl9...@126.com 发件人: yang nick 发送时间: 2021-01-26 11:32 收件人: user-zh 主题: Re: 多流join的场景如何优化 flink sql + zeppelin hl9...@126.com 于2021年1月26日周二 上午11:30写道: > 请教各位大佬,我现在有个多流join计算的场景,不知道该如何进行优化。 > > 电商业务有3个kafka消息源,消息结构描述如下(只列举主要字段): > market_act(营销活动):

多流join的场景如何优化

2021-01-25 文章 hl9...@126.com
market_act 和 new_member 两个维表消息放到redis缓存, flink接orders 消息,在flatmap中读取redis维表信息,判断当前订单是否属于某个有效的活动, 是则输出{act_id,order_no,amt,member_id},然后sink到db。 我感觉这种做法没有充分发挥flink流的特性,有没有办法在flink里面直接join这3个流,搭配状态,进行计算? hl9...@126.com

flink算子类在多个subtask中是各自初始化1个实例对象吗?

2020-11-15 文章 hl9...@126.com
489c40】:初始化状态! flink-10.2 - [2020-11-16 13:42:00.356] - INFO [Flat Map -> Sink: Print to Std. Out (2/3)] com.toonyoo.operator.PersonFlatMap - PersonFlatMap【com.toonyoo.operator.PersonFlatMap@1cd4f5c9】:初始化状态! ... hl9...@126.com

Re: Re: slot数量与并行度的大小关系

2020-11-12 文章 hl9...@126.com
是flink standalone 集群。 job并行度是在job的java代码中通过 streamExecutionEnvironment.setParallelism(15) 来指定的。 hl9...@126.com 发件人: Xintong Song 发送时间: 2020-11-12 13:18 收件人: user-zh 主题: Re: slot数量与并行度的大小关系 你是部署的 flink standalone 集群吗?目前作业的并行度 15 是通过什么方式指定的? 流处理作业默认是至少要拿到并行度数量的 slot 才能够运行的。可以通过 Shawn 提到的 [3

slot数量与并行度的大小关系

2020-11-10 文章 hl9...@126.com
. 是因为slot数量必须要大于并行度吗?有没有参数可以让flink自动选择可用slot数作为job的并行度? hl9...@126.com

Re: Re: cdc代码报错

2020-11-04 文章 hl9...@126.com
确实是pom中存在一个kafka的依赖包,冲突了,去掉后问题解决。感谢大佬。 另外,如果我想使用kafka,就必须引入kafka的包,还是会冲突,有什么解决办法吗? hl9...@126.com 发件人: Jark Wu 发送时间: 2020-11-05 11:55 收件人: user-zh 主题: Re: cdc代码报错 环境中估计有另一个版本的 kafka-connect jar 包,导致依赖冲突了。 On Thu, 5 Nov 2020 at 11:35, hl9...@126.com wrote: > flink版本1.11.2,有没有大佬遇到这

回复: cdc代码报错

2020-11-04 文章 hl9...@126.com
flink版本1.11.2,有没有大佬遇到这个问题? hl9...@126.com 发件人: hl9...@126.com 发送时间: 2020-11-04 16:43 收件人: user-zh 主题: cdc代码报错 Hi,all: 我运行ververica/flink-cdc-connectors git上的demo代码,报错: 2020-11-04 16:39:10.972 [Source: Custom Source -> Sink: Print to Std. Out (1/1)] W

cdc代码报错

2020-11-04 文章 hl9...@126.com
arallelism 1 for sink to keep message ordering env.execute(); } } hl9...@126.com

Re: Re: 不同的算子能共用一个状态吗?

2020-11-03 文章 hl9...@126.com
感谢。我这个场景中op1和op2是串行的,那只能把op1的状态也发到下游的op2。 hl9...@126.com 发件人: Qi Kang 发送时间: 2020-11-04 14:53 收件人: user-zh 主题: Re: 不同的算子能共用一个状态吗? Hi, Flink不支持算子共享状态。如果你的op1和op2是并行的,就只能利用外部存储间接地共享状态数据。如果是串行的(op2在op1的下游),也可以尝试考虑把op1产生的状态数据作为流元素发送到op2中去。希望对你有所帮助。 > On Nov 4, 2020, at 14:48, hl9...@126.

不同的算子能共用一个状态吗?

2020-11-03 文章 hl9...@126.com
Hi,all: 我定义了两个flatmap算子(op1和op2),op1里面定义了一个MapState变量,我想在op2里面直接用这个状态,可以吗? 我感觉是不行的,没有找到相关的api。请各位大佬帮忙明确一下。 hl9...@126.com

Re: Re: sql-cli执行sql报错

2020-09-29 文章 hl9...@126.com
没有修改kafka,就用官方的jar。后来我用1.11.2版本重新尝试了下,成功了,没有任何错误。 这个问题就不纠结了 hl9...@126.com 发件人: Benchao Li 发送时间: 2020-09-29 18:17 收件人: user-zh 主题: Re: Re: sql-cli执行sql报错 这个错误看起来比较奇怪。正常来讲flink-sql-connector-kafka_2.11-1.10.2.jar里面应该都是shaded之后的class了, 但是却报了一个非shaded的ByteArrayDeserializer。 我感觉这个应该是你自己添加了一下比较特殊的

创建BatchTableEnvironment报错

2020-09-29 文章 hl9...@126.com
leEnvironmentImpl at java.net.URLClassLoader.findClass(URLClassLoader.java:381) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) pox table相关依赖: org.apache.flink flink-table-api-java-bridge_2.11 ${flink.version} provided hl9...@126.com

回复: Re: sql-cli执行sql报错

2020-09-28 文章 hl9...@126.com
not an instance of org.apache.flink.kafka.shaded.org.apache.kafka.common.serialization.Deserializer 附:lib包清单 [test@rcx51101 lib]$ pwd /opt/flink-1.10.2/lib flink-csv-1.10.2.jar flink-dist_2.12-1.10.2.jar flink-jdbc_2.12-1.10.2.jar flink-json-1.10.2.jar flink-shaded-hadoop-2-uber-2.6.5-10.0.ja

Re: 回复: sql-cli执行sql报错

2020-09-27 文章 hl9...@126.com
我按照下面的步骤尝试了下,依然报同样的错误: 错误信息:java.lang.ClassNotFoundException: org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer 相关lib包: flink-connector-kafka_2.12-1.10.2.jar kafka-clients-0.11.0.3.jar (之前是kafka-clients-2.0.0.jar) hl9...@126.com 发件人: 111 发送时间: 2020-09-28 10:41 收件人: user-zh

Re: 回复:sql-cli执行sql报错

2020-09-27 文章 hl9...@126.com
fka-clients-2.0.0.jar hl9...@126.com 发件人: 111 发送时间: 2020-09-28 09:23 收件人: user-zh@flink.apache.org 主题: 回复:sql-cli执行sql报错 你貌似使用的是flink-1.11的语法。 可以修改成flink-1.10的语法试试,参考文档:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/connect.html#kafka-connector | | xinghalo | | xi

sql-cli执行sql报错

2020-09-27 文章 hl9...@126.com
ESTAMP(3) schema.watermark.0.strategy.expr=`transaction_time` - INTERVAL '5' SECOND topic=heli01 The following factories have been considered: org.apache.flink.api.java.io.jdbc.JDBCTableSourceSinkFactory org.apache.flink.streaming.connectors.kafka.KafkaTableSourceSinkFactory org.apache.flink.table.sources.CsvBatchTableSourceFactory org.apache.flink.table.sources.CsvAppendTableSourceFactory hl9...@126.com