Hi,
我在u...@flink.apache.org列表中也发送了这个问题,已经得到回复说问题的原因如下:
The root cause is the wrong mapping of the state key to the state. This
> kind of wrong mapping occurs when the key is switched, but the state is not
> used. As you wrote in the example, the `data` you declared is not used in
>
Hi,
刚才直接在Gmail回复了您的邮件,但是在Pony Mail这边似乎看不到,所以我再贴一遍。
以下是完整代码。需要补充的是,相同条件下运行相同代码,结果可能会不一样,有时候正常,有时候能够复现问题。
--
import random
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedCoProcessFunction
from
Hi,
以下是完整代码。需要补充的是,相同条件下运行相同代码,结果可能会不一样,有时候正常,有时候能够复现问题。
import random
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import KeyedCoProcessFunction
from pyflink.datastream.state import MapStateDescriptor
from
Hi,
是否发一下可复现的完整示例?
Regards,
Dian
> 2021年7月10日 下午7:44,赵飞 写道:
>
> 各位好,请教一个问题。
>
> 最近我在使用pyflink开发一个模块,主要的功能是基于规则对用户数据进行计算和判断。涉及到两个流:数据流(data)和规则流(rule),两者都包含一个产品id值,所以将该值作为key来分区,处理的代码大致如下:
>
> ---
> results = data.connect(rules).key_by('product_id',
> 'product_id').process(MyFunction())
各位好,请教一个问题。
最近我在使用pyflink开发一个模块,主要的功能是基于规则对用户数据进行计算和判断。涉及到两个流:数据流(data)和规则流(rule),两者都包含一个产品id值,所以将该值作为key来分区,处理的代码大致如下:
---
results = data.connect(rules).key_by('product_id',
'product_id').process(MyFunction())
results.print()
class MyFunction(KeyedCoProcessFunction):
def open(self,