Hi,

是否发一下可复现的完整示例?

Regards,
Dian

> 2021年7月10日 下午7:44,赵飞 <emanonc...@gmail.com> 写道:
> 
> 各位好,请教一个问题。
> 
> 最近我在使用pyflink开发一个模块,主要的功能是基于规则对用户数据进行计算和判断。涉及到两个流:数据流(data)和规则流(rule),两者都包含一个产品id值,所以将该值作为key来分区,处理的代码大致如下:
> 
> -------
> results = data.connect(rules).key_by('product_id',
> 'product_id').process(MyFunction())
> results.print()
> 
> class MyFunction(KeyedCoProcessFunction):
>    def open(self, ctx):
>        data_desc = MapStateDescriptor('data', key_type, value_type)
>        self.data = ctx.get_map_state(data_desc)
> 
>        rule_desc = MapStateDescriptor('rule', key_type,  value_type)
>        self.rules = ctx.get_map_state(rule_desc)
> 
>    def process_element1(self, data_value, ctx):
>        row_id, others = data_value[0], data_value[1:]
>        self.data.put(row_id, others)
>        result = []
>        for key, value_list in self.rules.items():
>            product_id, random_0, random_1  = value_list
>            # Do some calculations
>            product_ids_of_state_data = OtherFunction(self.data)
>            result.append(random_0, random_1, product_id,
> product_ids_of_state_data)
>        return result
> 
>    def process_element2(self, rule_value, ctx):
>        row_id, others = rule_value[0], rule_value[1:]
>        self.rules.put(row_id, others)
> ------
> 
> 数据格式大致如下:
> # 数据流(假设第二个元素为产品id)
> [
>    ['row_0', 1, 'a_0', 2],
>    ['row_1', 2, 'a_1', 3],
>    ['row_2', 1, 'a_2', 4],
>    ['row_4', 2, 'a_3', 5]
> ]
> 
> # 规则流(假设第二个元素为产品id)
> [
>    ['row_0', 1, 'rule1_value0', 'rule1_value1'],
>    ['row_1', 2, 'rule2_value0', 'rule2_value1']
> ]
> 
> 执行程序(指定全局并行度为1)后,得到的输出类似于:
> ['rule1_value0',  'rule1_value1', 1, [1, 2]]
> ['rule2_value0',  'rule2_value1', 2, [1, 2]]
> 从输出来看,当某产品的数据进来时,只使用了其对应的规则进行了处理,可以表明规则确实按产品id分区了,但是维护数据的MapState中却包含了多个产品id的数据。
> 
> 更进一步的现象为:
> 0. 如果数据流中的数据一直按照先产品1,后产品2的顺序,那么能够正常分区。但是如果无法保证这个顺序,则会出现以上描述的问题
> 1. 一旦数据流中的元素数量超过50,那么便会出现以上现象(上述所说的“规则按照产品id正确分区”也有可能只是规则流的数量未超过上限)
> 
> 按照官网的描述,某个键值的数据应只能访问到属于该键值的状态,在这个例子中,我的理解是维护数据的MapState中应该有且仅包含一个产品的数据。请问是我理解有误?还是这确实是个问题?

回复