Could you share which configuration option that is? Is there any documentation for it?

On Fri, Sep 4, 2020 at 5:24 PM superainbower <superainbo...@163.com> wrote:
Version 1.11 added a new configuration option that avoids the problem where data skew leaves a partition without data, so that computation is never triggered.

On 2020-09-04 15:11, taochanglian <taochangl...@163.com> wrote:

That is indeed the case. For example, if you have multiple partitions but only one partition contains data, the watermark will not advance and the computation is not executed; you need to make sure every partition receives data.

To give an example: when I was testing with one topic of 10 partitions, the test data was hashed by key into a single partition, and computation was never triggered. Once I made the key round-robin so that all 10 partitions received data, the computation was triggered.

On 2020/9/4 13:14, Benchao Li wrote:

If you have multiple partitions and some of them receive no data, this problem does indeed exist. To handle that situation, have a look at idle sources [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/event_timestamps_watermarks.html#dealing-with-idle-sources
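(For reference, the idle-source handling described in [1] can also be attached directly to the Kafka consumer, in which case Flink generates watermarks per Kafka partition inside the source, and a partition that stops receiving records is marked idle on its own. A minimal sketch against Flink 1.11; the broker address, topic name, group id, and the 5 s / 10 s durations are illustrative:

import java.time.Duration;
import java.util.Properties;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class PerPartitionWatermarkSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");       // illustrative
        props.setProperty("group.id", "per-partition-watermark-sketch"); // illustrative

        FlinkKafkaConsumer<String> consumer =
                new FlinkKafkaConsumer<>("exceptions", new SimpleStringSchema(), props); // topic illustrative

        // Attaching the strategy to the consumer makes Flink track watermarks per
        // Kafka partition; withIdleness() then marks any partition that has seen
        // no records for 10 s as idle, so it no longer holds back the overall
        // watermark, which is the minimum across all partitions.
        consumer.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<String>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        .withIdleness(Duration.ofSeconds(10)));

        env.addSource(consumer).print();
        env.execute("per-partition-watermark-sketch");
    }
}

Without an explicit withTimestampAssigner, the strategy falls back to the timestamps carried by the Kafka records themselves.)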
On Thu, Sep 3, 2020 at 3:41 PM samuel....@ubtrobot.com <samuel....@ubtrobot.com> wrote:

Some additional environment info:

It is somewhat similar to the following problem: while testing Flink SQL on 1.11, I consume Kafka with the streaming API using event time, convert the stream to a table, and run a SQL aggregation. When the Kafka topic has multiple partitions, the Flink web UI shows "No Watermark" and the aggregation never triggers; when the topic has only one partition, the computation triggers normally and the watermarks display correctly.

Not sure whether this is caused by the multiple Kafka partitions?

From: samuel....@ubtrobot.com
Date: 2020-09-03 09:23
To: user-zh
Subject: Re: Re: Please advise on a time-window question, many thanks!

Thanks for the reply!

It is Flink 1.11.1. Here is the code:

package com.ubtechinc.dataplatform.flink.etl.exception.monitor;
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.commons.collections.map.HashedMap;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.fastjson.parser.Feature;
import com.ubtechinc.dataplatform.flink.util.AES256;
import com.ubtechinc.dataplatform.flink.util.ConstantStr;
import com.ubtechinc.dataplatform.flink.util.MailUtils;
import com.ubtechinc.dataplatform.flink.util.SmsUtil;
import com.ubtechinc.dataplatform.flink.util.YearMonthDayBucketAssigner;

import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import com.mysql.jdbc.Connection;

import java.sql.Timestamp;
import java.text.MessageFormat;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
/**
 * Dynamic configuration updates implemented via broadcast state.
 */
public class ExceptionAlertHour4 {

    private static final Logger LOG = LoggerFactory.getLogger(ExceptionAlertHour4.class);

    public static void main(String[] args) throws Exception {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().setGlobalJobParameters(parameterTool);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //System.out.println(parameterTool.get("envFlag"));
        Properties properties = new Properties();
        String hdfs = null;
        long maxTime = 0;
        long maxSize = 0;
        long inActive = 0;
        DataStream<String> ds = null;

        if (ConstantStr.ENV_FLAG_TEST.equals(parameterTool.get("envFlag"))) {
            hdfs = ConstantStr.HDFS_IP_PORT_TEST;
            maxTime = ConstantStr.ROLLINGPOLICY_MAXTIME_TEST;
            maxSize = ConstantStr.ROLLINGPOLICY_MAXSIZE_TEST;
            inActive = ConstantStr.ROLLINGPOLICY_INACTIVE_TEST;
            properties.setProperty("bootstrap.servers", ConstantStr.KAFKA_IP_PORT_TEST);
            properties.setProperty("zookeeper.connect", ConstantStr.ZOOKEEPER_IP_PORT_TEST);
            properties.setProperty("group.id", "etl.exception.monitor.ExceptionAlertHour4-001");
            Map<KafkaTopicPartition, Long> offsets = new HashedMap();
            offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 0), 2800L);
            offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 1), 2700L);
            offsets.put(new KafkaTopicPartition(ConstantStr.KAFKA_TOPIC_EXCEPTION, 2), 3300L);
            ds = env.addSource(new FlinkKafkaConsumer<>(ConstantStr.KAFKA_TOPIC_EXCEPTION,
                    new SimpleStringSchema(), properties).setStartFromSpecificOffsets(offsets));
        } else if (ConstantStr.ENV_FLAG_PRODUCT.equals(parameterTool.get("envFlag"))) {
            hdfs = ConstantStr.HDFS_IP_PORT_PRODUCT;
            maxTime = ConstantStr.ROLLINGPOLICY_MAXTIME_PRODUCT;
            maxSize = ConstantStr.ROLLINGPOLICY_MAXSIZE_PRODUCT;
            inActive = ConstantStr.ROLLINGPOLICY_INACTIVE_PRODUCT;
            properties.setProperty("bootstrap.servers", ConstantStr.KAFKA_IP_PORT_PRODUCT);
            properties.setProperty("zookeeper.connect", ConstantStr.ZOOKEEPER_IP_PORT_PRODUCT);
            properties.setProperty("group.id", "etl.exception.monitor.ExceptionAlertHour-001");
            properties.setProperty("auto.offset.reset", "earliest");
            ds = env.addSource(new FlinkKafkaConsumer<>(ConstantStr.KAFKA_TOPIC_EXCEPTION,
                    new SimpleStringSchema(), properties));
        } else {
            System.exit(-1);
        }

        // transform
        SingleOutputStreamOperator<Tuple4<String, String, String, Long>> singleDS =
                ds.flatMap(new FlatMapFunction<String, Tuple4<String, String, String, Long>>() {

            @Override
            public void flatMap(String value, Collector<Tuple4<String, String, String, Long>> out) {
                //System.out.println("Kafka2Hdfs-in:" + value);
                String newStr = value.replaceAll(
                        "\\\\*\\r|\\\\+r|\\\\*\\n|\\\\+n|\\\\*\\t|\\\\+t", "<brbr>");
                //System.out.println("Kafka2Hdfs-newStr:" + newStr);
                try {
                    // Parse the JSON data
                    JSONObject record = JSON.parseObject(newStr, Feature.OrderedField);
                    // Get the latest field values
                    JSONArray bodyDataArray = record.getJSONArray("body_data");
                    // Iterate over the JSON array of field values; it holds a single element
                    for (int i = 0; i < bodyDataArray.size(); i++) {
                        // Get the i-th element of the JSON array
                        JSONObject bodyDataObj = bodyDataArray.getJSONObject(i);
                        if (bodyDataObj != null) {
                            Tuple4 log = Tuple4.of(
                                    record.getString("HW-AppId"),
                                    bodyDataObj.getString("HW-bugId"),
                                    bodyDataObj.getString("HW-bugType"),
                                    Long.valueOf(bodyDataObj.getString("HW-happenedAt")));
                            out.collect(log);
                        }
                    }
                } catch (Exception e) {
                    System.out.println(e.getMessage());
                }
            }
        });
        singleDS.print();
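        // Note: the WatermarkStrategy below is attached after flatMap(), i.e. per
        // parallel subtask of this operator rather than per Kafka partition. A
        // subtask whose source partitions carry no data sees no records here and
        // emits no watermarks; withIdleness(10 s) is meant to mark such a subtask
        // idle so the downstream watermark, the minimum over all inputs, can still
        // advance.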
        // Specify the event-time field and generate watermarks
        DataStream<Tuple4<String, String, String, Long>> withTimestampsAndWatermarksDS =
                singleDS.assignTimestampsAndWatermarks(
                        WatermarkStrategy
                                .<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                                //.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
                                .withIdleness(Duration.ofSeconds(10))
                                .withTimestampAssigner((event, timestamp) -> event.f3));

        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
        tenv.registerDataStream(
                "log",
                withTimestampsAndWatermarksDS,
                "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");

        String sql = "select appid,eventid,cnt," +
                "(starttime + interval '8' hour ) as stime," +
                "(endtime + interval '8' hour ) as etime " +
                "from (select appid,eventid,count(*) as cnt," +
                "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," +
                "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " +
                "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')";

        Table table = tenv.sqlQuery(sql);
        DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);
        dataStream.print();

        env.execute("etl.exception.monitor.ExceptionAlertHour");
    }

    public static class Result {
        private String appid;
        private String eventid;
        private long cnt;
        private Timestamp stime;
        private Timestamp etime;

        public String getAppid() { return appid; }

        public void setAppid(String appid) { this.appid = appid; }

        public String getEventid() { return eventid; }

        public void setEventid(String eventid) { this.eventid = eventid; }

        public long getCnt() { return cnt; }

        public void setCnt(long cnt) { this.cnt = cnt; }

        public Timestamp getStime() { return stime; }

        public void setStime(Timestamp stime) { this.stime = stime; }

        public Timestamp getEtime() { return etime; }

        public void setEtime(Timestamp etime) { this.etime = etime; }

        @Override
        public String toString() {
            return "ResultHour{" +
                    "appid=" + appid +
                    ", eventid=" + eventid +
                    ", cnt=" + cnt +
                    ", stime=" + stime +
                    ", etime=" + etime +
                    ", SystemTime=" + System.currentTimeMillis() +
                    '}';
        }
    }
}
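(As to "which configuration option" in 1.11: this is presumably table.exec.source.idle-timeout, which applies the same idle-marking to sources on the SQL/Table side; treating it as the option meant above is an assumption. A minimal sketch of setting it; the 10 s value is illustrative, and note that for a DataStream converted to a table, as in the job above, the watermarks are produced by the DataStream's own WatermarkStrategy rather than by the planner:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class IdleTimeoutSketch {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tenv = StreamTableEnvironment.create(env);

        // Mark a table-source subtask idle after 10 s without records so it stops
        // holding back the query's watermark; the default "0 ms" disables this.
        tenv.getConfig().getConfiguration()
                .setString("table.exec.source.idle-timeout", "10 s");
    }
}
)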
From: jacky-cui
Date: 2020-09-02 18:58
To: user-zh
Subject: Re: Please advise on a time-window question, many thanks!

Which Flink version is this? Can you paste the full code?

------------------ Original Message ------------------
From: "user-zh" <samuel....@ubtrobot.com>
Date: Wednesday, September 2, 2020, 15:20
To: "user-zh" <user-zh@flink.apache.org>
Subject: Please advise on a time-window question, many thanks!

Hi experts: I am using Flink SQL and would like to aggregate with a tumble window (hourly). The problem I am running into is that at the top of the hour no computation is triggered. Please take a look!

// Specify the event-time field and generate watermarks
DataStream<Tuple4<String, String, String, Long>> withTimestampsAndWatermarksDS =
        singleDS.assignTimestampsAndWatermarks(
                WatermarkStrategy
                        .<Tuple4<String, String, String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
                        //.<Tuple4<String, String, String, Long>>forMonotonousTimestamps()
                        .withIdleness(Duration.ofSeconds(10)) // generate watermarks even when no data arrives
                        .withTimestampAssigner((event, timestamp) -> event.f3));

StreamTableEnvironment tenv = StreamTableEnvironment.create(env);
tenv.registerDataStream(
        "log",
        withTimestampsAndWatermarksDS,
        "appid,bugid,eventid,rowtime.rowtime,proctime.proctime");

String sql = "select appid,eventid,cnt," +
        "(starttime + interval '8' hour ) as stime," +
        "(endtime + interval '8' hour ) as etime " +
        "from (select appid,eventid,count(*) as cnt," +
        "TUMBLE_START(rowtime,INTERVAL '1' HOUR) as starttime," +
        "TUMBLE_END(rowtime,INTERVAL '1' HOUR) as endtime " +
        "from log group by appid,eventid,TUMBLE(rowtime,INTERVAL '1' HOUR),TIME '00:00:00')";
        // hoping the window closes and fires at the top of the hour

Table table = tenv.sqlQuery(sql);
DataStream<Result> dataStream = tenv.toAppendStream(table, Result.class);

The output is:

(400030024,123123123123,others,1598951712000) // 2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) // 2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) // 2020/9/1 17:15:12
(400030024,123123123123,others,1598951712000) // 2020/9/1 17:15:12

I expected the window to close and fire at the end of 2020-09-01 18:00:00.0, but it did not.

(400030024,123123123123,others,1599030999000) // 2020/9/2 15:16:39, only after this record arrived did the window fire:
ResultHour{appid=400030024, eventid=others, cnt=4, stime=2020-09-01 17:00:00.0, etime=2020-09-01 18:00:00.0, SystemTime=1599031415481 // 2020/9/2 15:23:35}

Could you tell me where the problem is? Many thanks!
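(A rough worked timeline for the run above, assuming the standard bounded-out-of-orderness generator, which emits watermark = maxTimestamp - outOfOrderness - 1 ms:

  event 1598951712000          -> 2020-09-01 17:15:12.000
  watermark after it           -> 1598951712000 - 5000 - 1 = 1598951706999, i.e. 17:15:06.999
  window [17:00, 18:00) fires  -> once the watermark reaches the window end 1598954400000
                                  (18:00:00), which needs an event stamped roughly
                                  18:00:05 or later
  next event 1599030999000     -> 2020-09-02 15:16:39.000; the watermark jumps far past
                                  18:00:00 and the window finally fires

With several Kafka partitions, the operator watermark is the minimum over all partition/subtask watermarks, so a partition that never receives data pins it at Long.MIN_VALUE until that input is marked idle, which matches the "No Watermark" symptom described above.)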