Re: Re: [Internet]Re: Re: Some question with Flink state

2022-05-24, posted by lxk7...@163.com

I just read up on how key groups work, and I can now mostly follow the earlier explanation. Regarding this sentence:
"with map-state, the keys inside a given set of key groups can each be stored separately under the map-state's user key"
my understanding is that with value-state, a single task hosts multiple keys and the contents written for different keys would replace one another, whereas with a map, even if the same task hosts multiple keys, each entry can still be matched through the user-defined key.
If so, map-state would actually be a good fit for most scenarios.
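Roughly what I have in mind, as a sketch (the Click type, its goods field, and the usual flink-streaming imports are assumed for illustration; this is not my real job):

// Hypothetical sketch: one MapState entry per user-level key (e.g. a goods id)
// under whatever key the stream is keyed by, so several user-level keys hosted
// by the same subtask each keep their own entry instead of overwriting one value.
public class ClickCountFunction extends KeyedProcessFunction<String, Click, Tuple2<String, Long>> {

    private transient MapState<String, Long> countsPerGoods;

    @Override
    public void open(Configuration parameters) {
        countsPerGoods = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("countsPerGoods", String.class, Long.class));
    }

    @Override
    public void processElement(Click click, Context ctx, Collector<Tuple2<String, Long>> out) throws Exception {
        // read and update the count for this goods id only
        Long current = countsPerGoods.get(click.goods);
        long updated = (current == null ? 0L : current) + 1;
        countsPerGoods.put(click.goods, updated);
        out.collect(Tuple2.of(click.goods, updated));
    }
}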


lxk7...@163.com
 
From: jurluo(罗凯)
Date: 2022-05-25 11:05
To: user-zh@flink.apache.org
Subject: Re: [Internet]Re: Re: Some question with Flink state
Hey, this actually looks fine: identical keys all landed on the same task, and it is normal for each task to host several kinds of keys. Keys are split into key groups according to the max parallelism, and each fixed key group is then assigned to a task. Flink only guarantees that the same key goes to the same task, not that a task owns only one key. For your requirement, map-state is the right tool: with map-state, the keys inside a given set of key groups can each be stored separately under the map-state's user key.
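If you want to double-check the assignment, you can compute it with Flink's own KeyGroupRangeAssignment utility. A small sketch (maxParallelism = 128 is an assumption, the default Flink derives for parallelism 10):

import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

public class KeyGroupCheck {
    public static void main(String[] args) {
        int maxParallelism = 128; // assumed default for parallelism 10
        int parallelism = 10;
        for (Object key : new Object[] {1, 2, 3, "1", "2", "3"}) {
            // the key is hashed into one of maxParallelism key groups;
            // each subtask then owns a contiguous range of key groups
            int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(key, maxParallelism);
            int subtask = KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism);
            System.out.println("key=" + key + " keyGroup=" + keyGroup + " subtask=" + subtask);
        }
    }
}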
 
> On May 25, 2022 at 10:45 AM, lxk7...@163.com wrote:
> 
> The images seem to be broken again, so I am resending.
> Hello, I ran a test and found a problem: keying by a String does not give the expected result, while keying by an int does. The test also confirms that with two keyBy calls, only the second one takes effect.

Re: WakeupException when the Kafka source detects a partition change

2022-05-24, posted by Qingsheng Ren
Hi,

Thanks for the report; this looks like a bug. Could you open a ticket on Apache JIRA [1]?

[1] https://issues.apache.org/jira

> On May 25, 2022, at 11:35, 邹璨  wrote:
> 
> Flink version: 1.14.3
> Module: connectors/kafka
> Problem description:
> While using dynamic Kafka partition discovery I hit a WakeupException, which caused the job to fail. The exception is as follows:
> 
> 2022-05-10 15:08:03
> java.lang.RuntimeException: One or more fetchers have encountered exception
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
>     at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
>     at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
>     at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
>     at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
>     at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
>     at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
>     at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
>     at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     ... 1 more
> Caused by: org.apache.kafka.common.errors.WakeupException
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
>     at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1726)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1684)
>     at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:315)
>     at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:200)
>     at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
>     at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
>     ... 6 more
> 
> 
> 
> A quick look at the source code along the stack trace shows that when KafkaSource handles a partition change, it calls consumer.wakeup to interrupt the consumer that is still fetching data.
> While handling the partition change it then calls consumer.position; since the consumer has already been woken up, this throws the WakeupException.



WakeupException when the Kafka source detects a partition change

2022-05-24, posted by 邹璨
Flink version: 1.14.3
Module: connectors/kafka
Problem description:
While using dynamic Kafka partition discovery I hit a WakeupException, which caused the job to fail. The exception is as follows:

2022-05-10 15:08:03
java.lang.RuntimeException: One or more fetchers have encountered exception
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager.checkErrors(SplitFetcherManager.java:225)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.getNextFetch(SourceReaderBase.java:169)
    at org.apache.flink.connector.base.source.reader.SourceReaderBase.pollNext(SourceReaderBase.java:130)
    at org.apache.flink.streaming.api.operators.SourceOperator.emitNext(SourceOperator.java:350)
    at org.apache.flink.streaming.runtime.io.StreamTaskSourceInput.emitNext(StreamTaskSourceInput.java:68)
    at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
    at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: SplitFetcher thread 0 received unexpected exception while polling the records
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:150)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.run(SplitFetcher.java:105)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more
Caused by: org.apache.kafka.common.errors.WakeupException
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.maybeTriggerWakeup(ConsumerNetworkClient.java:511)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:275)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1726)
    at org.apache.kafka.clients.consumer.KafkaConsumer.position(KafkaConsumer.java:1684)
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.removeEmptySplits(KafkaPartitionSplitReader.java:315)
    at org.apache.flink.connector.kafka.source.reader.KafkaPartitionSplitReader.handleSplitsChanges(KafkaPartitionSplitReader.java:200)
    at org.apache.flink.connector.base.source.reader.fetcher.AddSplitsTask.run(AddSplitsTask.java:51)
    at org.apache.flink.connector.base.source.reader.fetcher.SplitFetcher.runOnce(SplitFetcher.java:142)
    ... 6 more



A quick look at the source code along the stack trace shows that when KafkaSource handles a partition change, it calls consumer.wakeup to interrupt the consumer that is still fetching data.
While handling the partition change it then calls consumer.position; since the consumer has already been woken up, this throws the WakeupException.
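Reduced to a sketch with plain kafka-clients calls (not the actual Flink code; broker address, topic, and group id are placeholders), the interaction looks like this:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;

public class WakeupRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        props.put("group.id", "wakeup-repro");            // placeholder
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("topic", 0); // placeholder
            consumer.assign(Collections.singletonList(tp));
            consumer.wakeup(); // what KafkaSource does to interrupt the ongoing fetch
            try {
                consumer.position(tp); // the next blocking call surfaces the pending wakeup
            } catch (WakeupException e) {
                // a fix would presumably have to catch this (or clear the wakeup) and retry position()
                System.out.println("WakeupException, as in the stack trace above");
            }
        }
    }
}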








Re: [Internet]Re: Re: Some question with Flink state

2022-05-24, posted by 罗凯
Hey, this actually looks fine: identical keys all landed on the same task, and it is normal for each task to host several kinds of keys. Keys are split into key groups according to the max parallelism, and each fixed key group is then assigned to a task. Flink only guarantees that the same key goes to the same task, not that a task owns only one key. For your requirement, map-state is the right tool: with map-state, the keys inside a given set of key groups can each be stored separately under the map-state's user key.

> On May 25, 2022 at 10:45 AM, lxk7...@163.com wrote:
> 
> The images seem to be broken again, so I am resending.
> Hello, I ran a test and found a problem: keying by a String does not give the expected result, while keying by an int does. The test also confirms that with two keyBy calls, only the second one takes effect.

Re: Re: Some question with Flink state

2022-05-24, posted by lxk7...@163.com
Hello, I ran a test and found a problem: keying by a String does not give the expected result, while keying by an int does. The test also confirms that with two keyBy calls, only the second one takes effect.
Below are my code and test results.
1. Using an int key
public class KeyByTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10);

        DataStreamSource<data> dataDataStreamSource = env.fromCollection(Arrays.asList(
                new data(1, "123", "首页"),
                new data(1, "123", "分类页"),
                new data(2, "r-123", "搜索结果页"),
                new data(1, "r-123", "我的页"),
                new data(3, "r-4567", "搜索结果页")));

        SingleOutputStreamOperator<String> map = dataDataStreamSource.keyBy(new MyKeySelector())
                .map(new RichMapFunction<data, String>() {
                    @Override
                    public String map(data data) throws Exception {
                        System.out.println(data.toString() + " is on subtask: "
                                + getRuntimeContext().getIndexOfThisSubtask());
                        return data.toString();
                    }
                });

        env.execute("test");
    }
}

class data {
    private int id;
    private String goods;
    private String pageName;

    public data(int id, String goods, String pageName) {
        this.id = id;
        this.goods = goods;
        this.pageName = pageName;
    }

    public data() {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getGoods() {
        return goods;
    }

    public void setGoods(String goods) {
        this.goods = goods;
    }

    public String getPageName() {
        return pageName;
    }

    public void setPageName(String pageName) {
        this.pageName = pageName;
    }

    @Override
    public String toString() {
        return "data{" +
                "id='" + id + '\'' +
                ", goods='" + goods + '\'' +
                ", pageName='" + pageName + '\'' +
                '}';
    }
}

class MyKeySelector implements KeySelector<data, Integer> {
    @Override
    public Integer getKey(data data) throws Exception {
        return data.getId();
    }
}
The console output is as follows: https://s2.loli.net/2022/05/25/mxtZu9YAPN2FD1a.png
You can see that the records are grouped by id and distributed across different subtasks.


2. Using a String key. The code is as follows:
public class KeyByTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10);

        DataStreamSource<data> dataDataStreamSource = env.fromCollection(Arrays.asList(
                new data("1", "123", "首页"),
                new data("1", "123", "分类页"),
                new data("2", "r-123", "搜索结果页"),
                new data("2", "r-123", "我的页"),
                new data("3", "r-4567", "搜索结果页")));

        SingleOutputStreamOperator<String> map = dataDataStreamSource.keyBy(new MyKeySelector())
                .map(new RichMapFunction<data, String>() {
                    @Override
                    public String map(data data) throws Exception {
                        System.out.println(data.toString() + " is on subtask: "
                                + getRuntimeContext().getIndexOfThisSubtask());
                        return data.toString();
                    }
                });

        env.execute("test");
    }
}

class data {
    private String id;
    private String goods;
    private String pageName;

    public data(String id, String goods, String pageName) {
        this.id = id;
        this.goods = goods;
        this.pageName = pageName;
    }

    public data() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getGoods() {
        return goods;
    }

    public void setGoods(String goods) {
        this.goods = goods;
    }

    public String getPageName() {
        return pageName;
    }

    public void setPageName(String pageName) {
        this.pageName = pageName;
    }

    @Override
    public String toString() {
        return "data{" +
                "id='" + id + '\'' +
                ", goods='" + goods + '\'' +
                ", pageName='" + pageName + '\'' +
                '}';
    }
}

class MyKeySelector implements KeySelector<data, String> {
    @Override
    public String getKey(data data) throws Exception {
        return data.getId();
    }
}
The final console output is as follows: https://s2.loli.net/2022/05/25/vxKiuX9od6aOTD3.png

You can see that the records went to only two groups; I am not sure whether this is a bug.


lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 21:35
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
I am not certain, but most likely with two keyBy calls only the later one takes effect, so your first keyBy is probably a no-op (you can verify this). You can do what you suggested and concatenate the two keys in the record into one string to use as the shuffle key.
On 2022-05-24 21:06:58, "lxk7...@163.com" wrote:
>If the problem is the two keyBy calls, I can concatenate the two fields into a single string inside one keyBy. Would that have the same effect as the two keyBy calls?
>
>
>
>lxk7...@163.com
> 
>From: Xuyang
>Date: 2022-05-24 20:51
>To: user-zh
>Subject: Re:Re: Re: Some question with Flink state
>It looks like you called keyBy twice; you can write a single custom KeySelector to replace both. Also, if you are worried that identical keys are not going to the same parallel instance, you can print each record together with the subtask index inside the operator and debug from there.
>On 2022-05-24 20:43:19, "lxk7...@163.com" wrote:
>>

Re:Re:Re:Re:Re: Exception when using emitUpdateWithRetract in a custom TableAggregateFunction

2022-05-24, posted by sjf0115
OK, thanks.
On 2022-05-24 21:23:56, "Xuyang" wrote:
>Hi, I debugged into the code and it looks like a bug. With the old planner, it is enough to define either emitValue or emitUpdateWithRetract, and emitUpdateWithRetract takes priority over emitValue. The Blink planner, however, only checks whether emitValue is defined. You could file this bug in the Flink issue tracker.

Re:Re: Re: Some question with Flink state

2022-05-24, posted by Xuyang
I am not certain, but most likely with two keyBy calls only the later one takes effect, so your first keyBy is probably a no-op (you can verify this). You can do what you suggested and concatenate the two keys in the record into one string to use as the shuffle key.
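Roughly like this, as a sketch reusing the data class from your test (the choice of fields is just an example):

// combine both fields into one key so a single keyBy shuffles on both of them
class CompositeKeySelector implements KeySelector<data, String> {
    @Override
    public String getKey(data d) throws Exception {
        // the delimiter prevents ("ab","c") and ("a","bc") from colliding
        return d.getId() + "|" + d.getGoods();
    }
}

// usage: dataDataStreamSource.keyBy(new CompositeKeySelector()).map(...)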
On 2022-05-24 21:06:58, "lxk7...@163.com" wrote:
>If the problem is the two keyBy calls, I can concatenate the two fields into a single string inside one keyBy. Would that have the same effect as the two keyBy calls?


Re:Re:Re:Re: Exception when using emitUpdateWithRetract in a custom TableAggregateFunction

2022-05-24, posted by Xuyang
Hi,
I debugged into the code and it looks like a bug. With the old planner, it is enough to define either emitValue or emitUpdateWithRetract, and emitUpdateWithRetract takes priority over emitValue. The Blink planner, however, only checks whether emitValue is defined. You could file this bug in the Flink issue tracker.
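As a workaround until that is fixed, defining emitValue on the same accumulator should also make the function run under the Blink planner. A sketch (it re-emits the whole Top2 on every update instead of doing incremental retractions, so it is only a stopgap; org.apache.flink.util.Collector is assumed to be imported):

// added to Top2RetractTableAggregateFunction purely as a Blink-planner workaround sketch
public void emitValue(Top2RetractAccumulator acc, Collector<Tuple2<Long, Integer>> out) {
    // emit the current top 2; in retract mode the planner should take care of
    // retracting the rows emitted for the previous state of this group
    if (acc.afterFirst != Integer.MIN_VALUE) {
        out.collect(Tuple2.of(acc.afterFirst, 1));
    }
    if (acc.afterSecond != Integer.MIN_VALUE) {
        out.collect(Tuple2.of(acc.afterSecond, 2));
    }
}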
On 2022-05-23 18:24:17, "sjf0115" wrote:
>Flink version: 1.13.5
>
>The complete function code is as follows:
>```
>public class Top2RetractTableAggregateFunction extends TableAggregateFunction<Tuple2<Long, Integer>, Top2RetractTableAggregateFunction.Top2RetractAccumulator> {
>    private static final Logger LOG = LoggerFactory.getLogger(Top2RetractTableAggregateFunction.class);
>
>    // data structure holding the intermediate Top2 result
>    public static class Top2RetractAccumulator {
>        public long beforeFirst = 0;
>        public long beforeSecond = 0;
>        public long afterFirst = 0;
>        public long afterSecond = 0;
>    }
>
>    // create and initialize the Top2 accumulator
>    @Override
>    public Top2RetractAccumulator createAccumulator() {
>        LOG.info("[INFO] createAccumulator ...");
>        Top2RetractAccumulator acc = new Top2RetractAccumulator();
>        acc.beforeFirst = Integer.MIN_VALUE;
>        acc.beforeSecond = Integer.MIN_VALUE;
>        acc.afterFirst = Integer.MIN_VALUE;
>        acc.afterSecond = Integer.MIN_VALUE;
>        return acc;
>    }
>
>    // accumulate an input element into the accumulator
>    public void accumulate(Top2RetractAccumulator acc, Long value) {
>        LOG.info("[INFO] accumulate ...");
>        if (value > acc.afterFirst) {
>            acc.afterSecond = acc.afterFirst;
>            acc.afterFirst = value;
>        } else if (value > acc.afterSecond) {
>            acc.afterSecond = value;
>        }
>    }
>
>    // emit updates with retraction
>    public void emitUpdateWithRetract(Top2RetractAccumulator acc, RetractableCollector<Tuple2<Long, Integer>> out) {
>        LOG.info("[INFO] emitUpdateWithRetract ...");
>        if (!Objects.equals(acc.afterFirst, acc.beforeFirst)) {
>            // retract the old record
>            if (acc.beforeFirst != Integer.MIN_VALUE) {
>                out.retract(Tuple2.of(acc.beforeFirst, 1));
>            }
>            // emit the new record
>            out.collect(Tuple2.of(acc.afterFirst, 1));
>            acc.beforeFirst = acc.afterFirst;
>        }
>        if (!Objects.equals(acc.afterSecond, acc.beforeSecond)) {
>            // retract the old record
>            if (acc.beforeSecond != Integer.MIN_VALUE) {
>                out.retract(Tuple2.of(acc.beforeSecond, 2));
>            }
>            // emit the new record
>            out.collect(Tuple2.of(acc.afterSecond, 2));
>            acc.beforeSecond = acc.afterSecond;
>        }
>    }
>}
>```
>The complete calling code is as follows:
>```
>// execution environment
>StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>env.setParallelism(1);
>EnvironmentSettings settings = EnvironmentSettings
>        .newInstance()
>        .useOldPlanner() // throws with the Blink planner; the old planner works
>        .inStreamingMode()
>        .build();
>StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);
>
>DataStream<Row> sourceStream = env.fromElements(
>        Row.of("李雷", "语文", 78),
>        Row.of("韩梅梅", "语文", 50),
>        Row.of("李雷", "语文", 99),
>        Row.of("韩梅梅", "语文", 80),
>        Row.of("李雷", "英语", 90),
>        Row.of("韩梅梅", "英语", 40),
>        Row.of("李雷", "英语", 98),
>        Row.of("韩梅梅", "英语", 88)
>);
>
>// register a temporary view
>tEnv.createTemporaryView("stu_score", sourceStream, $("name"), $("course"), $("score"));
>// register a temporary system function
>tEnv.createTemporarySystemFunction("Top2", new Top2RetractTableAggregateFunction());
>// call the function
>tEnv.from("stu_score")
>        .groupBy($("course"))
>        .flatAggregate(call("Top2", $("score")).as("score", "rank"))
>        .select($("course"), $("score"), $("rank"))
>        .execute()
>        .print();
>```

Re: Re: Some question with Flink state

2022-05-24, posted by lxk7...@163.com
If the problem is the two keyBy calls, I can concatenate the two fields into a single string inside one keyBy. Would that have the same effect as the two keyBy calls?



lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 20:51
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
It looks like you called keyBy twice; you can write a single custom KeySelector to replace both. Also, if you are worried that identical keys are not going to the same parallel instance, you can print each record together with the subtask index inside the operator and debug from there.


Re: Re: Some question with Flink state

2022-05-24, posted by lxk7...@163.com
OK, I will give that a try.


lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 20:51
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
It looks like you called keyBy twice; you can write a single custom KeySelector to replace both. Also, if you are worried that identical keys are not going to the same parallel instance, you can print each record together with the subtask index inside the operator and debug from there.


Re:Re: Re: Some question with Flink state

2022-05-24, posted by Xuyang
It looks like you called keyBy twice; you can write a single custom KeySelector to replace both. Also, if you are worried that identical keys are not going to the same parallel instance, you can print each record together with the subtask index inside the operator and debug from there.
On 2022-05-24 20:43:19, "lxk7...@163.com" wrote:
>
>https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
>https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
>
>How about now?
>
>
>lxk7...@163.com


Re: Re: Some question with Flink state

2022-05-24, posted by lxk7...@163.com

https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png

How about now?


lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 20:17
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
Hi, your images are still broken; you could try an image-hosting tool.
 
 
 


Re: Re: Some question with Flink state

2022-05-24, posted by lxk7...@163.com
https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
See whether the images are visible this way.


lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 20:17
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
Hi, your images are still broken; you could try an image-hosting tool.
 
 
 


Re:Re: Re: Some question with Flink state

2022-05-24, posted by Xuyang
Hi, your images are still broken; you could try an image-hosting tool.



On 2022-05-24 13:50:34, "lxk7...@163.com" wrote:

The images seem to have a problem; re-uploading.
lxk7...@163.com
 
From: Hangxiang Yu
Date: 2022-05-24 12:09
To: user-zh
Subject: Re: Re: Some question with Flink state
Are you running a DataStream job? If identical keys end up in different parallel instances, it may be related to the KeySelector you wrote (you can check the comments on KeySelector to see whether yours meets its contract).
Or, if convenient, you can share your key selector logic and how you use state.
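The contract is essentially that getKey must be deterministic and the returned key must have a stable hashCode. A sketch (MyEvent and its userId field are hypothetical):

// compliant: the same record always yields the same, hashCode-stable key
class UserIdSelector implements KeySelector<MyEvent, String> {
    @Override
    public String getKey(MyEvent e) throws Exception {
        return e.getUserId();
    }
}
// non-compliant examples: returning a random or changing value, or a key type
// like byte[] whose hashCode is identity-based, breaks the guarantee that the
// same key always reaches the same subtask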
 
On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com  wrote:
 
> OK. I noticed the mails here were all in English, so I asked my question in English.
>
> Let me describe my problem again. I am using keyed state, specifically value-state. In principle, records with the same key should be processed by the same parallel instance, but when I run with multiple parallelism, records with the same key do not seem to be handled by the same instance. Concretely, my program accumulates the products a user clicks; the data contains the second occurrence of a product, yet the state for that product is empty, so the accumulated result is 1 when it should be 2. So I suspect each operator instance has its own value-state.
>
> But when I use MapState, the problem no longer appears. So I would like to understand the reason, or whether there is a way to guarantee that all records with the same key are processed by the same task.
>
>
>
> lxk7...@163.com
>
> From: Hangxiang Yu
> Date: 2022-05-23 23:09
> To: user-zh; lxk7491
> Subject: Re: Some question with Flink state
> Hello,
> All states will not be shared in different parallelisms.
> BTW, English questions could be sent to u...@flink.apache.org.
>
> Best,
> Hangxiang.
>
> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com  wrote:
>
> >
> > Hi everyone,
> > I have been using Flink keyed state in my project, but I ran into some questions that confuse me.
> > When I used value-state with multiple parallelism, the value was not what I wanted.
> > So I guess value-state lives in every parallel instance; each instance saves only its own value, which means the value is thread-level.
> > But when I used map-state, the value was correct; I mean, the map-state seemed to be shared by every parallel instance.
> > Looking forward to your reply.
> >
> >
> > lxk7...@163.com
> >
>

GlobalCommitter in Flink's two-phase commit

2022-05-24, posted by di wu
Hello
Regarding the GlobalCommitter in Flink's two-phase commit,
I see it was introduced in FLIP-143, but it seems to have been removed again in FLIP-191 and marked as Deprecated in the source code.
I haven't found any relevant information about the use of GlobalCommitter.


There are two questions I would like to ask:
1. What are the general usage scenarios of GlobalCommitter?
2. Why should GlobalCommitter be removed in the new version of the api?
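For context, the interface I am asking about looks roughly like this (reconstructed from memory of the 1.14-era org.apache.flink.api.connector.sink package; treat the exact signatures as approximate):

// Runs with parallelism 1 and sees the committables of all subtasks for a
// checkpoint, e.g. to make one atomic metadata/table commit (Iceberg-style).
public interface GlobalCommitter<CommT, GlobalCommT> extends AutoCloseable {
    // re-filter global committables recovered after a failover
    List<GlobalCommT> filterRecoveredCommittables(List<GlobalCommT> globalCommittables) throws IOException;
    // aggregate the per-subtask committables of one checkpoint into one global committable
    GlobalCommT combine(List<CommT> committables) throws IOException;
    // commit; returned committables are treated as needing a retry
    List<GlobalCommT> commit(List<GlobalCommT> globalCommittables) throws IOException, InterruptedException;
    // signals that no more committables will arrive (bounded input)
    void endOfInput() throws IOException, InterruptedException;
}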


Thanks & Regards,


di.wu