Re:Re: Re: [Internet]Re: Re: Some question with Flink state

2022-06-02 文章 Xuyang
Hi, 理论上来说这句话是不是有问题?


> “是因为如果使用value-state,一个task会存在多个key,不同的key的内容会进行替换”


因为ValueState也是keyedState的一种,所以也是每个key各自维护一个valuestate,不同的key之间是隔离的。
其实一般情况下ValueState里面存Map,和直接MapState没啥区别,只不过在不同的状态存储上和状态的TTL策略有略微不同,所以不太推荐ValueState里面存Map。
所以其实还是看具体的业务场景,假如只是算一个累加的值的话,用valuestate就够了。




--

Best!
Xuyang





在 2022-05-25 13:38:52,"lxk7...@163.com"  写道:
>
>刚看了下keygroup的原理,前面的内容大致能理解了,对于下面这段话
>"map-state的话相当于某些固定的key group里面的key都可以通过map-state的user-key去分别存储"
>我理解   
>是因为如果使用value-state,一个task会存在多个key,不同的key的内容会进行替换,而使用map的话,就算同一个task有多个key,根据用户自定义的key还是可以匹配到的。
>这样的话,大部分场景其实都适合使用map-state。
>
>
>lxk7...@163.com
> 
>From: jurluo(罗凯)
>Date: 2022-05-25 11:05
>To: user-zh@flink.apache.org
>Subject: Re: [Internet]Re: Re: Some question with Flink state
>老哥,看起来好像没什么问题,相同的key都分配在了同个task,每个task会存在多种key是正常的。key会按最大并行度分成多个key 
>group,然后固定的key 
>group分配到各个task里。只能保证相同的key会到同一个task,不能保证一个task只有一个key。你这个需求用map-state才合适。map-state的话相当于某些固定的key
> group里面的key都可以通过map-state的user-key去分别存储。
> 
>> 2022年5月25日 上午10:45,lxk7...@163.com 写道:
>> 
>> 图片好像又挂了  我重发下
>> hello,我这边测试了一下发现一个问题,在使用String类型做keyby的时候并没有得到正确的结果,而使用int类型的时候结果是正确。而且测试发现两次keyby确实是以第二次keyby为准
>> 
>> 
>> 
>>下面是我的代码及测试结果
>> 
>> 
>> 
>> 一.使用int类型
>> 
>> 
>> 
>>public class KeyByTest {
>> 
>> 
>> 
>> public static void main(String[] args) throws Exception {
>> 
>> 
>> 
>> StreamExecutionEnvironment env = 
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> 
>> 
>> 
>> env.setParallelism(10);
>> 
>> 
>> 
>> 
>> 
>> DataStreamSource dataDataStreamSource = 
>> env.fromCollection(Arrays.asList(new data(1, "123", "首页"),
>> 
>> 
>> 
>> new data(1, "123", "分类页"),
>> 
>> 
>> 
>> new data(2, "r-123", "搜索结果页"),
>> 
>> 
>> 
>> new data(1, "r-123", "我的页"),
>> 
>> 
>> 
>> new data(3, "r-4567", "搜索结果页")));
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> SingleOutputStreamOperator map = 
>> dataDataStreamSource.keyBy(new MyKeySelector())
>> 
>> 
>> 
>> .map(new RichMapFunction() {
>> 
>> 
>> 
>> 
>> 
>> @Override
>> 
>> 
>> 
>> public String map(data data) throws Exception {
>> 
>> 
>> 
>> System.out.println(data.toString() + "的subtask为:" + 
>> getRuntimeContext().getIndexOfThisSubtask() );
>> 
>> 
>> 
>> return data.toString();
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> });
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> env.execute("test");
>> 
>> 
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> class data{
>> 
>> 
>> 
>> private int id;
>> 
>> 
>> 
>> private String goods;
>> 
>> 
>> 
>> private String pageName;
>> 
>> 
>> 
>> 
>> 
>> public data(int id, String goods, String pageName) {
>> 
>> 
>> 
>> this.id = id;
>> 
>> 
>> 
>> this.goods = goods;
>> 
>> 
>> 
>> this.pageName = pageName;
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> 
>> 
>> public data() {
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> public int getId() {
>> 
>> 
>> 
>> return id;
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> public void setId(int id) {
>> 
>> 
>> 
>> this.id = id;
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> public String getGoods() {
>> 
>> 
>> 
>> return goods;
>> 
>> 
>> 
>> }
>> 
>> 
>> 
>> 
>> 
>> public void setGoods(String goods) {
>> 
>> 
>> 
>> this.goods = goods;

Re: Re: [Internet]Re: Re: Some question with Flink state

2022-05-24 文章 lxk7...@163.com

刚看了下keygroup的原理,前面的内容大致能理解了,对于下面这段话
"map-state的话相当于某些固定的key group里面的key都可以通过map-state的user-key去分别存储"
我理解   
是因为如果使用value-state,一个task会存在多个key,不同的key的内容会进行替换,而使用map的话,就算同一个task有多个key,根据用户自定义的key还是可以匹配到的。
这样的话,大部分场景其实都适合使用map-state。


lxk7...@163.com
 
From: jurluo(罗凯)
Date: 2022-05-25 11:05
To: user-zh@flink.apache.org
Subject: Re: [Internet]Re: Re: Some question with Flink state
老哥,看起来好像没什么问题,相同的key都分配在了同个task,每个task会存在多种key是正常的。key会按最大并行度分成多个key 
group,然后固定的key 
group分配到各个task里。只能保证相同的key会到同一个task,不能保证一个task只有一个key。你这个需求用map-state才合适。map-state的话相当于某些固定的key
 group里面的key都可以通过map-state的user-key去分别存储。
 
> 2022年5月25日 上午10:45,lxk7...@163.com 写道:
> 
> 图片好像又挂了  我重发下
> hello,我这边测试了一下发现一个问题,在使用String类型做keyby的时候并没有得到正确的结果,而使用int类型的时候结果是正确。而且测试发现两次keyby确实是以第二次keyby为准
> 
> 
> 
>下面是我的代码及测试结果
> 
> 
> 
> 一.使用int类型
> 
> 
> 
>public class KeyByTest {
> 
> 
> 
> public static void main(String[] args) throws Exception {
> 
> 
> 
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> 
> 
> 
> env.setParallelism(10);
> 
> 
> 
> 
> 
> DataStreamSource dataDataStreamSource = 
> env.fromCollection(Arrays.asList(new data(1, "123", "首页"),
> 
> 
> 
> new data(1, "123", "分类页"),
> 
> 
> 
> new data(2, "r-123", "搜索结果页"),
> 
> 
> 
> new data(1, "r-123", "我的页"),
> 
> 
> 
> new data(3, "r-4567", "搜索结果页")));
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> SingleOutputStreamOperator map = 
> dataDataStreamSource.keyBy(new MyKeySelector())
> 
> 
> 
> .map(new RichMapFunction() {
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public String map(data data) throws Exception {
> 
> 
> 
> System.out.println(data.toString() + "的subtask为:" + 
> getRuntimeContext().getIndexOfThisSubtask() );
> 
> 
> 
> return data.toString();
> 
> 
> 
> }
> 
> 
> 
> });
> 
> 
> 
> 
> 
> 
> 
> env.execute("test");
> 
> 
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> class data{
> 
> 
> 
> private int id;
> 
> 
> 
> private String goods;
> 
> 
> 
> private String pageName;
> 
> 
> 
> 
> 
> public data(int id, String goods, String pageName) {
> 
> 
> 
> this.id = id;
> 
> 
> 
> this.goods = goods;
> 
> 
> 
> this.pageName = pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> 
> 
> public data() {
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public int getId() {
> 
> 
> 
> return id;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setId(int id) {
> 
> 
> 
> this.id = id;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public String getGoods() {
> 
> 
> 
> return goods;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setGoods(String goods) {
> 
> 
> 
> this.goods = goods;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public String getPageName() {
> 
> 
> 
> return pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setPageName(String pageName) {
> 
> 
> 
> this.pageName = pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public String toString() {
> 
> 
> 
> return "data{" +
> 
> 
> 
> "id='" + id + '\'' +
> 
> 
> 
> ", goods='" + goods + '\'' +
> 
> 
> 
> ", pageName='" + pageName + '\'' +
> 
> 
> 
> '}';
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> 
> 
> class MyKeySelector implements KeySelector{
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public Integer getKey(data data) throws Exception {
> 
> 
> 
> return data.getId();
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> 控制台的输出如下:
> https://s2.loli.net/2022/05/25/mxtZu9YAPN2FD1a.png
> 
> 
> 
> 可以看见数据根据id分组,分到了不同的subtask上。
> 
> 
> 
> 
> 
&g

Re: [Internet]Re: Re: Some question with Flink state

2022-05-24 文章 罗凯
老哥,看起来好像没什么问题,相同的key都分配在了同个task,每个task会存在多种key是正常的。key会按最大并行度分成多个key 
group,然后固定的key 
group分配到各个task里。只能保证相同的key会到同一个task,不能保证一个task只有一个key。你这个需求用map-state才合适。map-state的话相当于某些固定的key
 group里面的key都可以通过map-state的user-key去分别存储。

> 2022年5月25日 上午10:45,lxk7...@163.com 写道:
> 
> 图片好像又挂了  我重发下
> hello,我这边测试了一下发现一个问题,在使用String类型做keyby的时候并没有得到正确的结果,而使用int类型的时候结果是正确。而且测试发现两次keyby确实是以第二次keyby为准
> 
> 
> 
>下面是我的代码及测试结果
> 
> 
> 
> 一.使用int类型
> 
> 
> 
>public class KeyByTest {
> 
> 
> 
> public static void main(String[] args) throws Exception {
> 
> 
> 
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> 
> 
> 
> env.setParallelism(10);
> 
> 
> 
> 
> 
> DataStreamSource dataDataStreamSource = 
> env.fromCollection(Arrays.asList(new data(1, "123", "首页"),
> 
> 
> 
> new data(1, "123", "分类页"),
> 
> 
> 
> new data(2, "r-123", "搜索结果页"),
> 
> 
> 
> new data(1, "r-123", "我的页"),
> 
> 
> 
> new data(3, "r-4567", "搜索结果页")));
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> SingleOutputStreamOperator map = 
> dataDataStreamSource.keyBy(new MyKeySelector())
> 
> 
> 
> .map(new RichMapFunction() {
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public String map(data data) throws Exception {
> 
> 
> 
> System.out.println(data.toString() + "的subtask为:" + 
> getRuntimeContext().getIndexOfThisSubtask() );
> 
> 
> 
> return data.toString();
> 
> 
> 
> }
> 
> 
> 
> });
> 
> 
> 
> 
> 
> 
> 
> env.execute("test");
> 
> 
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> class data{
> 
> 
> 
> private int id;
> 
> 
> 
> private String goods;
> 
> 
> 
> private String pageName;
> 
> 
> 
> 
> 
> public data(int id, String goods, String pageName) {
> 
> 
> 
> this.id = id;
> 
> 
> 
> this.goods = goods;
> 
> 
> 
> this.pageName = pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> 
> 
> public data() {
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public int getId() {
> 
> 
> 
> return id;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setId(int id) {
> 
> 
> 
> this.id = id;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public String getGoods() {
> 
> 
> 
> return goods;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setGoods(String goods) {
> 
> 
> 
> this.goods = goods;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public String getPageName() {
> 
> 
> 
> return pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> public void setPageName(String pageName) {
> 
> 
> 
> this.pageName = pageName;
> 
> 
> 
> }
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public String toString() {
> 
> 
> 
> return "data{" +
> 
> 
> 
> "id='" + id + '\'' +
> 
> 
> 
> ", goods='" + goods + '\'' +
> 
> 
> 
> ", pageName='" + pageName + '\'' +
> 
> 
> 
> '}';
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> 
> 
> class MyKeySelector implements KeySelector{
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public Integer getKey(data data) throws Exception {
> 
> 
> 
> return data.getId();
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> 控制台的输出如下:
> https://s2.loli.net/2022/05/25/mxtZu9YAPN2FD1a.png
> 
> 
> 
> 可以看见数据根据id分组,分到了不同的subtask上。
> 
> 
> 
> 
> 
> 
> 
> 二.使用String类型  代码如下:
> 
> 
> 
> public class KeyByTest {
> 
> 
> 
> public static void main(String[] args) throws Exception {
> 
> 
> 
> StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
> 
> 
> 
> env.setParallelism(10);
> 
> 
> 
> 
> 
> DataStreamSource dataDataStreamSource = 
> env.fromCollection(Arrays.asList(new data("1", "123", "首页"),
> 
> 
> 
> new data("1", "123", "分类页"),
> 
> 
> 
> new data("2", "r-123", "搜索结果页"),
> 
> 
> 
> new data("2", "r-123", "我的页"),
> 
> 
> 
> new data("3", "r-4567", "搜索结果页")));
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> SingleOutputStreamOperator map = 
> dataDataStreamSource.keyBy(new MyKeySelector())
> 
> 
> 
> .map(new RichMapFunction() {
> 
> 
> 
> 
> 
> @Override
> 
> 
> 
> public String map(data data) throws Exception {
> 
> 
> 
> System.out.println(data.toString() + "的subtask为:" + 
> getRuntimeContext().getIndexOfThisSubtask() );
> 
> 
> 
> return data.toString();
> 
> 
> 
> }
> 
> 
> 
> });
> 
> 
> 
> 
> 
> 
> 
> env.execute("test");
> 
> 
> 
> 
> 
> }
> 
> 
> 
> }
> 
> 
> 
> class data{
> 
> 
> 
> private String id;
> 
> 
> 
> private String goods;
> 
> 
> 
> private String pageName;
> 
> 
> 
> 
> 
> public data(String id, String goods, String pageName) {
> 
> 
> 
> this.id =