Hi, 可以把具体的报错信息贴一下,另外代码中没有看到使用 listState 缓存元素的部分
Best, Weihua On Tue, Feb 21, 2023 at 9:38 AM 知而不惑 <chenliangv...@qq.com.invalid> wrote: > 各位大佬好 > 我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction > 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt > 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错 > 以下是processElement 的最小工作单元代码示例 和 main 方法的使用: > @Override > public void processElement(FileEventOuterClass.FileEvent value, > BroadcastProcessFunction<FileEventOuterClass.FileEvent, > List<SensitiveRule>, FileEventOuterClass.FileEvent>.ReadOnlyContext > ctx, Collector<FileEventOuterClass.FileEvent> out) { > try { > ReadOnlyBroadcastState<Void, List<SensitiveRule>> > broadcastState = ctx.getBroadcastState(ruleDescriptor); > > List<SensitiveRule> sensitiveRules = broadcastState.get(null); > if (CollectionUtils.isEmpty(sensitiveRules)) { > return; > } > .... > } catch (Exception e) { > log.error("SensitiveDataClassify err:", e); > } > } > public static void main(String[] args) throws Exception { > StreamExecutionEnvironment env = > StreamExecutionEnvironment.getExecutionEnvironment(); > env.setParallelism(1); > > MapStateDescriptor<Void, List<SensitiveRule>> ruleDescriptor = > new MapStateDescriptor<>("ruleBroadcastState", Types.VOID, > new ListTypeInfo<>(SensitiveRule.class)); > > // 广播流 > BroadcastStream<List<SensitiveRule>> broadcast = > sensitiveRule.broadcast(ruleDescriptor); > > DataStreamSource<String> localhost = > env.socketTextStream("localhost", 11451); > SingleOutputStreamOperator<FileEventOuterClass.FileEvent> stream = > localhost.map((MapFunction<String, FileEventOuterClass.FileEvent>) value > -> > FileEventOuterClass.FileEvent.newBuilder().setChannel("aaaa").build()); > > SingleOutputStreamOperator<FileEventOuterClass.FileEvent> > streamOperator = stream.connect(broadcast).process(new > SensitiveDataClassify()); > streamOperator.print("qqq"); > env.execute(); > > }