那我理解了,描述的太清晰了,我确实之前没理解state的生命周期和使用方法,十分感谢

在 2022-08-29 18:42:51,"Zhiwen Sun" <pens...@gmail.com> 写道:
>你应该没有正确理解 state 的使用
>
>我们一般在程序里面是用的是 KeyedState , 也就是和 key 伴随的。
>
>基于上面,所以 open() 里面只能对 state 进行初始化, 但是没有办法设置 state 的 value,因为这时候没有 key ;
>另外一方面,也不会在 map() 的时候去 new state (可以认为 state 是一个大的 Map,你 map 的时候只是操作其中的一个
>key)。
>
>回到你的需求,你应该在 open() 的时候保存相关信息到类变量里面,当 map() 的时候再去 update state。
>
>
>
>
>
>
>Zhiwen Sun
>
>
>
>On Fri, Aug 26, 2022 at 1:55 PM 曲洋 <quyanghao...@126.com> wrote:
>
>>
>> 对的,是后者,statAccumulator.value()是null,就是map方法中取值就成null了,但是open中命名初始化了,这个是因为map太快了吗,state没初始化完就开始拿了吗.
>> 嗯嗯,我现在改成先进行判断
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> 在 2022-08-26 11:22:43,"Hangxiang Yu" <master...@gmail.com> 写道:
>> >open确实是初始化的时候就会调用的;
>>
>> >第一次调用是null是说statAccumulator是null还是statAccumulator.value()是null,后者的话是正常可能会出现的;
>> >这里的写法看起来有点问题,一般用value方法取出来可以先判断下,然后对value state的更新用update方法;
>> >
>> >On Fri, Aug 26, 2022 at 10:25 AM 曲洋 <quyanghao...@126.com> wrote:
>> >
>> >> 各位好,
>> >>
>> >>
>> 我想请教一个问题,我的Flink应用中会在state里边存储一些指标,比如一年的总数,然后当任务各种原因断掉的时候,我希望可以通过入参的方式直接调节这个state,但是遇到了一个问题,如下:
>> >> 我重写了RichMapFunction,yearTotal
>> >>
>> 这个指标是通过命令行传进来的,然后我希望初始化到state里边,但是我发现,open方法第一次调用的时候state都是null,然后这个参数就进不来
>> >> 所以我想问下这个场景怎么办,还有open方法的生命周期是怎么样的,我本以为是map第一次打开的时候就会调用,结果好像不是
>> >> public static class AccumulateAmounts extends RichMapFunction<v2bean,
>> >> BlueAccumulaterInitState> {
>> >>         private transient ValueState<BlueAccumulaterInitState>
>> >> statAccumulator;
>> >>
>> >>
>> >>         @Override
>> >>         public BlueAccumulaterInitState map(v2bean currentAccumulator)
>> >> throws Exception {
>> >>
>> >>
>> >>             BlueAccumulaterInitState stat = (statAccumulator.value() !=
>> >> null) ? statAccumulator.value() : new BlueAccumulaterInitState();
>> >>             Long yearIncrement = year.equals(stat.getYear()) ?
>> >> stat.getYearMetric() + 1L : 1L;
>> >>             stat.setYearMetric(yearIncrement);
>> >>
>> >>
>> >>             statAccumulator.update(stat);
>> >>             return stat;
>> >>         }
>> >>
>> >>
>> >>         @Override
>> >>         public void open(Configuration config) {
>> >>             ValueStateDescriptor<BlueAccumulaterInitState> descriptor =
>> >>                     new ValueStateDescriptor<>(
>> >>                             "total",
>> >>                             TypeInformation.of(new
>> >> TypeHint<BlueAccumulaterInitState>() {
>> >>                             }));
>> >>             statAccumulator = getRuntimeContext().getState(descriptor);
>> >>             ExecutionConfig.GlobalJobParameters globalParams =
>> >> getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
>> >>             Configuration globConf = (Configuration) globalParams;
>> >>             long yearTotal =
>> >> globConf.getLong(ConfigOptions.key("year").longType().noDefaultValue());
>> >> statAccumulator.value().setYearMetric(yearTotal);
>> >>
>> >>
>> >>
>> >>         }
>> >>     }
>> >
>> >
>> >
>> >--
>> >Best,
>> >Hangxiang.
>>

Reply via email to