Re: Re: Re: [Internet] Re: Re: Some question with Flink state
Hi, isn't this statement theoretically wrong?

> "because with value-state, a task holds multiple keys, and the contents of different keys replace each other"

ValueState is also a kind of keyed state, so every key maintains its own ValueState, and different keys are isolated from each other. In practice, storing a Map inside a ValueState is not much different from using MapState directly; the two only differ slightly across state backends and in their state-TTL strategies, which is why storing a Map in a ValueState is not recommended. So it really depends on the concrete business scenario: if all you need is a single accumulated value, ValueState is enough.

--

Best!
Xuyang

On 2022-05-25 13:38:52, "lxk7...@163.com" wrote:
> I just looked at how key groups work, and I can mostly understand the earlier explanation now. My understanding is that with value-state, one task holds multiple keys and the contents of different keys replace each other, whereas with a map, even if the same task holds multiple keys, entries can still be matched by the user-defined key. If so, most scenarios would actually suit map-state.
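Xuyang's point above — that a ValueState entry is scoped per key, so writes under one key never overwrite another key's value — can be sketched in a few lines of plain Java. This is a toy model, not the Flink API; the class and method names are made up for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of keyed ValueState: the backend keeps one slot per key,
// so updating the state for key 1 cannot clobber the state for key 2.
public class KeyedValueStateModel {
    private final Map<Integer, String> slots = new HashMap<>(); // key -> state value
    private int currentKey; // set by the "framework" before each element

    public void setCurrentKey(int key) { currentKey = key; }
    public void update(String value) { slots.put(currentKey, value); }
    public String value() { return slots.get(currentKey); }

    public static void main(String[] args) {
        KeyedValueStateModel state = new KeyedValueStateModel();
        state.setCurrentKey(1);
        state.update("home page");
        state.setCurrentKey(2);
        state.update("search page");
        state.setCurrentKey(1);
        System.out.println(state.value()); // still "home page": key 2's write did not replace it
    }
}
```

This is exactly why storing a Map inside a ValueState roughly reproduces MapState: the per-key isolation is already provided by the key scoping, not by the state type.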
Re: Re: [Internet]Re: Re: Some question with Flink state
I just looked at how key groups work, and I can mostly understand the earlier explanation now. Regarding this sentence:

"with map-state, the keys inside certain fixed key groups can each be stored separately via the map-state user key"

my understanding is that with value-state, one task holds multiple keys and the contents of different keys replace each other, whereas with a map, even if the same task holds multiple keys, entries can still be matched by the user-defined key. If that is the case, most scenarios would actually suit map-state.

lxk7...@163.com

From: jurluo(罗凯)
Date: 2022-05-25 11:05
To: user-zh@flink.apache.org
Subject: Re: [Internet]Re: Re: Some question with Flink state

Hey, this actually looks fine: the same key was always assigned to the same task, and it is normal for one task to hold several different keys. Keys are divided into key groups according to the maximum parallelism, and those fixed key groups are then assigned to the individual tasks. The only guarantee is that identical keys go to the same task, not that a task holds only one key. For your requirement, map-state is the right choice: with map-state, the keys inside certain fixed key groups can each be stored separately via the map-state user key.

On May 25, 2022 at 10:45 AM, lxk7...@163.com wrote:

The images seem to have broken again, so I am resending.
Hello, I ran a test and found a problem: keyBy on a String key did not give the correct result, while an int key did. The test also showed that with two consecutive keyBy calls, the second one indeed takes precedence.

Below are my code and test results.

1. Using an int key:

public class KeyByTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10);

        DataStreamSource<data> dataDataStreamSource = env.fromCollection(Arrays.asList(
                new data(1, "123", "首页"),
                new data(1, "123", "分类页"),
                new data(2, "r-123", "搜索结果页"),
                new data(1, "r-123", "我的页"),
                new data(3, "r-4567", "搜索结果页")));

        SingleOutputStreamOperator<String> map = dataDataStreamSource.keyBy(new MyKeySelector())
                .map(new RichMapFunction<data, String>() {
                    @Override
                    public String map(data data) throws Exception {
                        System.out.println(data.toString() + "的subtask为:" + getRuntimeContext().getIndexOfThisSubtask());
                        return data.toString();
                    }
                });

        env.execute("test");
    }
}

class data {

    private int id;
    private String goods;
    private String pageName;

    public data(int id, String goods, String pageName) {
        this.id = id;
        this.goods = goods;
        this.pageName = pageName;
    }

    public data() {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getGoods() {
        return goods;
    }

    public void setGoods(String goods) {
        this.goods = goods;
    }

    public String getPageName() {
        return pageName;
    }

    public void setPageName(String pageName) {
        this.pageName = pageName;
    }

    @Override
    public String toString() {
        return "data{" +
                "id='" + id + '\'' +
                ", goods='" + goods + '\'' +
                ", pageName='" + pageName + '\'' +
                '}';
    }
}

class MyKeySelector implements KeySelector<data, Integer> {

    @Override
    public Integer getKey(data data) throws Exception {
        return data.getId();
    }
}

The console output: https://s2.loli.net/2022/05/25/mxtZu9YAPN2FD1a.png
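jurluo's key → key group → subtask routing can be sketched in plain Java. This is a simplified model, not Flink's actual implementation — Flink additionally applies a murmur hash on top of `hashCode()` before taking the modulo — so the concrete indices differ from a real job, but the structure is the same, and it shows why equal keys always land on one subtask while one subtask can still hold many keys.

```java
// Simplified sketch of Flink's key -> key group -> subtask routing.
// Real Flink murmur-hashes key.hashCode() first; this model skips that step,
// so the indices it produces are illustrative only.
public class KeyGroupSketch {
    static int keyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    static int operatorIndex(Object key, int maxParallelism, int parallelism) {
        // each operator instance owns a contiguous range of key groups
        return keyGroup(key, maxParallelism) * parallelism / maxParallelism;
    }

    public static void main(String[] args) {
        int maxParallelism = 128, parallelism = 10;
        for (int id : new int[]{1, 1, 2, 1, 3}) {
            System.out.println("key " + id + " -> subtask "
                    + operatorIndex(id, maxParallelism, parallelism));
        }
        // Equal keys always print the same subtask; distinct keys may share one.
    }
}
```

With a small number of keys it is therefore entirely normal, as jurluo says, to see only a few subtasks active even at parallelism 10.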
Re: [Internet]Re: Re: Some question with Flink state
2. Using a String key (the surrounding program is the same; only the id type changes):

public class KeyByTest {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10);

        DataStreamSource<data> dataDataStreamSource = env.fromCollection(Arrays.asList(
                new data("1", "123", "首页"),
                new data("1", "123", "分类页"),
                new data("2", "r-123", "搜索结果页"),
                new data("2", "r-123", "我的页"),
                new data("3", "r-4567", "搜索结果页")));

        SingleOutputStreamOperator<String> map = dataDataStreamSource.keyBy(new MyKeySelector())
                .map(new RichMapFunction<data, String>() {
                    @Override
                    public String map(data data) throws Exception {
                        System.out.println(data.toString() + "的subtask为:" + getRuntimeContext().getIndexOfThisSubtask());
                        return data.toString();
                    }
                });

        env.execute("test");
    }
}

class data {

    private String id;
    private String goods;
    private String pageName;

    public data(String id, String goods, String pageName) {
        this.id = id;
        this.goods = goods;
        this.pageName = pageName;
    }

    public data() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getGoods() {
        return goods;
    }

    public void setGoods(String goods) {
        this.goods = goods;
    }

    public String getPageName() {
        return pageName;
    }

    public void setPageName(String pageName) {
        this.pageName = pageName;
    }

    @Override
    public String toString() {
        return "data{" +
                "id='" + id + '\'' +
                ", goods='" + goods + '\'' +
                ", pageName='" + pageName + '\'' +
                '}';
    }
}

class MyKeySelector implements KeySelector<data, String> {

    @Override
    public String getKey(data data) throws Exception {
        return data.getId();
    }
}

The final console output: https://s2.loli.net/2022/05/25/vxKiuX9od6aOTD3.png

You can see that only two groups were produced; I am not sure whether this is a bug.

lxk7...@163.com
Re: Re: Some question with Flink state
I'm not sure, but most likely with two consecutive keyBy calls only the latter takes effect, which could make your earlier keyBy effectively useless (worth verifying experimentally). As you suggested, you can concatenate the two key fields in the data into one string and use that as the shuffle key.

On 2022-05-24 21:06:58, "lxk7...@163.com" wrote:
> If this is the double-keyBy problem, can I simply concatenate the two fields into one string inside a single keyBy? Would that have the same effect as the two keyBy calls?
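The concatenation Xuyang suggests can be expressed as one composite key. A plain-Java sketch (the field names goods and pageName come from the sample class in this thread; the separator character is an arbitrary assumption): building a single composite string guarantees both fields shuffle together, unlike two chained keyBy calls, where only the last key takes effect.

```java
// Sketch of a composite shuffle key: concatenate both fields into one string
// instead of calling keyBy twice (where only the second keyBy would count).
public class CompositeKeySketch {
    // A separator avoids ambiguity ("ab"+"c" vs "a"+"bc"); '|' is an arbitrary choice.
    static String compositeKey(String goods, String pageName) {
        return goods + "|" + pageName;
    }

    public static void main(String[] args) {
        System.out.println(compositeKey("r-123", "搜索结果页"));
        // In a Flink job this string would be returned from KeySelector<data, String>#getKey.
    }
}
```

Because the composite key is a plain String with value-based equals/hashCode, records with the same (goods, pageName) pair are always routed to the same subtask.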
Re: Re: Some question with Flink state
If this is the double-keyBy problem, can I simply concatenate the two fields into one string inside a single keyBy? Would that have the same effect as the two keyBy calls?

lxk7...@163.com

From: Xuyang
Date: 2022-05-24 20:51
To: user-zh
Subject: Re:Re: Re: Some question with Flink state

It looks like you called keyBy twice; you can define a single custom KeySelector to replace the two. Also, if you are worried that identical keys are not being routed to the same parallel instance, you can print each record together with the subtask index inside that operator and debug from there.
Re: Re: Some question with Flink state
OK, I will give that a try.

lxk7...@163.com
Re: Re: Some question with Flink state
https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png

How about these?

lxk7...@163.com

From: Xuyang
Date: 2022-05-24 20:17
To: user-zh
Subject: Re:Re: Re: Some question with Flink state

Hi, your images are still broken; try an image-hosting service.

On 2022-05-24 13:50:34, "lxk7...@163.com" wrote:
> The images seem to have a problem; re-uploading.
Re: Re: Some question with Flink state
[URL=https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png][IMG]https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png[/IMG][/URL]
[URL=https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png][IMG]https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png[/IMG][/URL]

Can you see the images with these?

lxk7...@163.com
Re: Re: Some question with Flink state
The images seem to have a problem; re-uploading.

lxk7...@163.com
Re: Re: Some question with Flink state
Here is my code. This is the latest version, which showed no problems when tested; but with the earlier version that used value state, you could see from the data that the results were wrong.

lxk7...@163.com
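The symptom described above (an earlier ValueState version producing visibly wrong counts) is consistent with keying the stream by user while counting per item. A plain-Java sketch of that shape, under that assumption (the field names and scenario are illustrative, not taken from the poster's actual job): a single value slot per key collapses all of a user's items into one counter, while a per-key map keeps one counter per item.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: counting clicks per (user, item) when the stream is keyed by user only.
// A ValueState-like single slot per user conflates all items; a MapState-like
// per-user map keeps one counter per item.
public class StateShapeSketch {
    final Map<String, Integer> valueStyle = new HashMap<>();            // user -> count
    final Map<String, Map<String, Integer>> mapStyle = new HashMap<>(); // user -> (item -> count)

    void click(String user, String item) {
        valueStyle.merge(user, 1, Integer::sum); // which item was clicked? lost
        mapStyle.computeIfAbsent(user, u -> new HashMap<>()).merge(item, 1, Integer::sum);
    }

    public static void main(String[] args) {
        StateShapeSketch s = new StateShapeSketch();
        s.click("u1", "123");
        s.click("u1", "r-123");
        s.click("u1", "123");
        System.out.println(s.valueStyle.get("u1"));          // 3: all items mixed together
        System.out.println(s.mapStyle.get("u1").get("123")); // 2: per-item count preserved
    }
}
```

The alternative fix discussed later in the thread — putting both fields into one composite key and keeping a simple ValueState — produces the same per-item separation by moving the distinction into the key itself.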
Re: Re: Some question with Flink state
Are you running a DataStream job? If the same key is being assigned to different parallel instances, it may be related to the KeySelector you wrote (check the comments on KeySelector to see whether yours meets its contract). If convenient, please share the logic of your key selector and of your state usage.
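The KeySelector contract Hangxiang refers to requires, among other things, that the selector be deterministic and that the returned key have stable, value-based hashCode/equals. A hedged plain-Java illustration (not taken from the poster's code) of a contract violation versus a correct key:

```java
// The KeySelector contract: getKey must be deterministic, and the returned key
// needs value-based equals/hashCode. A key without those properties can be
// routed to a different subtask on every element, even for "equal" keys.
public class KeySelectorContractSketch {
    // BAD: arrays use identity equals/hashCode, so two equal-looking keys
    // extracted from two records are considered different -> different routing.
    static Object badKey(String id) { return new String[]{id}; }

    // GOOD: String has value-based equals/hashCode, so equal ids always
    // produce the same key group.
    static Object goodKey(String id) { return id; }

    public static void main(String[] args) {
        System.out.println(badKey("1").equals(badKey("1")));   // false: identity equality
        System.out.println(goodKey("1").equals(goodKey("1"))); // true: value equality
    }
}
```

The selector in this thread returns the record's id field directly, which satisfies the contract for both int and String ids; that is why the discussion then moves on to the double-keyBy issue instead.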
Re: Re: Some question with Flink state
OK. I saw that the mails on this list were all in English, so I asked my question in English.

Let me describe my problem again. I am using keyed state, specifically value-state. In principle, records with the same key should be processed by the same parallel instance. But when I run with multiple parallel instances, it looks as if records with the same key are not routed to the same instance. Concretely, my program accumulates, per user, the items the user clicked. In the data a given item already appears for the second time, yet inside the program the state for that item is empty, so the final accumulated result is 1 when the correct result should be 2. So I suspect each operator instance has its own private value-state.

But when I use map-state, the problem seems to disappear. So I would like to understand the reason behind this, or whether there is some way to guarantee that all data for one key is handled by the same task.

lxk7...@163.com

From: Hangxiang Yu
Date: 2022-05-23 23:09
To: user-zh; lxk7491
Subject: Re: Some question with Flink state

Hello,
No state is shared across different parallelisms.
BTW, English questions can be sent to u...@flink.apache.org.

Best,
Hangxiang.

On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com wrote:

> Hi everyone,
> I used Flink keyed state in my project, but found something that confused me.
> When I used value-state with multiple parallel instances, the value was not what I expected, so I guessed that value-state exists separately in every parallel instance, each keeping only its own value, i.e. the value is thread-level.
> But when I used map-state, the value was correct, which made me think map-state is shared by all parallel instances.
> Looking forward to your reply.
>
> lxk7...@163.com