flink写kafka时,并行度和分区数的设置问题
您好: flink将数据写入kafka【kafka为sink】,当kafka topic分区数【设置的60】小于设置的并行度【设置的300】时,task是轮询写入这些分区吗,是否会影响写入效率?【是否存在遍历时的耗时情况】。 此时,如果扩大topic的分区数【添加至200,或者直接到300】,写入的效率是否会有明显的提升? 是否有相关的源码可以查看。 期待回复,祝好,谢谢!
flink集群如何将日志直接写入elasticsearch中?
有没有比较方便快捷的解决方案?
Re:一次执行单条insert语句和一次执行多条insert语句有什么区别
Hi, fengqi. 上面那种statement的方式,最终将只会产生一个作业,这个作业有机会复用这个source(拓扑图1 个source -> 2 个calc_sink),因此只需要读一次source就行了。 下面那种execute sql两次的方式,将产生两个作业,两个作业完全独立。 -- Best! Xuyang At 2024-03-13 12:26:05, "ha.fen...@aisino.com" wrote: >StatementSet stmtSet = tEnv.createStatementSet(); >stmtSet.addInsertSql( > "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product > LIKE '%Rubber%'"); >stmtSet.addInsertSql( > "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product > LIKE '%Glass%'"); >TableResult tableResult2 = stmtSet.execute(); >与下面有什么区别? >tEnv.executeSql( > "INSERT INTO RubberOrders SELECT product, amount FROM Orders WHERE product > LIKE '%Rubber%'"); >tEnv.executeSql( > "INSERT INTO GlassOrders SELECT product, amount FROM Orders WHERE product > LIKE '%Glass%'");
Re: flink集群如何将日志直接写入elasticsearch中?
比较简单的方式是启动一个filebeat进程,抓取 jobmanager.log 和t askmanager.log Best, Jiabao kellygeorg...@163.com 于2024年3月13日周三 15:30写道: > 有没有比较方便快捷的解决方案? > > >
Re: flink写kafka时,并行度和分区数的设置问题
你好, 写 Kafka 分区的策略取决于使用的 Kafka Sink 的 Partitioner [1],默认使用的是 Kafka 的 Default Partitioner,底层使用了一种称之为黏性分区的策略:对于指定 key 的数据按照对 key hash 的方式选择分区写入,对于未指定 key 的数据则随机选择一个分区,然后“黏住”这个分区一段时间以提升攒批效果,然后攒批结束写完后再随机换一个分区,来在攒批效果和均匀写入间做一个平衡。 具体可以参考 [2]。 因此,默认配置下不存在你说的遍历导致攒批效果下降的问题,在达到 Kafka 单分区写入瓶颈前,只是扩大写入并发就会有比较好的提升写入吞吐的效果。不过在一些特殊情况下,比如如果你并发很高,单并发写入 QPS 极低,以致于单次攒批周期内甚至只有一两条消息,导致攒批效果差,打到 Kafka 写入瓶颈,那么降低并发可能反而能通过提升攒批效果的形式,配合写入压缩降低写入 Kafka 流量,提升写入吞吐。 [1] https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/kafka/#sink-partitioning [2] https://www.cnblogs.com/huxi2b/p/12540092.html From: chenyu_opensource Sent: Wednesday, March 13, 2024 15:25 To: user-zh@flink.apache.org Subject: flink写kafka时,并行度和分区数的设置问题 您好: flink将数据写入kafka【kafka为sink】,当kafka topic分区数【设置的60】小于设置的并行度【设置的300】时,task是轮询写入这些分区吗,是否会影响写入效率?【是否存在遍历时的耗时情况】。 此时,如果扩大topic的分区数【添加至200,或者直接到300】,写入的效率是否会有明显的提升? 是否有相关的源码可以查看。 期待回复,祝好,谢谢!
Re: 如何查询create table语句的详细内容
用show create table语句 [image: Screenshot 2024-03-13 103802.png] ha.fen...@aisino.com 于2024年3月12日周二 15:37写道: > 例如 > CREATE TABLE Orders_in_kafka ( > -- 添加 watermark 定义 > WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND > ) WITH ( > 'connector' = 'kafka', > ... > ) > LIKE Orders_in_file ( > EXCLUDING ALL > INCLUDING GENERATED > ); > > 通过like生成的表,如何查看Orders_in_kafka 这个表完整的create table定义。 >
Re: 如何查询create table语句的详细内容
刚刚图没发完整 [image: Screenshot 2024-03-13 103802.png] Yubin Li 于2024年3月13日周三 17:44写道: > 用show create table语句 > [image: Screenshot 2024-03-13 103802.png] > > ha.fen...@aisino.com 于2024年3月12日周二 15:37写道: > >> 例如 >> CREATE TABLE Orders_in_kafka ( >> -- 添加 watermark 定义 >> WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND >> ) WITH ( >> 'connector' = 'kafka', >> ... >> ) >> LIKE Orders_in_file ( >> EXCLUDING ALL >> INCLUDING GENERATED >> ); >> >> 通过like生成的表,如何查看Orders_in_kafka 这个表完整的create table定义。 >> >
退订
退订
Re:flink写kafka时,并行度和分区数的设置问题
退订 在 2024-03-13 15:25:27,"chenyu_opensource" 写道: >您好: > flink将数据写入kafka【kafka为sink】,当kafka > topic分区数【设置的60】小于设置的并行度【设置的300】时,task是轮询写入这些分区吗,是否会影响写入效率?【是否存在遍历时的耗时情况】。 > 此时,如果扩大topic的分区数【添加至200,或者直接到300】,写入的效率是否会有明显的提升? > > 是否有相关的源码可以查看。 >期待回复,祝好,谢谢! > > >