wangbaohua created FLINK-25103:
----------------------------------

             Summary: KeyedBroadcastProcessFunction run set 6, parallelism 
ValueState variables A, could you tell me how to store in the six tasks A  
                 Key: FLINK-25103
                 URL: https://issues.apache.org/jira/browse/FLINK-25103
             Project: Flink
          Issue Type: Bug
          Components: Runtime / Task
    Affects Versions: 1.14.0
            Reporter: wangbaohua


KeyedBroadcastProcessFunction run set 6, parallelism ValueState variables A, 
excuse me how A stored in the six tasks.  When I was running, I observed that 
some tasks fetched variable A was null, while others had values  .The following 
code  :
....
setParallelism(9);
......
public class dealStreamProcessFunction extends 
KeyedBroadcastProcessFunction<String, StandardEvent, List<String>, 
StandardEvent> {
    private static final Logger logger = 
LoggerFactory.getLogger(dealStreamProcessFunction.class);

    private transient ValueState<List<StandardEvent>> listState;
    private transient ValueState<Boolean> runingFlagState;
    private transient ValueState<InferenceEngine> engineState;
    MapStateDescriptor<String, List<String>> ruleStateDescriptor = new 
MapStateDescriptor<>(ContextInfo.RULE_SBROAD_CAST_STATE
            , BasicTypeInfo.STRING_TYPE_INFO
            , new ListTypeInfo<>(String.class));
    InferenceEngine engine;

    /**
     * open方法只会执行一次
     * 可以在这实现初始化的功能
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        ValueStateDescriptor<List<StandardEvent>> recentOperatorsDescriptor = 
new ValueStateDescriptor<List<StandardEvent>>(
                "recent-operator",
                TypeInformation.of(new TypeHint<List<StandardEvent>>() {
                }));

        ValueStateDescriptor<Boolean> runingFlagDescriptor = new 
ValueStateDescriptor<Boolean>(
                "runingFlag",
                Boolean.class);

        ValueStateDescriptor<InferenceEngine> engineDescriptor = new 
ValueStateDescriptor<InferenceEngine>(
                "runingFlag1",
                InferenceEngine.class);
        engineState = getRuntimeContext().getState(engineDescriptor);
        listState = getRuntimeContext().getState(recentOperatorsDescriptor);
        runingFlagState = getRuntimeContext().getState(runingFlagDescriptor);

        logger.info("KeyedBroadcastProcessFunction open");
    }

    @Override
    public void processElement(StandardEvent standardEvent, ReadOnlyContext 
readOnlyContext, Collector<StandardEvent> collector) throws Exception {
        if(standardEvent == null){
            return;
        }
        List<String> list = null;
        list = 
readOnlyContext.getBroadcastState(ruleStateDescriptor).get(ContextInfo.RULE_SBROAD_CAST_STATE);
        if (list == null) {
            logger.info("RulesBroadcastState is null..............");
            List<StandardEvent> lst = listState.value();
            if (lst == null) {
                lst = new ArrayList<>();
            }
            lst.add(standardEvent);
            listState.update(lst);
            return;
        }
        //第一次进来
        if (runingFlagState.value() == null) {
            logger.info("runingFlagState.value() == null");
            runingFlagState.update(true);
        }
        if (((runingFlagState.value() && list.get(0).equals("1")) || 
list.get(0).equals("0"))) {
            logger.info("action update.....:" + list.size() + ":" + 
runingFlagState.value() + ":" + list.get(0));
            String flag = list.get(0);
            list.remove(0);
            InferenceEngine engine1 = 
InferenceEngine.compile(RuleReader.parseRules(list));
            engineState.update(engine1);
            if (runingFlagState.value() && flag.equals("1")) {
                runingFlagState.update(false);
            }
        }

        if (engineState.value() != null) {
            List<StandardEvent> listTmp = listState.value();
            if (listTmp != null) {
                for (StandardEvent standardEventTmp : listTmp) {
                    logger.info("listState.....:" + standardEventTmp);
                    match(standardEventTmp, collector);
                }
                listState.clear();
            }
            match(standardEvent, collector);
        } else {
            logger.info("processElement engine is null.....:");

        }

    }

    private void match(StandardEvent standardEvent, Collector<StandardEvent> 
collector) throws IOException {
        PatternMatcher matcher = engineState.value().matcher(standardEvent);
        if (matcher.find()) {
            List<Action> actions = matcher.getActions();
            for (Action action : actions) {
                if (standardEvent != null) {
                    if(collector != null)
                        collector.collect(standardEvent);
                    else
                        logger.info("collector is null:" );
                }
            }
        } else {
            logger.info("no matcher:" + standardEvent);
        }
    }

    @Override
    public void processBroadcastElement(List<String> strings, Context context, 
Collector<StandardEvent> collector) throws Exception {
        BroadcastState<String, List<String>> broadcastState = 
context.getBroadcastState(ruleStateDescriptor);
        logger.info("processBroadcastElement.....:" + strings.size());
        if (broadcastState.contains(ContextInfo.RULE_SBROAD_CAST_STATE)) {
            List<String> oldList = 
broadcastState.get(ContextInfo.RULE_SBROAD_CAST_STATE);
            logger.info("get State:" + oldList.size() + "  replaced with 
State:" + strings.size());
        } else {
            logger.info("do not find old State, put first counterState {}", 
strings.size());
        }
        broadcastState.put(ContextInfo.RULE_SBROAD_CAST_STATE, strings);
    }
}



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to