附件是代码
还有一个问题,键值状态访问,你的代码里是读出了所有key关键的mapstate吗
-- 代码是读出所有map状态的key。













在 2020-04-30 09:40:45,"shx" <17611022...@163.com> 写道:
>能发一下写入状态的代码看一下吗,还有一个问题,键值状态访问,你的代码里是读出了所有key关键的mapstate吗,谢谢
>
>
>
>
>| |
>邵红晓
>|
>|
>邮箱:17611022...@163.com
>|
>
>签名由 网易邮箱大师 定制
>
>在2020年04月30日 09:04,guanyq 写道:
>代码中没特别指定Serializer。都是默认的序列化。
>在 2020-04-29 18:20:22,"Congxian Qiu" <qcx978132...@gmail.com> 写道:
>>Hi
>>从错误日志看,是 StateMigration 相关的问题。
>>你需要确认下,你的代码中的 Serializer 和 savepoint 中 state 相关的 serializer
>>是一样的或者是兼容的,你可以参考下这个文档[1]
>>
>>[1]
>>https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/schema_evolution.html
>>
>>Best,
>>Congxian
>>
>>
>>guanyq <dlgua...@163.com> 于2020年4月29日周三 下午6:09写道:
>>
>>>
>>> 附件是代码和错误日志。目前不知道如何调查。麻烦帮忙看下 谢谢。
package com.data.processing.entity;

/**
 * 新增
 *
 * @author guanyq
 * @date 2020/4/5
 */
public class OnlineOrderUncompleted {
    private String provinceCode;
    private String inModeCode;
    private String acceptType;
    private String siteSelectionType;
    private String resPrejudgeFlag;
    private Long step;
    private String orderId;
    private String receiveDate;

    public OnlineOrderUncompleted() {}

    public OnlineOrderUncompleted(String provinceCode, String inModeCode, 
String acceptType, String siteSelectionType, String resPrejudgeFlag, Long step, 
String orderId, String receiveDate) {
        this.provinceCode = provinceCode;
        this.inModeCode = inModeCode;
        this.acceptType = acceptType;
        this.siteSelectionType = siteSelectionType;
        this.resPrejudgeFlag = resPrejudgeFlag;
        this.step = step;
        this.orderId = orderId;
        this.receiveDate = receiveDate;
    }

    @Override
    public String toString() {
        return "OnlineOrderUncompleted{" +
                "provinceCode='" + provinceCode + '\'' +
                ", inModeCode='" + inModeCode + '\'' +
                ", acceptType='" + acceptType + '\'' +
                ", siteSelectionType='" + siteSelectionType + '\'' +
                ", resPrejudgeFlag='" + resPrejudgeFlag + '\'' +
                ", step='" + step + '\'' +
                ", orderId='" + orderId + '\'' +
                ", receiveDate='" + receiveDate + '\'' +
                '}';
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public String getProvinceCode() {
        return provinceCode;
    }

    public void setProvinceCode(String provinceCode) {
        this.provinceCode = provinceCode;
    }

    public String getInModeCode() {
        return inModeCode;
    }

    public void setInModeCode(String inModeCode) {
        this.inModeCode = inModeCode;
    }

    public String getAcceptType() {
        return acceptType;
    }

    public void setAcceptType(String acceptType) {
        this.acceptType = acceptType;
    }

    public String getSiteSelectionType() {
        return siteSelectionType;
    }

    public void setSiteSelectionType(String siteSelectionType) {
        this.siteSelectionType = siteSelectionType;
    }

    public String getResPrejudgeFlag() {
        return resPrejudgeFlag;
    }

    public void setResPrejudgeFlag(String resPrejudgeFlag) {
        this.resPrejudgeFlag = resPrejudgeFlag;
    }

    public String getReceiveDate() {
        return receiveDate;
    }

    public void setReceiveDate(String receiveDate) {
        this.receiveDate = receiveDate;
    }

    public Long getStep() {
        return step;
    }

    public void setStep(Long step) {
        this.step = step;
    }
}
package com.data.processing.unconditionalacceptance;

import com.alibaba.fastjson.JSONObject;
import com.data.processing.entity.OnlineOrderUncompleted;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.flink.util.Collector;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/**
 * 正式单在途
 *
 * @author guanyq
 * @date 2020/04/04
 */

public class OnlineOrderUncompletedDataProcess {

    public static final String BROADBAND_ORDER_ACCEPT = 
"BROADBAND_ORDER_ACCEPT";
    public static final String SIMPLE_ACCEPTANCE = "E";
    public static final String SUBSCRIBE_STATE_9 = "9";
    public static final String NEXT_DEAL_TAG_0 = "0";
    public static final String SUBSCRIBE_STATE_0 = "0";
    public static final String NEXT_DEAL_TAG_T = "T";
    public static final String CONNECT_CHAR = "|";
    public static final String CONNECT_CHAR_ID = "_";
    public static final String ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS = 
"OnlineOrderUncompletedDataProcess";
    public static final String ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS_1 = 
"OnlineOrderUncompletedDataProcess_1";
    public static final String ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS_2 = 
"OnlineOrderUncompletedDataProcess_2";

    public static void main(String[] args) {
        // log
        Logger LOG = 
LoggerFactory.getLogger(OnlineOrderUncompletedDataProcess.class);
        try {
            // parameter
            final String handleDay;
            final String bootstrapServers;
            final String consumerTopic;
            final String groupId;
            final String sinkTopic;
            final String windowTime;
            final String checkpointDataUri;
            final String checkpointInterval;
            final ParameterTool params = ParameterTool.fromArgs(args);
            handleDay = params.getRequired("handle.day");
            bootstrapServers = params.getRequired("bootstrap.servers");
            consumerTopic = params.getRequired("consumer.topic");
            groupId = params.getRequired("group.id");
            sinkTopic = params.getRequired("sink.topic");
            windowTime = params.getRequired("window.time");
            checkpointDataUri = params.getRequired("checkpoint.data.uri");
            checkpointInterval = params.getRequired("checkpoint.interval");

            // get env
            final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
            
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
            env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
            env.enableCheckpointing(Long.parseLong(checkpointInterval));
            StateBackend backend = new FsStateBackend(checkpointDataUri);
            env.setStateBackend(backend);
            // set kafka topic
            List<String> topicList = new ArrayList<>();
            topicList.add(consumerTopic);
            Properties props = getProperties(bootstrapServers, groupId);
            // get kafka consumer
            FlinkKafkaConsumer<String> consumer = new 
FlinkKafkaConsumer<>(topicList, new SimpleStringSchema(), props);
            long handleDayEpochMilli = getHandleDayEpochMilli(handleDay);
            consumer.setStartFromTimestamp(handleDayEpochMilli);
            // kafka source
            SingleOutputStreamOperator<String> ds = 
env.addSource(consumer).name("source").uid("source");
            // 计算正式单在途量,过滤
            SingleOutputStreamOperator<String> filter = ds.filter(new 
PrimaryFilterFunction());
            // format tuple
            SingleOutputStreamOperator<Tuple2<String, String>> primaryMap = 
filter.map(new PrimaryMapFunction());
            // format map
            SingleOutputStreamOperator<OnlineOrderUncompleted> map = primaryMap
                    .keyBy(0)
                    .map(new PrimaryRichMapFunction(handleDay))
                    .name(ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS_1)
                    .uid(ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS_1);
            SingleOutputStreamOperator<String> finalStream = map
                    .keyBy("provinceCode", "inModeCode", "acceptType", 
"siteSelectionType", "resPrejudgeFlag")
                    .timeWindow(Time.seconds(Long.parseLong(windowTime)))
                    .apply(new FinalRichFunction())
                    .name(ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS_2)
                    .uid(ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS_2);

            // kafka properties
            Properties properties = new Properties();
            properties.put("bootstrap.servers", bootstrapServers);

            // kafka producer
            FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer(
                    sinkTopic,
                    (KafkaSerializationSchema) (o, aLong) -> {
                        String msg = String.valueOf(o);
                        JSONObject jsonMsg = JSONObject.parseObject(msg);
                        String tableName = jsonMsg.getString("TABLE_NAME");
                        String id = jsonMsg.getString("ID");
                        return new ProducerRecord(sinkTopic, (tableName + 
id).getBytes(), msg.getBytes());
                    },
                    properties,
                    FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

            // sink to kafka
            finalStream.addSink(kafkaProducer).name("ResultDataSink");

            // execute
            env.execute(ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS);
        } catch (Exception e) {
            LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR");
            LOG.error("ERROR_INFO:" + e.getMessage());
        }
    }

    private static long getHandleDayEpochMilli(String handleDay) {
        DateTimeFormatter ftf = DateTimeFormatter.ofPattern("yyyy-MM-dd 
HH:mm:ss");
        LocalDateTime parse = LocalDateTime.parse(handleDay + " 00:00:00", ftf);
        return 
LocalDateTime.from(parse).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
    }

    private static Properties getProperties(String bootstrapServers, String 
groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("enable.auto.commit", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    private static String getAcceptType(String resPrejudgeFlag) {
        if (SIMPLE_ACCEPTANCE.equals(resPrejudgeFlag)) {
            return "2";
        } else {
            return "1";
        }
    }

    private static String getResPrejudgeFlag(String resPrejudgeFlag) {
        return StringUtils.isEmpty(resPrejudgeFlag) ? "W" : resPrejudgeFlag;
    }

    private static class PrimaryMapFunction extends RichMapFunction<String, 
Tuple2<String,String>> {
        Logger LOG = LoggerFactory.getLogger(PrimaryMapFunction.class);
        @Override
        public Tuple2<String,String> map(String msg) {
            try {
                JSONObject jsonMsg = JSONObject.parseObject(msg);
                String tableData = jsonMsg.getString("TABLE_DATA");
                JSONObject data = JSONObject.parseObject(tableData);
                String cbOrderId = data.getString("CB_ORDER_ID");
                 return new Tuple2<>(cbOrderId,msg);
            } catch (Exception e) {
                LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR At 
PrimaryMapFunction.map");
                LOG.error("ERROR_INFO:" + e.getMessage());
                LOG.error("ERROR_MSG:" + msg);
                return null;
            }
        }
    }

    private static class PrimaryRichMapFunction extends 
RichMapFunction<Tuple2<String, String>, OnlineOrderUncompleted> {
        Logger LOG = LoggerFactory.getLogger(PrimaryRichMapFunction.class);

        private String handleDay = "";

        public PrimaryRichMapFunction(String handleDay) {
            this.handleDay = handleDay;
        }

        private transient MapState<String, Long> dayUnComputeCnt;

        @Override
        public void open(Configuration parameters) {
            try {
                MapStateDescriptor<String, Long> descriptor = new 
MapStateDescriptor(
                        "dayUnComputeCnt",
                        TypeInformation.of(String.class),
                        TypeInformation.of(Long.class)
                );
                dayUnComputeCnt = getRuntimeContext().getMapState(descriptor);
            } catch (Exception e) {
                LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR At 
PrimaryRichMapFunction.open");
                LOG.error("ERROR_INFO:" + e.getMessage());
            }
        }

        @Override
        public OnlineOrderUncompleted map(Tuple2<String, String> t) {
            try {
                JSONObject jsonMsg = JSONObject.parseObject(t.f1);
                JSONObject data = 
JSONObject.parseObject(jsonMsg.getString("TABLE_DATA"));
                String provinceCode = data.getString("PROVINCE_CODE");
                String inModeCode = data.getString("IN_MODE_CODE");
                String siteSelectionType = 
data.getString("SITE_SELECTION_TYPE");
                String resPrejudgeFlag = 
getResPrejudgeFlag(data.getString("RES_PREJUDGE_FLAG"));
                String acceptType = getAcceptType(resPrejudgeFlag);
                String cbOrderId = data.getString("CB_ORDER_ID");
                String receiveDate = data.getString("RECEIVE_DATE");
                String subscribeState = data.getString("SUBSCRIBE_STATE");
                String nextDealTag = data.getString("NEXT_DEAL_TAG");
                String unCompleteKey = provinceCode + CONNECT_CHAR + cbOrderId 
+ CONNECT_CHAR + inModeCode + CONNECT_CHAR + acceptType + CONNECT_CHAR + 
siteSelectionType + CONNECT_CHAR + resPrejudgeFlag;
                long step = 0L;
                if 
(StringUtils.substring(receiveDate,0,10).compareTo(this.handleDay) > 0) {
                    if (SUBSCRIBE_STATE_9.equals(subscribeState) && 
NEXT_DEAL_TAG_0.equals(nextDealTag)) {
                        if (dayUnComputeCnt.contains(unCompleteKey)) {
                            dayUnComputeCnt.remove(unCompleteKey);
                            step = step - 1L;
                        }
                    } else if (SUBSCRIBE_STATE_0.equals(subscribeState) && 
NEXT_DEAL_TAG_T.equals(nextDealTag)) {
                        if (dayUnComputeCnt.contains(unCompleteKey)) {
                            dayUnComputeCnt.remove(unCompleteKey);
                            step = step - 1L;
                        }
                    } else {
                        if (!dayUnComputeCnt.contains(unCompleteKey)) {
                            dayUnComputeCnt.put(unCompleteKey, 0L);
                            step = step + 1L;
                        }
                    }
                }
                return new OnlineOrderUncompleted(provinceCode, inModeCode, 
acceptType, siteSelectionType, resPrejudgeFlag, step, cbOrderId, receiveDate);
            } catch (Exception e) {
                LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR At 
PrimaryRichMapFunction.map");
                LOG.error("ERROR_INFO:" + e.getMessage());
                LOG.error("ERROR_MSG:" + t.toString());
                return null;
            }
        }
    }

    private static class PrimaryFilterFunction implements 
FilterFunction<String> {
        Logger LOG = 
LoggerFactory.getLogger(OnlineOrderUncompletedDataProcess.class);

        @Override
        public boolean filter(String msg) {
            try {
                JSONObject jsonMsg = JSONObject.parseObject(msg);
                String tableName = jsonMsg.getString("TABLE_NAME");
                JSONObject data = 
JSONObject.parseObject(jsonMsg.getString("TABLE_DATA"));
                String provinceCode = data.getString("PROVINCE_CODE");
                String inModeCode = data.getString("IN_MODE_CODE");
                String siteSelectionType = 
data.getString("SITE_SELECTION_TYPE");
                String cbOrderId = data.getString("CB_ORDER_ID");
                String receiveDate = data.getString("RECEIVE_DATE");
                if (BROADBAND_ORDER_ACCEPT.equals(tableName)) {
                    if (StringUtils.isEmpty(provinceCode) ||
                            StringUtils.isEmpty(inModeCode) ||
                            StringUtils.isEmpty(siteSelectionType) ||
                            StringUtils.isEmpty(cbOrderId) ||
                            StringUtils.isEmpty(receiveDate)) {
                        return false;
                    }
                    return true;
                }
                return false;
            } catch (Exception e) {
                LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR At 
PrimaryFilterFunction.filter");
                LOG.error("ERROR_INFO:" + e.getMessage());
                LOG.error("ERROR_MSG:" + msg);
                return false;
            }
        }
    }

    private static class FinalRichFunction extends 
RichWindowFunction<OnlineOrderUncompleted, String, Tuple, TimeWindow> {
        Logger LOG = LoggerFactory.getLogger(FinalRichFunction.class);

        private transient ValueState<Long> dayComputeCnt;

        @Override
        public void open(Configuration config) {
            try {
                ValueStateDescriptor<Long> descriptor =
                        new ValueStateDescriptor<>("dayComputeCnt",
                                TypeInformation.of(new TypeHint<Long>() {
                                }));
                dayComputeCnt = getRuntimeContext().getState(descriptor);
            } catch (Exception e) {
                LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR(At 
FinalRichFunction)");
                LOG.error("ERROR_INFO:" + e.getMessage());
            }
        }

        @Override
        public void apply(Tuple tuple, TimeWindow timeWindow, 
Iterable<OnlineOrderUncompleted> iterable, Collector<String> collector) {
            try {
                JSONObject messageJson = new JSONObject();
                String provinceCode = "";
                String inModeCode = "";
                String acceptType = "";
                String siteSelectionType = "";
                String resPrejudgeFlag = "";
                long sum = 0L;

                for (OnlineOrderUncompleted record : iterable) {
                    provinceCode = record.getProvinceCode();
                    inModeCode = record.getInModeCode();
                    acceptType = record.getAcceptType();
                    siteSelectionType = record.getSiteSelectionType();
                    resPrejudgeFlag = record.getResPrejudgeFlag();
                    sum = sum + record.getStep();
                }
                // 省份|接入方式|受理方式|选址方式|订单打标
                String id = provinceCode + CONNECT_CHAR_ID + inModeCode + 
CONNECT_CHAR_ID + acceptType + CONNECT_CHAR_ID + siteSelectionType + 
CONNECT_CHAR_ID + resPrejudgeFlag;
                messageJson.put("TABLE_NAME", 
"BROADBAND_NO_REJECTION_UNCOMPLETED_AMOUNT");
                messageJson.put("ID", id);
                messageJson.put("PROVINCE_CODE", provinceCode);
                messageJson.put("IN_MODE_CODE", inModeCode);
                messageJson.put("ACCEPT_TYPE", acceptType);
                messageJson.put("SITE_SELECTION_TYPE", siteSelectionType);
                messageJson.put("RES_PREJUDGE_FLAG", resPrejudgeFlag);
                if (dayComputeCnt.value() == null) {
                    dayComputeCnt.update(sum);
                } else {
                    dayComputeCnt.update(dayComputeCnt.value() + sum);
                }
                // 正式单在途单量
                messageJson.put("ONLINE_ORDER_UNCOMPLETED_AMOUNT", 
dayComputeCnt.value());
                collector.collect(messageJson.toJSONString());
            } catch (Exception e) {
                LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR At 
FinalRichFunction.apply");
                LOG.error("ERROR_INFO:" + e.getMessage());
                LOG.error("ERROR_MSG:" + iterable.toString());
            }
        }
    }
}

回复