附件是代码 还有一个问题,键值状态访问,你的代码里是读出了所有key关键的mapstate吗 -- 代码是读出所有map状态的key。 在 2020-04-30 09:40:45,"shx" <17611022...@163.com> 写道: >能发一下写入状态的代码看一下吗,还有一个问题,键值状态访问,你的代码里是读出了所有key关键的mapstate吗,谢谢 > > > > >| | >邵红晓 >| >| >邮箱:17611022...@163.com >| > >签名由 网易邮箱大师 定制 > >在2020年04月30日 09:04,guanyq 写道: >代码中没特别指定Serializer。都是默认的序列化。 >在 2020-04-29 18:20:22,"Congxian Qiu" <qcx978132...@gmail.com> 写道: >>Hi >>从错误日志看,是 StateMigration 相关的问题。 >>你需要确认下,你的代码中的 Serializer 和 savepoint 中 state 相关的 serializer >>是一样的或者是兼容的,你可以参考下这个文档[1] >> >>[1] >>https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/stream/state/schema_evolution.html >> >>Best, >>Congxian >> >> >>guanyq <dlgua...@163.com> 于2020年4月29日周三 下午6:09写道: >> >>> >>> 附件是代码和错误日志。目前不知道如何调查。麻烦帮忙看下 谢谢。
package com.data.processing.entity; /** * æ°å¢ * * @author guanyq * @date 2020/4/5 */ public class OnlineOrderUncompleted { private String provinceCode; private String inModeCode; private String acceptType; private String siteSelectionType; private String resPrejudgeFlag; private Long step; private String orderId; private String receiveDate; public OnlineOrderUncompleted() {} public OnlineOrderUncompleted(String provinceCode, String inModeCode, String acceptType, String siteSelectionType, String resPrejudgeFlag, Long step, String orderId, String receiveDate) { this.provinceCode = provinceCode; this.inModeCode = inModeCode; this.acceptType = acceptType; this.siteSelectionType = siteSelectionType; this.resPrejudgeFlag = resPrejudgeFlag; this.step = step; this.orderId = orderId; this.receiveDate = receiveDate; } @Override public String toString() { return "OnlineOrderUncompleted{" + "provinceCode='" + provinceCode + '\'' + ", inModeCode='" + inModeCode + '\'' + ", acceptType='" + acceptType + '\'' + ", siteSelectionType='" + siteSelectionType + '\'' + ", resPrejudgeFlag='" + resPrejudgeFlag + '\'' + ", step='" + step + '\'' + ", orderId='" + orderId + '\'' + ", receiveDate='" + receiveDate + '\'' + '}'; } public String getOrderId() { return orderId; } public void setOrderId(String orderId) { this.orderId = orderId; } public String getProvinceCode() { return provinceCode; } public void setProvinceCode(String provinceCode) { this.provinceCode = provinceCode; } public String getInModeCode() { return inModeCode; } public void setInModeCode(String inModeCode) { this.inModeCode = inModeCode; } public String getAcceptType() { return acceptType; } public void setAcceptType(String acceptType) { this.acceptType = acceptType; } public String getSiteSelectionType() { return siteSelectionType; } public void setSiteSelectionType(String siteSelectionType) { this.siteSelectionType = siteSelectionType; } public String getResPrejudgeFlag() { return resPrejudgeFlag; } public void setResPrejudgeFlag(String resPrejudgeFlag) { this.resPrejudgeFlag = resPrejudgeFlag; } public String getReceiveDate() { return receiveDate; } public void setReceiveDate(String receiveDate) { this.receiveDate = receiveDate; } public Long getStep() { return step; } public void setStep(Long step) { this.step = step; } }
package com.data.processing.unconditionalacceptance; import com.alibaba.fastjson.JSONObject; import com.data.processing.entity.OnlineOrderUncompleted; import org.apache.commons.lang3.StringUtils; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.state.MapState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer; import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema; import org.apache.flink.util.Collector; import org.apache.kafka.clients.producer.ProducerRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.time.LocalDateTime; import java.time.ZoneId; import java.time.format.DateTimeFormatter; import java.util.ArrayList; import java.util.List; import java.util.Properties; /** * æ£å¼åå¨é * * @author guanyq * @date 2020/04/04 */ public class OnlineOrderUncompletedDataProcess { public static final String BROADBAND_ORDER_ACCEPT = "BROADBAND_ORDER_ACCEPT"; public static final String SIMPLE_ACCEPTANCE = "E"; public static final String SUBSCRIBE_STATE_9 = "9"; public static final String NEXT_DEAL_TAG_0 = "0"; public static final String SUBSCRIBE_STATE_0 = "0"; public static final String NEXT_DEAL_TAG_T = "T"; public static final String CONNECT_CHAR = "|"; public static final String CONNECT_CHAR_ID = "_"; public static final String ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS = "OnlineOrderUncompletedDataProcess"; public static final String ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS_1 = "OnlineOrderUncompletedDataProcess_1"; public static final String ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS_2 = "OnlineOrderUncompletedDataProcess_2"; public static void main(String[] args) { // log Logger LOG = LoggerFactory.getLogger(OnlineOrderUncompletedDataProcess.class); try { // parameter final String handleDay; final String bootstrapServers; final String consumerTopic; final String groupId; final String sinkTopic; final String windowTime; final String checkpointDataUri; final String checkpointInterval; final ParameterTool params = ParameterTool.fromArgs(args); handleDay = params.getRequired("handle.day"); bootstrapServers = params.getRequired("bootstrap.servers"); consumerTopic = params.getRequired("consumer.topic"); groupId = params.getRequired("group.id"); sinkTopic = params.getRequired("sink.topic"); windowTime = params.getRequired("window.time"); checkpointDataUri = params.getRequired("checkpoint.data.uri"); checkpointInterval = params.getRequired("checkpoint.interval"); // get env final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); env.enableCheckpointing(Long.parseLong(checkpointInterval)); StateBackend backend = new FsStateBackend(checkpointDataUri); env.setStateBackend(backend); // set kafka topic List<String> topicList = new ArrayList<>(); topicList.add(consumerTopic); Properties props = getProperties(bootstrapServers, groupId); // get kafka consumer FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(topicList, new SimpleStringSchema(), props); long handleDayEpochMilli = getHandleDayEpochMilli(handleDay); consumer.setStartFromTimestamp(handleDayEpochMilli); // kafka source SingleOutputStreamOperator<String> ds = env.addSource(consumer).name("source").uid("source"); // 计ç®æ£å¼åå¨ééï¼è¿æ»¤ SingleOutputStreamOperator<String> filter = ds.filter(new PrimaryFilterFunction()); // format tuple SingleOutputStreamOperator<Tuple2<String, String>> primaryMap = filter.map(new PrimaryMapFunction()); // format map SingleOutputStreamOperator<OnlineOrderUncompleted> map = primaryMap .keyBy(0) .map(new PrimaryRichMapFunction(handleDay)) .name(ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS_1) .uid(ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS_1); SingleOutputStreamOperator<String> finalStream = map .keyBy("provinceCode", "inModeCode", "acceptType", "siteSelectionType", "resPrejudgeFlag") .timeWindow(Time.seconds(Long.parseLong(windowTime))) .apply(new FinalRichFunction()) .name(ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS_2) .uid(ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS_2); // kafka properties Properties properties = new Properties(); properties.put("bootstrap.servers", bootstrapServers); // kafka producer FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer( sinkTopic, (KafkaSerializationSchema) (o, aLong) -> { String msg = String.valueOf(o); JSONObject jsonMsg = JSONObject.parseObject(msg); String tableName = jsonMsg.getString("TABLE_NAME"); String id = jsonMsg.getString("ID"); return new ProducerRecord(sinkTopic, (tableName + id).getBytes(), msg.getBytes()); }, properties, FlinkKafkaProducer.Semantic.AT_LEAST_ONCE); // sink to kafka finalStream.addSink(kafkaProducer).name("ResultDataSink"); // execute env.execute(ONLINE_ORDER_UNCOMPLETED_DATA_PROCESS); } catch (Exception e) { LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR"); LOG.error("ERROR_INFO:" + e.getMessage()); } } private static long getHandleDayEpochMilli(String handleDay) { DateTimeFormatter ftf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"); LocalDateTime parse = LocalDateTime.parse(handleDay + " 00:00:00", ftf); return LocalDateTime.from(parse).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli(); } private static Properties getProperties(String bootstrapServers, String groupId) { Properties props = new Properties(); props.put("bootstrap.servers", bootstrapServers); props.put("group.id", groupId); props.put("enable.auto.commit", "false"); props.put("auto.offset.reset", "earliest"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); return props; } private static String getAcceptType(String resPrejudgeFlag) { if (SIMPLE_ACCEPTANCE.equals(resPrejudgeFlag)) { return "2"; } else { return "1"; } } private static String getResPrejudgeFlag(String resPrejudgeFlag) { return StringUtils.isEmpty(resPrejudgeFlag) ? "W" : resPrejudgeFlag; } private static class PrimaryMapFunction extends RichMapFunction<String, Tuple2<String,String>> { Logger LOG = LoggerFactory.getLogger(PrimaryMapFunction.class); @Override public Tuple2<String,String> map(String msg) { try { JSONObject jsonMsg = JSONObject.parseObject(msg); String tableData = jsonMsg.getString("TABLE_DATA"); JSONObject data = JSONObject.parseObject(tableData); String cbOrderId = data.getString("CB_ORDER_ID"); return new Tuple2<>(cbOrderId,msg); } catch (Exception e) { LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR At PrimaryMapFunction.map"); LOG.error("ERROR_INFO:" + e.getMessage()); LOG.error("ERROR_MSG:" + msg); return null; } } } private static class PrimaryRichMapFunction extends RichMapFunction<Tuple2<String, String>, OnlineOrderUncompleted> { Logger LOG = LoggerFactory.getLogger(PrimaryRichMapFunction.class); private String handleDay = ""; public PrimaryRichMapFunction(String handleDay) { this.handleDay = handleDay; } private transient MapState<String, Long> dayUnComputeCnt; @Override public void open(Configuration parameters) { try { MapStateDescriptor<String, Long> descriptor = new MapStateDescriptor( "dayUnComputeCnt", TypeInformation.of(String.class), TypeInformation.of(Long.class) ); dayUnComputeCnt = getRuntimeContext().getMapState(descriptor); } catch (Exception e) { LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR At PrimaryRichMapFunction.open"); LOG.error("ERROR_INFO:" + e.getMessage()); } } @Override public OnlineOrderUncompleted map(Tuple2<String, String> t) { try { JSONObject jsonMsg = JSONObject.parseObject(t.f1); JSONObject data = JSONObject.parseObject(jsonMsg.getString("TABLE_DATA")); String provinceCode = data.getString("PROVINCE_CODE"); String inModeCode = data.getString("IN_MODE_CODE"); String siteSelectionType = data.getString("SITE_SELECTION_TYPE"); String resPrejudgeFlag = getResPrejudgeFlag(data.getString("RES_PREJUDGE_FLAG")); String acceptType = getAcceptType(resPrejudgeFlag); String cbOrderId = data.getString("CB_ORDER_ID"); String receiveDate = data.getString("RECEIVE_DATE"); String subscribeState = data.getString("SUBSCRIBE_STATE"); String nextDealTag = data.getString("NEXT_DEAL_TAG"); String unCompleteKey = provinceCode + CONNECT_CHAR + cbOrderId + CONNECT_CHAR + inModeCode + CONNECT_CHAR + acceptType + CONNECT_CHAR + siteSelectionType + CONNECT_CHAR + resPrejudgeFlag; long step = 0L; if (StringUtils.substring(receiveDate,0,10).compareTo(this.handleDay) > 0) { if (SUBSCRIBE_STATE_9.equals(subscribeState) && NEXT_DEAL_TAG_0.equals(nextDealTag)) { if (dayUnComputeCnt.contains(unCompleteKey)) { dayUnComputeCnt.remove(unCompleteKey); step = step - 1L; } } else if (SUBSCRIBE_STATE_0.equals(subscribeState) && NEXT_DEAL_TAG_T.equals(nextDealTag)) { if (dayUnComputeCnt.contains(unCompleteKey)) { dayUnComputeCnt.remove(unCompleteKey); step = step - 1L; } } else { if (!dayUnComputeCnt.contains(unCompleteKey)) { dayUnComputeCnt.put(unCompleteKey, 0L); step = step + 1L; } } } return new OnlineOrderUncompleted(provinceCode, inModeCode, acceptType, siteSelectionType, resPrejudgeFlag, step, cbOrderId, receiveDate); } catch (Exception e) { LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR At PrimaryRichMapFunction.map"); LOG.error("ERROR_INFO:" + e.getMessage()); LOG.error("ERROR_MSG:" + t.toString()); return null; } } } private static class PrimaryFilterFunction implements FilterFunction<String> { Logger LOG = LoggerFactory.getLogger(OnlineOrderUncompletedDataProcess.class); @Override public boolean filter(String msg) { try { JSONObject jsonMsg = JSONObject.parseObject(msg); String tableName = jsonMsg.getString("TABLE_NAME"); JSONObject data = JSONObject.parseObject(jsonMsg.getString("TABLE_DATA")); String provinceCode = data.getString("PROVINCE_CODE"); String inModeCode = data.getString("IN_MODE_CODE"); String siteSelectionType = data.getString("SITE_SELECTION_TYPE"); String cbOrderId = data.getString("CB_ORDER_ID"); String receiveDate = data.getString("RECEIVE_DATE"); if (BROADBAND_ORDER_ACCEPT.equals(tableName)) { if (StringUtils.isEmpty(provinceCode) || StringUtils.isEmpty(inModeCode) || StringUtils.isEmpty(siteSelectionType) || StringUtils.isEmpty(cbOrderId) || StringUtils.isEmpty(receiveDate)) { return false; } return true; } return false; } catch (Exception e) { LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR At PrimaryFilterFunction.filter"); LOG.error("ERROR_INFO:" + e.getMessage()); LOG.error("ERROR_MSG:" + msg); return false; } } } private static class FinalRichFunction extends RichWindowFunction<OnlineOrderUncompleted, String, Tuple, TimeWindow> { Logger LOG = LoggerFactory.getLogger(FinalRichFunction.class); private transient ValueState<Long> dayComputeCnt; @Override public void open(Configuration config) { try { ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("dayComputeCnt", TypeInformation.of(new TypeHint<Long>() { })); dayComputeCnt = getRuntimeContext().getState(descriptor); } catch (Exception e) { LOG.error("ONLINE_ORDER_UNCOMPLETED_ERRORï¼At FinalRichFunctionï¼"); LOG.error("ERROR_INFO:" + e.getMessage()); } } @Override public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<OnlineOrderUncompleted> iterable, Collector<String> collector) { try { JSONObject messageJson = new JSONObject(); String provinceCode = ""; String inModeCode = ""; String acceptType = ""; String siteSelectionType = ""; String resPrejudgeFlag = ""; long sum = 0L; for (OnlineOrderUncompleted record : iterable) { provinceCode = record.getProvinceCode(); inModeCode = record.getInModeCode(); acceptType = record.getAcceptType(); siteSelectionType = record.getSiteSelectionType(); resPrejudgeFlag = record.getResPrejudgeFlag(); sum = sum + record.getStep(); } // ç份|æ¥å ¥æ¹å¼|åçæ¹å¼|éåæ¹å¼|订åææ String id = provinceCode + CONNECT_CHAR_ID + inModeCode + CONNECT_CHAR_ID + acceptType + CONNECT_CHAR_ID + siteSelectionType + CONNECT_CHAR_ID + resPrejudgeFlag; messageJson.put("TABLE_NAME", "BROADBAND_NO_REJECTION_UNCOMPLETED_AMOUNT"); messageJson.put("ID", id); messageJson.put("PROVINCE_CODE", provinceCode); messageJson.put("IN_MODE_CODE", inModeCode); messageJson.put("ACCEPT_TYPE", acceptType); messageJson.put("SITE_SELECTION_TYPE", siteSelectionType); messageJson.put("RES_PREJUDGE_FLAG", resPrejudgeFlag); if (dayComputeCnt.value() == null) { dayComputeCnt.update(sum); } else { dayComputeCnt.update(dayComputeCnt.value() + sum); } // æ£å¼åå¨éåé messageJson.put("ONLINE_ORDER_UNCOMPLETED_AMOUNT", dayComputeCnt.value()); collector.collect(messageJson.toJSONString()); } catch (Exception e) { LOG.error("ONLINE_ORDER_UNCOMPLETED_ERROR At FinalRichFunction.apply"); LOG.error("ERROR_INFO:" + e.getMessage()); LOG.error("ERROR_MSG:" + iterable.toString()); } } } }