??????????????????

??????????????????
??1.11????????flink sql??????????????,??streaming api 
????kafka,????eventtime,????stream??table,????sql????,??????kafka 
topic????????????,flink webui watermarks ????No 
Watermark,????????????????????????,????kafka 
topic??????????????????????????????????,watermarks????????????

????????????????kafka??????????????



???????? samuel....@ubtrobot.com
?????????? 2020-09-03 09:23
???????? user-zh
?????? ????: ????????????????????????????????????????????
??????????

??Flink1.11.1??????

????????????
package com.ubtechinc.dataplatform.flink.etl.exception.monitor;/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.commons.collections.map.HashedMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import 
org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import 
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.ubtechinc.dataplatform.flink.util.AES256;
import com.ubtechinc.dataplatform.flink.util.ConstantStr;
import com.ubtechinc.dataplatform.flink.util.MailUtils;
import com.ubtechinc.dataplatform.flink.util.SmsUtil;
import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner;

import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import com.mysql.jdbc.Connection;

import java.sql.Timestamp;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

/**
 * ??????????????????????????
 */
public class ExceptionAlertHour4{

private static final Logger LOG = 
LoggerFactory.getLogger(ExceptionAlertHour4.class);

public static void main(String[] args) throws Exception{
ParameterTool parameterTool = ParameterTool.fromArgs(args);
final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
env.getConfig().setGlobalJobParameters(parameterTool);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//System.out.println(parameterTool.get("envFlag"));
Properties properties = new Properties();
String hdfs = null;
long maxTime = 0;
long maxSize = 0;
long inActive = 0;
DataStream<String> ds =null;

if (ConstantStr.ENV_FLAG_TEST.equals(parameterTool.get("envFlag"))) {
hdfs = ConstantStr.HDFS_IP_PORT_TEST;
maxTime = ConstantStr.ROLLINGPOLICY_MAXTIME_TEST;
maxSize = ConstantStr.ROLLINGPOLICY_MAXSIZE_TEST;
inActive = ConstantStr.ROLLINGPOLICY_INACTIVE_TEST;
properties.setProperty("bootstrap.servers", ConstantStr.KAFKA_IP_PORT_TEST);
properties.setProperty("zookeeper.connect", ConstantStr.ZOOKEEPER_IP_PORT_TEST);
properties.setProperty("group.id", 
"etl.exception.monitor.ExceptionAlertHour4-001");
Map<KafkaTopicPartition, Long> offsets = new HashedMap();
offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 0), 
2800L);
offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 1), 
2700L);
offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 2), 
3300L);
ds = env.addSource(new FlinkKafkaConsumer<>(ConstantStr.KAFKA_TOPIC_EXCEPTION, 
new SimpleStringSchema(), properties).setStartFromSpecificOffsets(offsets));
} else if (ConstantStr.ENV_FLAG_PRODUCT.equals(parameterTool.get("envFlag"))) {
hdfs = ConstantStr.HDFS_IP_PORT_PRODUCT;
maxTime = ConstantStr.ROLLINGPOLICY_MAXTIME_PRODUCT;
maxSize = ConstantStr.ROLLINGPOLICY_MAXSIZE_PRODUCT;
inActive = ConstantStr.ROLLINGPOLICY_INACTIVE_PRODUCT;
properties.setProperty("bootstrap.servers", ConstantStr.KAFKA_IP_PORT_PRODUCT);
properties.setProperty("zookeeper.connect", 
ConstantStr.ZOOKEEPER_IP_PORT_PRODUCT);
properties.setProperty("group.id", 
"etl.exception.monitor.ExceptionAlertHour-001");
properties.setProperty("auto.offset.reset", "earliest");
ds = env.addSource(new FlinkKafkaConsumer<>(ConstantStr.KAFKA_TOPIC_EXCEPTION, 
new SimpleStringSchema(), properties));
} else {
System.exit(-1);
}
// transform
SingleOutputStreamOperator<Tuple4<String,String,String,Long>> singleDS = 
ds.flatMap(new FlatMapFunction<String, Tuple4<String,String,String,Long>>() {

@Override
public void flatMap(String value, Collector<Tuple4<String,String,String,Long>> 
out) {

//System.out.println("Kafka2Hdfs-in:" + value);
String newStr = 
value.replaceAll("\\\\*\\r|\\\\+r|\\\\*\\n|\\\\+n|\\\\*\\t|\\\\+t", "<brbr>");
//System.out.println("Kafka2Hdfs-newStr:" + newStr);

try {
// ????JSON????
JSONObject record = JSON.parseObject(newStr, Feature.OrderedField);

// ????????????????
JSONArray bodyDataArray = record.getJSONArray("body_data");
// ??????????????JSON??????????????????
for (int i = 0; i < bodyDataArray.size(); i++) {
// ??????JSON????????i??????
JSONObject bodyDataObj = bodyDataArray.getJSONObject(i);

if (bodyDataObj != null) { 
Tuple4 log = Tuple4.of(
record.getString("HW-AppId"),
bodyDataObj.getString("HW-bugId"),
bodyDataObj.getString("HW-bugType"),
Long.valueOf(bodyDataObj.getString("HW-happenedAt"))
);
out.collect(log);
}
}

} catch (Exception e) {
System.out.println(e.getMessage());
}
}

});
singleDS.print();
//????eventtime??????????watermark
DataStream<Tuple4<String,String,String,Long>> withTimestampsAndWatermarksDS = 
singleDS.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
//.<Tuple4<String,String,String,Long>>forMonotonousTimestamps()
.withIdleness(Duration.ofSeconds(10))
.withTimestampAssigner((event, timestamp)->event.f3));
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
tenv.registerDataStream(
"log",
withTimestampsAndWatermarksDS,
"appid,bugid,eventid,rowtime.rowtime,proctime.proctime");


String sql = "select  appid,eventid,cnt," +
            "(starttime + interval '8' hour ) as stime," +
            "(endtime + interval '8' hour ) as etime  " +      
            "from (select appid,eventid,count(*) as cnt," +
            "TUMBLE_START(rowtime,INTERVAL '1' HOUR)  as starttime," +
            "TUMBLE_END(rowtime,INTERVAL '1' HOUR)  as endtime  " +
            "from log  group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' 
HOUR),TIME '00:00:00')";


Table table = tenv.sqlQuery(sql);
DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);
dataStream.print();

env.execute("etl.exception.monitor.ExceptionAlertHour");
}



public static class Result{
private String appid;
private String eventid;
private long cnt;
private Timestamp stime;
private Timestamp etime;
public String getAppid() {
return appid;
}

public void setAppid(String appid) {
this.appid = appid;
}

public String getEventid() {
return eventid;
}

public void setEventid(String eventid) {
this.eventid = eventid;
}

public long getCnt() {
return cnt;
}

public void setCnt(long cnt) {
this.cnt = cnt;
}


public Timestamp getStime(){
return stime;
}

public void setStime(Timestamp stime){
this.stime = stime;
}

public Timestamp getEtime(){
return etime;
}

public void setEtime(Timestamp etime){
this.etime = etime;
}

@Override
public String toString(){
return "ResultHour{" +
      "appid=" + appid +
      ",eventid=" + eventid +
      ",cnt=" + cnt +
      ", stime=" + stime +
      ", etime=" + etime +
      ", SystemTime=" + System.currentTimeMillis() +
      '}';
}
}

}


???????? jacky-cui
?????????? 2020-09-02 18:58
???????? user-zh
?????? ????????????????????????????????????????????
??????flink????????????????????????
 
 
------------------ ???????? ------------------
??????: "user-zh" <samuel....@ubtrobot.com>;
????????: 2020??9??2??(??????) ????3:20
??????: "user-zh"<user-zh@flink.apache.org>;
 
????: ??????????????????????????????????????
 
 
 
????????????flink SQL,????????????tumble 
window????????????????????????????????????????????????????????????????????????
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;
 //????eventtime??????????watermark
DataStream<Tuple4<String,String,String,Long>> 
withTimestampsAndWatermarksDS = singleDS.assignTimestampsAndWatermarks(
WatermarkStrategy
.<Tuple4<String,String,String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
//.<Tuple4<String,String,String,Long>>forMonotonousTimestamps()
.withIdleness(Duration.ofSeconds(10))   //????????????????????watermark
.withTimestampAssigner((event, timestamp)->event.f3));
 
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
tenv.registerDataStream(
"log",
withTimestampsAndWatermarksDS,
"appid,bugid,eventid,rowtime.rowtime,proctime.proctime");
 
String sql = "select appid,eventid,cnt," +
            "(starttime + interval '8' hour ) as stime," +
            "(endtime + interval '8' hour ) as etime  " +
            "from (select appid,eventid,count(*) as cnt," +
            "TUMBLE_START(rowtime,INTERVAL '1' HOUR)  as starttime," +
            "TUMBLE_END(rowtime,INTERVAL '1' HOUR)  as endtime  " +
            "from log  group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')";    //????????????????????????????
 
Table table = tenv.sqlQuery(sql);
DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);
 
??????????????
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) //2020/9/1 17:15:12 
????????2020-09-01 18:00:00.0??????????????????????????????????
(400030024,123123123123,others,1599030999000) //2020/9/2 15:16:39 
????????????????????????
ResultHour{appid=400030024,eventid=others,cnt=4, stime=2020-09-01 17:00:00.0, 
etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 //2020/9/2 15:23:35}
????????????????????????????????

回复