wangbaohua created FLINK-25103:
----------------------------------
Summary: When a KeyedBroadcastProcessFunction runs with parallelism 6, how is a ValueState variable A stored across the six tasks?
Key: FLINK-25103
URL: https://issues.apache.org/jira/browse/FLINK-25103
Project: Flink
Issue Type: Bug
Components: Runtime / Task
Affects Versions: 1.14.0
Reporter: wangbaohua
KeyedBroadcastProcessFunction run set 6, parallelism ValueState variables A, excuse me how A stored in the six tasks. When I was running, I observed that some tasks fetched variable A was null, while others had values .The following code : .... setParallelism(9); ...... public class dealStreamProcessFunction extends KeyedBroadcastProcessFunction<String, StandardEvent, List<String>, StandardEvent> { private static final Logger logger = LoggerFactory.getLogger(dealStreamProcessFunction.class); private transient ValueState<List<StandardEvent>> listState; private transient ValueState<Boolean> runingFlagState; private transient ValueState<InferenceEngine> engineState; MapStateDescriptor<String, List<String>> ruleStateDescriptor = new MapStateDescriptor<>(ContextInfo.RULE_SBROAD_CAST_STATE , BasicTypeInfo.STRING_TYPE_INFO , new ListTypeInfo<>(String.class)); InferenceEngine engine; /** * open方法只会执行一次 * 可以在这实现初始化的功能 * * @param parameters * @throws Exception */ @Override public void open(Configuration parameters) throws Exception { super.open(parameters); ValueStateDescriptor<List<StandardEvent>> recentOperatorsDescriptor = new ValueStateDescriptor<List<StandardEvent>>( "recent-operator", TypeInformation.of(new TypeHint<List<StandardEvent>>() { })); ValueStateDescriptor<Boolean> runingFlagDescriptor = new ValueStateDescriptor<Boolean>( "runingFlag", Boolean.class); ValueStateDescriptor<InferenceEngine> engineDescriptor = new ValueStateDescriptor<InferenceEngine>( "runingFlag1", InferenceEngine.class); engineState = getRuntimeContext().getState(engineDescriptor); listState = getRuntimeContext().getState(recentOperatorsDescriptor); runingFlagState = getRuntimeContext().getState(runingFlagDescriptor); logger.info("KeyedBroadcastProcessFunction open"); } @Override public void processElement(StandardEvent standardEvent, ReadOnlyContext readOnlyContext, Collector<StandardEvent> collector) throws Exception { if(standardEvent == null){ return; } List<String> list = null; list = 
readOnlyContext.getBroadcastState(ruleStateDescriptor).get(ContextInfo.RULE_SBROAD_CAST_STATE); if (list == null) { logger.info("RulesBroadcastState is null.............."); List<StandardEvent> lst = listState.value(); if (lst == null) { lst = new ArrayList<>(); } lst.add(standardEvent); listState.update(lst); return; } //第一次进来 if (runingFlagState.value() == null) { logger.info("runingFlagState.value() == null"); runingFlagState.update(true); } if (((runingFlagState.value() && list.get(0).equals("1")) || list.get(0).equals("0"))) { logger.info("action update.....:" + list.size() + ":" + runingFlagState.value() + ":" + list.get(0)); String flag = list.get(0); list.remove(0); InferenceEngine engine1 = InferenceEngine.compile(RuleReader.parseRules(list)); engineState.update(engine1); if (runingFlagState.value() && flag.equals("1")) { runingFlagState.update(false); } } if (engineState.value() != null) { List<StandardEvent> listTmp = listState.value(); if (listTmp != null) { for (StandardEvent standardEventTmp : listTmp) { logger.info("listState.....:" + standardEventTmp); match(standardEventTmp, collector); } listState.clear(); } match(standardEvent, collector); } else { logger.info("processElement engine is null.....:"); } } private void match(StandardEvent standardEvent, Collector<StandardEvent> collector) throws IOException { PatternMatcher matcher = engineState.value().matcher(standardEvent); if (matcher.find()) { List<Action> actions = matcher.getActions(); for (Action action : actions) { if (standardEvent != null) { if(collector != null) collector.collect(standardEvent); else logger.info("collector is null:" ); } } } else { logger.info("no matcher:" + standardEvent); } } @Override public void processBroadcastElement(List<String> strings, Context context, Collector<StandardEvent> collector) throws Exception { BroadcastState<String, List<String>> broadcastState = context.getBroadcastState(ruleStateDescriptor); logger.info("processBroadcastElement.....:" + 
strings.size()); if (broadcastState.contains(ContextInfo.RULE_SBROAD_CAST_STATE)) { List<String> oldList = broadcastState.get(ContextInfo.RULE_SBROAD_CAST_STATE); logger.info("get State:" + oldList.size() + " replaced with State:" + strings.size()); } else { logger.info("do not find old State, put first counterState {}", strings.size()); } broadcastState.put(ContextInfo.RULE_SBROAD_CAST_STATE, strings); } } -- This message was sent by Atlassian Jira (v8.20.1#820001)