?????????? ??Flink1.11.1??????
???????????? package com.ubtechinc.dataplatform.flink.etl.exception.monitor;/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import org.apache.commons.collections.map.HashedMap; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.serialization.SimpleStringEncoder; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.state.BroadcastState; import org.apache.flink.api.common.state.MapStateDescriptor; import org.apache.flink.api.common.state.ReadOnlyBroadcastState; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.BroadcastStream; import 
org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup; import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction; import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.alibaba.fastjson.parser.Feature; import com.ubtechinc.dataplatform.flink.util.AES256; import com.ubtechinc.dataplatform.flink.util.ConstantStr; import com.ubtechinc.dataplatform.flink.util.MailUtils; import com.ubtechinc.dataplatform.flink.util.SmsUtil; import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.sql.ResultSet; import com.mysql.jdbc.Connection; import java.sql.Timestamp; import java.text.MessageFormat; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; /** * ?????????????????????????? 
*/
/**
 * Hourly exception-count job. NOTE(review): the original (Chinese) class description above was
 * lost to a character-encoding error; this summary is reconstructed from the code.
 *
 * <p>Reads exception reports from the Kafka topic {@code ConstantStr.KAFKA_TOPIC_EXCEPTION},
 * flattens each report's {@code body_data} array into (appid, bugid, eventid, happenedAt) tuples,
 * counts them per (appid, eventid) in one-hour event-time tumbling windows with Flink SQL and
 * prints one {@link Result} per window. No alert (mail/SMS) is sent in this version.
 *
 * <p>Run with {@code --envFlag} set to {@code ConstantStr.ENV_FLAG_TEST} or
 * {@code ConstantStr.ENV_FLAG_PRODUCT}.
 */
public class ExceptionAlertHour4 {

    private static final Logger LOG = LoggerFactory.getLogger(ExceptionAlertHour4.class);

    /**
     * Matches a raw CR, LF or TAB (optionally preceded by backslashes) and the escaped spellings
     * backslash-r / backslash-n / backslash-t (one or more backslashes followed by r, n or t).
     * Compiled once instead of on every record, as {@code String.replaceAll} would do.
     */
    private static final java.util.regex.Pattern LINE_BREAKS =
            java.util.regex.Pattern.compile("\\\\*\\r|\\\\+r|\\\\*\\n|\\\\+n|\\\\*\\t|\\\\+t");

    /**
     * Builds and runs the job.
     *
     * @param args command-line arguments parsed by {@link ParameterTool}; {@code --envFlag}
     *             selects the Kafka cluster and the start position
     * @throws IllegalArgumentException if {@code --envFlag} is missing or unknown
     * @throws Exception if building or executing the Flink job fails
     */
    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(parameterTool);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        DataStream<String> ds = createKafkaSource(env, parameterTool.get("envFlag"));

        SingleOutputStreamOperator<Tuple4<String, String, String, Long>> singleDS =
                ds.flatMap(new ExceptionLogParser());
        // Debug output of every parsed tuple.
        singleDS.print();

        // Event time = HW-happenedAt (f3). With a bounded out-of-orderness of 5 s, the watermark
        // trails the largest timestamp seen so far and only moves when new records arrive. A
        // one-hour window therefore fires only once a record at or after 5 s past the window end
        // shows up. For example, the window ending at 18:00 is emitted only when some record
        // with a timestamp >= 18:00:05 arrives, however late that is in wall-clock time.
        // withIdleness() merely stops an input without data from holding back the downstream
        // watermark; it never advances the watermark by itself. Closing windows without new
        // data needs a WatermarkGenerator that advances on processing time, or processing-time
        // windows.
        DataStream<Tuple4<String, String, String, Long>> withTimestampsAndWatermarksDS =
                singleDS.assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(
                                        Duration.ofSeconds(5))
                                .withIdleness(Duration.ofSeconds(10))
                                .withTimestampAssigner((event, timestamp) -> event.f3));

        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        // Position-based mapping: f0..f2 become appid/bugid/eventid (eventid holds HW-bugType);
        // "rowtime.rowtime" replaces f3 as the event-time attribute; proctime is appended.
        // NOTE(review): registerDataStream is deprecated; createTemporaryView is its replacement.
        tenv.registerDataStream(
                "log",
                withTimestampsAndWatermarksDS,
                "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");

        // Counts reports per (appid, eventid) in one-hour tumbling event-time windows.
        // "+ interval '8' hour" shifts the window bounds, presumably to display them in UTC+8.
        // The former extra GROUP BY key TIME '00:00:00' was a constant and did not change the
        // grouping, so it has been removed.
        String sql = "select appid,eventid,cnt,"
                + "(starttime + interval '8' hour ) as stime,"
                + "(endtime + interval '8' hour ) as etime "
                + "from (select appid,eventid,count(*) as cnt,"
                + "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime,"
                + "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime "
                + "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR))";
        Table table = tenv.sqlQuery(sql);
        DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);
        dataStream.print();

        env.execute("etl.exception.monitor.ExceptionAlertHour");
    }

    /**
     * Creates the Kafka source for the environment named by {@code envFlag}.
     *
     * <p>The HDFS / rolling-policy settings that used to be read here were never used. Re-add
     * them once a file sink is attached.
     *
     * @param env     the execution environment the source is added to
     * @param envFlag value of {@code --envFlag}; must equal {@code ConstantStr.ENV_FLAG_TEST} or
     *                {@code ConstantStr.ENV_FLAG_PRODUCT}
     * @return the raw message stream of {@code ConstantStr.KAFKA_TOPIC_EXCEPTION}
     * @throws IllegalArgumentException if {@code envFlag} is missing or unknown
     */
    private static DataStream<String> createKafkaSource(
            StreamExecutionEnvironment env, String envFlag) {
        Properties properties = new Properties();
        if (ConstantStr.ENV_FLAG_TEST.equals(envFlag)) {
            properties.setProperty("bootstrap.servers", ConstantStr.KAFKA_IP_PORT_TEST);
            properties.setProperty("zookeeper.connect", ConstantStr.ZOOKEEPER_IP_PORT_TEST);
            properties.setProperty("group.id", "etl.exception.monitor.ExceptionAlertHour4-001");
            // Test runs replay from fixed offsets on partitions 0-2.
            Map<KafkaTopicPartition, Long> offsets = new HashMap<>();
            offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 0), 2800L);
            offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 1), 2700L);
            offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 2), 3300L);
            FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>(
                    ConstantStr.KAFKA_TOPIC_EXCEPTION, new SimpleStringSchema(), properties);
            consumer.setStartFromSpecificOffsets(offsets);
            return env.addSource(consumer);
        }
        if (ConstantStr.ENV_FLAG_PRODUCT.equals(envFlag)) {
            properties.setProperty("bootstrap.servers", ConstantStr.KAFKA_IP_PORT_PRODUCT);
            properties.setProperty("zookeeper.connect", ConstantStr.ZOOKEEPER_IP_PORT_PRODUCT);
            properties.setProperty("group.id", "etl.exception.monitor.ExceptionAlertHour-001");
            properties.setProperty("auto.offset.reset", "earliest");
            return env.addSource(new FlinkKafkaConsumer<>(
                    ConstantStr.KAFKA_TOPIC_EXCEPTION, new SimpleStringSchema(), properties));
        }
        // Previously System.exit(-1) with no message; fail loudly and explain why instead.
        LOG.error("Unknown envFlag '{}'; expected '{}' or '{}'",
                envFlag, ConstantStr.ENV_FLAG_TEST, ConstantStr.ENV_FLAG_PRODUCT);
        throw new IllegalArgumentException("Unknown envFlag: " + envFlag);
    }

    /**
     * Flattens one Kafka message into one tuple per element of its {@code body_data} array:
     * (HW-AppId, HW-bugId, HW-bugType, HW-happenedAt).
     *
     * <p>Malformed input is logged with its stack trace and skipped. An unparsable message
     * drops only that message. An unusable element (not an object, or a missing or non-numeric
     * HW-happenedAt) drops only that element. Exceptions from downstream operators are not
     * caught here.
     */
    private static final class ExceptionLogParser
            implements FlatMapFunction<String, Tuple4<String, String, String, Long>> {

        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String value, Collector<Tuple4<String, String, String, Long>> out) {
            JSONObject record;
            JSONArray bodyDataArray;
            try {
                String normalized = LINE_BREAKS.matcher(value).replaceAll("<brbr>");
                record = JSON.parseObject(normalized, Feature.OrderedField);
                bodyDataArray = record == null ? null : record.getJSONArray("body_data");
            } catch (RuntimeException e) {
                LOG.warn("Dropping unparsable exception record: {}", value, e);
                return;
            }
            if (bodyDataArray == null) {
                LOG.warn("Dropping exception record without body_data: {}", value);
                return;
            }

            String appId = record.getString("HW-AppId");
            for (int i = 0; i < bodyDataArray.size(); i++) {
                Tuple4<String, String, String, Long> tuple;
                try {
                    JSONObject bodyDataObj = bodyDataArray.getJSONObject(i);
                    if (bodyDataObj == null) {
                        continue;
                    }
                    tuple = Tuple4.of(
                            appId,
                            bodyDataObj.getString("HW-bugId"),
                            bodyDataObj.getString("HW-bugType"),
                            Long.parseLong(bodyDataObj.getString("HW-happenedAt")));
                } catch (RuntimeException e) {
                    LOG.warn("Skipping body_data[{}] of exception record: {}", i, value, e);
                    continue;
                }
                // Emitted outside the try so downstream failures are not swallowed.
                out.collect(tuple);
            }
        }
    }

    /**
     * One output row of the hourly query: {@code cnt} reports with bug type {@code eventid}
     * (the report's HW-bugType) for application {@code appid} in the window between
     * {@code stime} and {@code etime}. Flink maps query columns to these properties by name,
     * so the class keeps an implicit public no-argument constructor and getter/setter pairs.
     */
    public static class Result {
        private String appid;
        private String eventid;
        private long cnt;
        private Timestamp stime;
        private Timestamp etime;

        public String getAppid() {
            return appid;
        }

        public void setAppid(String appid) {
            this.appid = appid;
        }

        public String getEventid() {
            return eventid;
        }

        public void setEventid(String eventid) {
            this.eventid = eventid;
        }

        public long getCnt() {
            return cnt;
        }

        public void setCnt(long cnt) {
            this.cnt = cnt;
        }

        public Timestamp getStime() {
            return stime;
        }

        public void setStime(Timestamp stime) {
            this.stime = stime;
        }

        public Timestamp getEtime() {
            return etime;
        }

        public void setEtime(Timestamp etime) {
            this.etime = etime;
        }

        /** Debug representation; SystemTime is the wall-clock time of printing, in epoch ms. */
        @Override
        public String toString() {
            return "ResultHour{" + "appid=" + appid + ",eventid=" + eventid + ",cnt=" + cnt
                    + ", stime=" + stime + ", etime=" + etime
                    + ", SystemTime=" + System.currentTimeMillis() + '}';
        }
    }
}
???????????????????????????? | ???????? | ???????????? | ?????? ???? Samuel Qiu ????/????: +0086 150 1356 8368 Email: samuel....@ubtrobot.com UBTECH Robotics ?? www.ubtrobot.com ??????????????????????????1001??????????C1??25?? ???????? jacky-cui ?????????? 2020-09-02 18:58 ???????? user-zh ?????? ???????????????????????????????????????????? ??????flink???????????????????????? ------------------ ???????? ------------------ ??????: "user-zh" <samuel....@ubtrobot.com>; ????????: 2020??9??2??(??????) ????3:20 ??????: "user-zh"<user-zh@flink.apache.org>; ????: ?????????????????????????????????????? ????????????flink SQL,????????????tumble window???????????????????????????????????????????????????????????????????????? 
//????eventtime??????????watermark DataStream<Tuple4<String,String,String,Long>> withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks( WatermarkStrategy .<Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5)) //.<Tuple4<String,String,String,Long>>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(10)) //????????????????????watermark .withTimestampAssigner((event, timestamp)->event.f3)); StreamTableEnvironment tenv = StreamTableEnvironment.create(env); tenv.registerDataStream( "log", withTimestampsAndWatermarksDS, "appid,bugid,eventid,rowtime.rowtime,proctime.proctime"); String sql = "select appid,eventid,cnt," + "(starttime + interval '8' hour ) as stime," + "(endtime + interval '8' hour ) as etime " + "from (select appid,eventid,count(*) as cnt," + "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," + "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " + "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')"; //???????????????????????????? Table table = tenv.sqlQuery(sql); DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class); ?????????????? (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 (400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 ????????2020-09-01 18:00:00.0?????????????????????????????????? (400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39 ???????????????????????? ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 //2020/9/2 15:23:35} ????????????????????????????????