Hi,

可以把具体的报错信息贴一下,另外代码中没有看到使用 listState 缓存元素的部分

Best,
Weihua


On Tue, Feb 21, 2023 at 9:38 AM 知而不惑 <chenliangv...@qq.com.invalid> wrote:

> 各位大佬好
> 我使用广播流与非广播流进行connet,随后继承实现了BroadcastProcessFunction,运行时发现在自定义实现的BroadcastProcessFunction
> 中,广播流数据会先到,导致processElement() 中获取广播流数据为空,请问有什么写法或机制解决该问题?我尝试在谷歌和chatgpt
> 找寻答案,给到的回复是 用list 缓存元素 ,在open中初始化,但是我在open中初始化得到了一个 必须在keyby() 之后使用的报错
> 以下是processElement 的最小工作单元代码示例 和 main 方法的使用:
> @Override
> public void processElement(FileEventOuterClass.FileEvent value,
> BroadcastProcessFunction<FileEventOuterClass.FileEvent,
> List<SensitiveRule&gt;, FileEventOuterClass.FileEvent&gt;.ReadOnlyContext
> ctx, Collector<FileEventOuterClass.FileEvent&gt; out) {
>     try {
>         ReadOnlyBroadcastState<Void, List<SensitiveRule&gt;&gt;
> broadcastState = ctx.getBroadcastState(ruleDescriptor);
>
>         List<SensitiveRule&gt; sensitiveRules = broadcastState.get(null);
>         if (CollectionUtils.isEmpty(sensitiveRules)) {
>             return;
>         }
>         ....
>     } catch (Exception e) {
>         log.error("SensitiveDataClassify err:", e);
>     }
> }
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
>     env.setParallelism(1);
>
>     MapStateDescriptor<Void, List<SensitiveRule&gt;&gt; ruleDescriptor =
>             new MapStateDescriptor<&gt;("ruleBroadcastState", Types.VOID,
> new ListTypeInfo<&gt;(SensitiveRule.class));
>
>     // 广播流
>     BroadcastStream<List<SensitiveRule&gt;&gt; broadcast =
> sensitiveRule.broadcast(ruleDescriptor);
>
>     DataStreamSource<String&gt; localhost =
> env.socketTextStream("localhost", 11451);
>     SingleOutputStreamOperator<FileEventOuterClass.FileEvent&gt; stream =
> localhost.map((MapFunction<String, FileEventOuterClass.FileEvent&gt;) value
> -&gt;
> FileEventOuterClass.FileEvent.newBuilder().setChannel("aaaa").build());
>
>     SingleOutputStreamOperator<FileEventOuterClass.FileEvent&gt;
> streamOperator = stream.connect(broadcast).process(new
> SensitiveDataClassify());
>     streamOperator.print("qqq");
>     env.execute();
>
> }

回复