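Hello, I ran a test and found a problem: when I use a String as the keyBy key, I don't get the correct result, but with an int key the result is correct. The test also confirmed that when keyBy is called twice, only the second keyBy takes effect.
Below are my code and test results.
1. Using an int key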
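import java.util.Arrays;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10);

        DataStreamSource<data> dataDataStreamSource = env.fromCollection(Arrays.asList(
                new data(1, "123", "home"),
                new data(1, "123", "category"),
                new data(2, "r-123", "search results"),
                new data(1, "r-123", "my page"),
                new data(3, "r-4567", "search results")));

        // Key by the int id, then print each record with the index of the subtask that handles it
        SingleOutputStreamOperator<String> map = dataDataStreamSource.keyBy(new MyKeySelector())
                .map(new RichMapFunction<data, String>() {

                    @Override
                    public String map(data data) throws Exception {
                        System.out.println(data.toString() + " -> subtask: "
                                + getRuntimeContext().getIndexOfThisSubtask());
                        return data.toString();
                    }
                });

        env.execute("test");
    }
}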
class data{
    private int id;
    private String goods;
    private String pageName;

    public data(int id, String goods, String pageName) {
        this.id = id;
        this.goods = goods;
        this.pageName = pageName;
    }


    public data() {
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getGoods() {
        return goods;
    }

    public void setGoods(String goods) {
        this.goods = goods;
    }

    public String getPageName() {
        return pageName;
    }

    public void setPageName(String pageName) {
        this.pageName = pageName;
    }

    @Override
    public String toString() {
        return "data{" +
                "id='" + id + '\'' +
                ", goods='" + goods + '\'' +
                ", pageName='" + pageName + '\'' +
                '}';
    }
}

class MyKeySelector implements KeySelector<data,Integer>{

    @Override
    public Integer getKey(data data) throws Exception {
        return data.getId();
    }
}
The console output was as follows:
You can see that the records are grouped by id and distributed to different subtasks.


2. Using a String key. The code is as follows:
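import java.util.Arrays;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class KeyByTest {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(10);

        DataStreamSource<data> dataDataStreamSource = env.fromCollection(Arrays.asList(
                new data("1", "123", "home"),
                new data("1", "123", "category"),
                new data("2", "r-123", "search results"),
                new data("2", "r-123", "my page"),
                new data("3", "r-4567", "search results")));

        // Key by the String id, then print each record with the index of the subtask that handles it
        SingleOutputStreamOperator<String> map = dataDataStreamSource.keyBy(new MyKeySelector())
                .map(new RichMapFunction<data, String>() {

                    @Override
                    public String map(data data) throws Exception {
                        System.out.println(data.toString() + " -> subtask: "
                                + getRuntimeContext().getIndexOfThisSubtask());
                        return data.toString();
                    }
                });

        env.execute("test");
    }
}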
class data{
    private String id;
    private String goods;
    private String pageName;

    public data(String id, String goods, String pageName) {
        this.id = id;
        this.goods = goods;
        this.pageName = pageName;
    }


    public data() {
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

    public String getGoods() {
        return goods;
    }

    public void setGoods(String goods) {
        this.goods = goods;
    }

    public String getPageName() {
        return pageName;
    }

    public void setPageName(String pageName) {
        this.pageName = pageName;
    }

    @Override
    public String toString() {
        return "data{" +
                "id='" + id + '\'' +
                ", goods='" + goods + '\'' +
                ", pageName='" + pageName + '\'' +
                '}';
    }
}

class MyKeySelector implements KeySelector<data,String>{

    @Override
    public String getKey(data data) throws Exception {
        return data.getId();
    }
}
The final console output was:


You can see that the records were split into only two groups. I'm not sure whether this is a bug.
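One possible explanation is worth checking before calling this a bug: with parallelism 10 and only three distinct keys, Flink does not promise one subtask per key. As far as I understand Flink's `KeyGroupRangeAssignment`, each key's `hashCode()` is murmur-hashed into one of `maxParallelism` key groups (128 by default for parallelism 10), and key group `g` goes to subtask `g * parallelism / maxParallelism`. The plain-Java sketch below re-implements that mapping to show why two different keys can share a subtask, and why `Integer` and `String` keys spread differently (`Integer.hashCode()` is the value itself, while `"1".hashCode()` is 49). `KeyGroupSketch` and its methods are hypothetical names, and the hash is a re-implementation that may not match Flink's `MathUtils.murmurHash` bit-for-bit; for authoritative indices, call `KeyGroupRangeAssignment.assignKeyToParallelOperator(key, maxParallelism, parallelism)` from flink-runtime.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class KeyGroupSketch {

    // Assumption: follows the structure of Flink's MathUtils.murmurHash(int)
    // (one murmur3 round plus finalizer); details may differ from the real code.
    static int murmurHash(int code) {
        code *= 0xcc9e2d51;
        code = Integer.rotateLeft(code, 15);
        code *= 0x1b873593;
        code = Integer.rotateLeft(code, 13);
        code = code * 5 + 0xe6546b64;
        code ^= 4;
        // murmur3 finalizer (avalanche step)
        code ^= code >>> 16;
        code *= 0x85ebca6b;
        code ^= code >>> 13;
        code *= 0xc2b2ae35;
        code ^= code >>> 16;
        // map to a non-negative value
        if (code >= 0) {
            return code;
        } else if (code != Integer.MIN_VALUE) {
            return -code;
        } else {
            return 0;
        }
    }

    // key -> key group in [0, maxParallelism)
    static int keyGroup(Object key, int maxParallelism) {
        return murmurHash(key.hashCode()) % maxParallelism;
    }

    // key group -> subtask index in [0, parallelism)
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    static int subtask(Object key, int maxParallelism, int parallelism) {
        return operatorIndexForKeyGroup(maxParallelism, parallelism, keyGroup(key, maxParallelism));
    }

    public static void main(String[] args) {
        int parallelism = 10;
        // Assumed default max parallelism for parallelism 10 when none is set explicitly
        int maxParallelism = 128;
        Object[] keys = {1, 2, 3, "1", "2", "3"};
        Map<Object, Integer> assignment = new LinkedHashMap<>();
        for (Object key : keys) {
            assignment.put(key, subtask(key, maxParallelism, parallelism));
        }
        assignment.forEach((k, v) ->
                System.out.println(k.getClass().getSimpleName() + " " + k + " -> subtask " + v));
    }
}
```

Running `main` prints the subtask each of the six keys would map to under this sketch; the exact indices are not listed here because they depend on the hash details. Under a uniform-hash assumption, the chance that three keys land on three different subtasks out of ten is 1 × 0.9 × 0.8 = 0.72, so two keys sharing a subtask happens roughly 28% of the time and is not by itself a sign of a bug.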


lxk7...@163.com
 
From: Xuyang
Date: 2022-05-24 21:35
To: user-zh
Subject: Re:Re: Re: Some question with Flink state
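I'm not sure, but most likely only the second of two keyBy calls takes effect, which would make your first keyBy effectively useless (you can test this).

You can do what you suggested and concatenate the two key fields of each record into one string to use as the shuffle key.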
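Concatenating the two fields into one key string, as suggested above, groups records by the (user, goods) pair in a single keyBy, provided the concatenation is unambiguous. Below is a minimal plain-Java sketch, assuming hypothetical field names `userId` and `goods` and a `|` separator that never appears inside `userId`; in a Flink job the same expression would be returned from one `KeySelector<..., String>`. Without a separator, ("1", "23") and ("12", "3") both become "123" and would be treated as the same key. A Flink `Tuple2<String, String>` returned from the selector is another option that avoids string building.

```java
import java.util.Objects;

public class CompositeKeySketch {

    // Hypothetical separator; assumed never to appear inside userId
    static final char SEP = '|';

    // Builds one key from two fields so a single keyBy can replace two chained keyBy calls
    static String compositeKey(String userId, String goods) {
        Objects.requireNonNull(userId, "userId");
        Objects.requireNonNull(goods, "goods");
        return userId + SEP + goods;
    }

    // Naive concatenation without a separator, shown only for comparison
    static String naiveKey(String userId, String goods) {
        return userId + goods;
    }

    public static void main(String[] args) {
        System.out.println(naiveKey("1", "23").equals(naiveKey("12", "3")));         // true: collision
        System.out.println(compositeKey("1", "23").equals(compositeKey("12", "3"))); // false
    }
}
```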
On 2022-05-24 21:06:58, "lxk7...@163.com" <lxk7...@163.com> wrote:
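>If the problem is the two keyBy calls, I could concatenate the two fields into a single string inside one keyBy. Would that have the same effect as the two keyBy calls?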
>
>
>
>lxk7...@163.com
> 
>From: Xuyang
>Date: 2022-05-24 20:51
>To: user-zh
>Subject: Re:Re: Re: Some question with Flink state
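>It looks like you called keyBy twice; you could write a custom KeySelector to replace both. Also, if you are worried that records with the same key are not sent to the same parallel instance, you can print each record together with the index of the subtask processing it in one of the operators, and debug from there.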
>On 2022-05-24 20:43:19, "lxk7...@163.com" <lxk7...@163.com> wrote:
>>
>>https://s2.loli.net/2022/05/24/SgAWefJpaxtOH5l.png
>>https://s2.loli.net/2022/05/24/54dZkr19QCh3Djf.png
>>
>>How about now?
>>
>>
>>lxk7...@163.com
>> 
>>From: Xuyang
>>Date: 2022-05-24 20:17
>>To: user-zh
>>Subject: Re:Re: Re: Some question with Flink state
>>Hi, your images are still broken. You could try an image hosting service.
>> 
>> 
>> 
>>On 2022-05-24 13:50:34, "lxk7...@163.com" <lxk7...@163.com> wrote:
>> 
>>The images seem to have a problem, so I'm uploading them again.
>>lxk7...@163.com
>>From: Hangxiang Yu
>>Date: 2022-05-24 12:09
>>To: user-zh
>>Subject: Re: Re: Some question with Flink state
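>>Are you running a DataStream job? If records with the same key are assigned to different parallel instances, it may be related to the key
>>selector you wrote (check the KeySelector Javadoc comments to see whether yours follows its contract).
>>Or, if convenient, you could share your key selector logic and your state logic.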
>>On Tue, May 24, 2022 at 9:59 AM lxk7...@163.com <lxk7...@163.com> wrote:
>>> OK. I saw that the emails here were all in English, so I asked my question in English.
>>>
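>>> Let me describe my problem again. I'm using keyed state, specifically ValueState. In principle, records with the same key should be processed by the same parallel instance. But when I use multiple parallel instances, it seems that records with the same key are not assigned to the same one. Concretely, my program accumulates the goods clicked by each user. In the data, this item has already appeared a second time, yet its state in the program is empty, so the accumulated result is 1 when it should be 2. So I suspect that each operator instance has its own separate value state.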
>>>
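>>> However, when I use MapState, the problem no longer seems to occur. So I'd like to understand the reason. Alternatively, is there a way to ensure that all records with the same key are processed by the same task?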
>>>
>>>
>>>
>>> lxk7...@163.com
>>>
>>> From: Hangxiang Yu
>>> Date: 2022-05-23 23:09
>>> To: user-zh; lxk7491
>>> Subject: Re: Some question with Flink state
>>> Hello,
>>> No state is shared between different parallel instances.
>>> BTW, English questions could be sent to u...@flink.apache.org.
>>>
>>> Best,
>>> Hangxiang.
>>>
>>> On Mon, May 23, 2022 at 4:03 PM lxk7...@163.com <lxk7...@163.com> wrote:
>>>
>>> >
>>> > Hi everyone,
>>> >    I am using Flink keyed state in my project, but I found something
>>> > that confuses me.
>>> >    When I use value state with multiple parallel instances, the value is
>>> > not what I expect.
>>> >    So I guess that value state lives in every parallel instance: each
>>> > instance saves only its own value, which means the value is thread-level.
>>> >    But when I use map state, the value is correct. I mean the map state
>>> > is shared by every parallel instance.
>>> >   Looking forward to your reply.
>>> >
>>> >
>>> > lxk7...@163.com
>>> >
>>>
